This is an automated email from the ASF dual-hosted git repository.
yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 489529262 [INLONG-6548][Sort] Optimize metadata field naming for
format of canal-json (#6549)
489529262 is described below
commit 4895292623c22a1d268bc8ce25366143bcd18005
Author: yunqingmoswu <[email protected]>
AuthorDate: Wed Nov 16 11:08:21 2022 +0800
[INLONG-6548][Sort] Optimize metadata field naming for format of canal-json
(#6549)
---
.../protocol/node/extract/KafkaExtractNode.java | 2 +-
.../sort/protocol/node/load/KafkaLoadNode.java | 8 +-
.../node/extract/KafkaExtractNodeTest.java | 3 +-
.../sort/protocol/node/load/KafkaLoadNodeTest.java | 2 +-
.../canal/CanalJsonEnhancedDecodingFormat.java | 19 ++
.../CanalJsonEnhancedDeserializationSchema.java | 260 ++++++++++++---------
.../canal/CanalJsonEnhancedEncodingFormat.java | 19 ++
.../CanalJsonEnhancedSerializationSchema.java | 52 ++---
8 files changed, 217 insertions(+), 148 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 02da2bef3..fb6693548 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -215,7 +215,7 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
metadataKey = "value.event-timestamp";
break;
case OP_TYPE:
- metadataKey = "value.op-type";
+ metadataKey = "value.type";
break;
case IS_DDL:
metadataKey = "value.is-ddl";
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index be326b478..26fa424ca 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -224,7 +224,7 @@ public class KafkaLoadNode extends LoadNode implements
InlongMetric, Metadata, S
metadataKey = "value.event-timestamp";
break;
case OP_TYPE:
- metadataKey = "value.op-type";
+ metadataKey = "value.type";
break;
case DATA:
case DATA_CANAL:
@@ -257,8 +257,8 @@ public class KafkaLoadNode extends LoadNode implements
InlongMetric, Metadata, S
@Override
public Set<MetaField> supportedMetaFields() {
return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME,
MetaField.OP_TYPE,
- MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES,
MetaField.TS,
- MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE,
MetaField.BATCH_ID,
- MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
+ MetaField.DATABASE_NAME, MetaField.SQL_TYPE,
MetaField.PK_NAMES, MetaField.TS,
+ MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE,
MetaField.BATCH_ID,
+ MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
}
}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index 661d6f547..1d41c96fd 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -34,7 +34,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import static org.junit.Assert.assertEquals;
/**
@@ -86,7 +85,7 @@ public class KafkaExtractNodeTest extends
SerializeBaseTest<KafkaExtractNode> {
formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM
'value.table'");
formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM
'value.database'");
- formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM
'value.op-type'");
+ formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.type'");
formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM
'value.event-timestamp'");
formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM
'value.is-ddl'");
formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM
'value.ingestion-timestamp'");
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
index a9f495c28..da58b1ac9 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
@@ -54,7 +54,7 @@ public class KafkaLoadNodeTest extends
SerializeBaseTest<KafkaLoadNode> {
formatMap.put(MetaField.DATA, "STRING METADATA FROM
'value.data_canal'");
formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM
'value.table'");
formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM
'value.database'");
- formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM
'value.op-type'");
+ formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.type'");
formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM
'value.event-timestamp'");
formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM
'value.is-ddl'");
formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM
'value.ingestion-timestamp'");
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
index dfc40e548..d79734aed 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
+++
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
@@ -225,6 +225,10 @@ public class CanalJsonEnhancedDecodingFormat implements
DecodingFormat<Deseriali
}
}),
// additional metadata
+ /**
+     * Deprecated, please use {@link #TYPE} instead.
+ */
+ @Deprecated
OP_TYPE(
"op-type",
DataTypes.STRING().nullable(),
@@ -232,6 +236,21 @@ public class CanalJsonEnhancedDecodingFormat implements
DecodingFormat<Deseriali
new MetadataConverter() {
private static final long serialVersionUID = 1L;
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getString(pos);
+ }
+ }),
+ TYPE(
+ "type",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("type", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
@Override
public Object convert(GenericRowData row, int pos) {
if (row.isNullAt(pos)) {
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
index f7f3dc933..db12a13fb 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
@@ -18,15 +18,6 @@
package org.apache.inlong.sort.formats.json.canal;
-import static java.lang.String.format;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -44,6 +35,15 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import
org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import static java.lang.String.format;
+
/**
* Deserialization schema from Canal JSON to Flink Table/SQL internal data
structure {@link
* RowData}. The deserialization schema knows Canal's schema definition and
can extract the database
@@ -56,6 +56,7 @@ import
org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat
* @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
*/
public final class CanalJsonEnhancedDeserializationSchema implements
DeserializationSchema<RowData> {
+
private static final long serialVersionUID = 1L;
private static final String FIELD_OLD = "old";
@@ -64,37 +65,61 @@ public final class CanalJsonEnhancedDeserializationSchema
implements Deserializa
private static final String OP_DELETE = "DELETE";
private static final String OP_CREATE = "CREATE";
- /** The deserializer to deserialize Canal JSON data. */
+ /**
+ * The deserializer to deserialize Canal JSON data.
+ */
private final JsonRowDataDeserializationSchema jsonDeserializer;
- /** Flag that indicates that an additional projection is required for
metadata. */
+ /**
+ * Flag that indicates that an additional projection is required for
metadata.
+ */
private final boolean hasMetadata;
- /** Metadata to be extracted for every record. */
+ /**
+ * Metadata to be extracted for every record.
+ */
private final MetadataConverter[] metadataConverters;
- /** {@link TypeInformation} of the produced {@link RowData} (physical +
meta data). */
+ /**
+ * {@link TypeInformation} of the produced {@link RowData} (physical +
meta data).
+ */
private final TypeInformation<RowData> producedTypeInfo;
- /** Only read changelogs from the specific database. */
- private final @Nullable String database;
+ /**
+ * Only read changelogs from the specific database.
+ */
+ private final @Nullable
+ String database;
- /** Only read changelogs from the specific table. */
- private final @Nullable String table;
+ /**
+ * Only read changelogs from the specific table.
+ */
+ private final @Nullable
+ String table;
- /** Flag indicating whether to ignore invalid fields/rows (default: throw
an exception). */
+ /**
+ * Flag indicating whether to ignore invalid fields/rows (default: throw
an exception).
+ */
private final boolean ignoreParseErrors;
- /** Names of fields. */
+ /**
+ * Names of fields.
+ */
private final List<String> fieldNames;
- /** Number of fields. */
+ /**
+ * Number of fields.
+ */
private final int fieldCount;
- /** Pattern of the specific database. */
+ /**
+ * Pattern of the specific database.
+ */
private final Pattern databasePattern;
- /** Pattern of the specific table. */
+ /**
+ * Pattern of the specific table.
+ */
private final Pattern tablePattern;
private CanalJsonEnhancedDeserializationSchema(
@@ -133,7 +158,9 @@ public final class CanalJsonEnhancedDeserializationSchema
implements Deserializa
// Builder
//
------------------------------------------------------------------------------------------
- /** Creates A builder for building a {@link
CanalJsonEnhancedDeserializationSchema}. */
+ /**
+     * Creates a builder for building a {@link
CanalJsonEnhancedDeserializationSchema}.
+ */
public static Builder builder(
DataType physicalDataType,
List<ReadableMetadata> requestedMetadata,
@@ -141,60 +168,49 @@ public final class CanalJsonEnhancedDeserializationSchema
implements Deserializa
return new Builder(physicalDataType, requestedMetadata,
producedTypeInfo);
}
- /** A builder for creating a {@link
CanalJsonEnhancedDeserializationSchema}. */
- @Internal
- public static final class Builder {
- private final DataType physicalDataType;
- private final List<ReadableMetadata> requestedMetadata;
- private final TypeInformation<RowData> producedTypeInfo;
- private String database = null;
- private String table = null;
- private boolean ignoreParseErrors = false;
- private TimestampFormat timestampFormat = TimestampFormat.SQL;
-
- private Builder(
- DataType physicalDataType,
- List<ReadableMetadata> requestedMetadata,
- TypeInformation<RowData> producedTypeInfo) {
- this.physicalDataType = physicalDataType;
- this.requestedMetadata = requestedMetadata;
- this.producedTypeInfo = producedTypeInfo;
- }
-
- public Builder setDatabase(String database) {
- this.database = database;
- return this;
- }
+ private static RowType createJsonRowType(
+ DataType physicalDataType, List<ReadableMetadata>
readableMetadata) {
+ // Canal JSON contains other information, e.g. "ts", "sql", but we
don't need them
+ DataType root =
+ DataTypes.ROW(
+ DataTypes.FIELD("data",
DataTypes.ARRAY(physicalDataType)),
+ DataTypes.FIELD("old",
DataTypes.ARRAY(physicalDataType)),
+ ReadableMetadata.TYPE.requiredJsonField,
+ ReadableMetadata.DATABASE.requiredJsonField,
+ ReadableMetadata.TABLE.requiredJsonField);
+ // append fields that are required for reading metadata in the root
+ final List<DataTypes.Field> rootMetadataFields =
+ readableMetadata.stream()
+ .filter(m -> m != ReadableMetadata.DATABASE
+ && m != ReadableMetadata.TABLE
+ && m != ReadableMetadata.TYPE)
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ return (RowType) DataTypeUtils.appendRowFields(root,
rootMetadataFields).getLogicalType();
+ }
- public Builder setTable(String table) {
- this.table = table;
- return this;
- }
+ //
------------------------------------------------------------------------------------------
- public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
- this.ignoreParseErrors = ignoreParseErrors;
- return this;
- }
+ private static MetadataConverter[] createMetadataConverters(
+ RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+ return requestedMetadata.stream()
+ .map(m -> convert(jsonRowType, m))
+ .toArray(MetadataConverter[]::new);
+ }
- public Builder setTimestampFormat(TimestampFormat timestampFormat) {
- this.timestampFormat = timestampFormat;
- return this;
- }
+ private static MetadataConverter convert(RowType jsonRowType,
ReadableMetadata metadata) {
+ final int pos =
jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+ return new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
- public CanalJsonEnhancedDeserializationSchema build() {
- return new CanalJsonEnhancedDeserializationSchema(
- physicalDataType,
- requestedMetadata,
- producedTypeInfo,
- database,
- table,
- ignoreParseErrors,
- timestampFormat);
- }
+ @Override
+ public Object convert(GenericRowData root, int unused) {
+ return metadata.converter.convert(root, pos);
+ }
+ };
}
- //
------------------------------------------------------------------------------------------
-
@Override
public RowData deserialize(byte[] message) throws IOException {
throw new RuntimeException(
@@ -315,6 +331,8 @@ public final class CanalJsonEnhancedDeserializationSchema
implements Deserializa
return producedTypeInfo;
}
+ //
--------------------------------------------------------------------------------------------
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -345,49 +363,6 @@ public final class CanalJsonEnhancedDeserializationSchema
implements Deserializa
fieldCount);
}
- //
--------------------------------------------------------------------------------------------
-
- private static RowType createJsonRowType(
- DataType physicalDataType, List<ReadableMetadata>
readableMetadata) {
- // Canal JSON contains other information, e.g. "ts", "sql", but we
don't need them
- DataType root =
- DataTypes.ROW(
- DataTypes.FIELD("data",
DataTypes.ARRAY(physicalDataType)),
- DataTypes.FIELD("old",
DataTypes.ARRAY(physicalDataType)),
- DataTypes.FIELD("type", DataTypes.STRING()),
- ReadableMetadata.DATABASE.requiredJsonField,
- ReadableMetadata.TABLE.requiredJsonField);
- // append fields that are required for reading metadata in the root
- final List<DataTypes.Field> rootMetadataFields =
- readableMetadata.stream()
- .filter(m -> m != ReadableMetadata.DATABASE && m !=
ReadableMetadata.TABLE)
- .map(m -> m.requiredJsonField)
- .distinct()
- .collect(Collectors.toList());
- return (RowType) DataTypeUtils.appendRowFields(root,
rootMetadataFields).getLogicalType();
- }
-
- private static MetadataConverter[] createMetadataConverters(
- RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
- return requestedMetadata.stream()
- .map(m -> convert(jsonRowType, m))
- .toArray(MetadataConverter[]::new);
- }
-
- private static MetadataConverter convert(RowType jsonRowType,
ReadableMetadata metadata) {
- final int pos =
jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
- return new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData root, int unused) {
- return metadata.converter.convert(root, pos);
- }
- };
- }
-
- //
--------------------------------------------------------------------------------------------
-
/**
* Converter that extracts a metadata field from the row that comes out of
the JSON schema and
* converts it to the desired data type.
@@ -401,4 +376,61 @@ public final class CanalJsonEnhancedDeserializationSchema
implements Deserializa
Object convert(GenericRowData row, int pos);
}
+
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * A builder for creating a {@link CanalJsonEnhancedDeserializationSchema}.
+ */
+ @Internal
+ public static final class Builder {
+
+ private final DataType physicalDataType;
+ private final List<ReadableMetadata> requestedMetadata;
+ private final TypeInformation<RowData> producedTypeInfo;
+ private String database = null;
+ private String table = null;
+ private boolean ignoreParseErrors = false;
+ private TimestampFormat timestampFormat = TimestampFormat.SQL;
+
+ private Builder(
+ DataType physicalDataType,
+ List<ReadableMetadata> requestedMetadata,
+ TypeInformation<RowData> producedTypeInfo) {
+ this.physicalDataType = physicalDataType;
+ this.requestedMetadata = requestedMetadata;
+ this.producedTypeInfo = producedTypeInfo;
+ }
+
+ public Builder setDatabase(String database) {
+ this.database = database;
+ return this;
+ }
+
+ public Builder setTable(String table) {
+ this.table = table;
+ return this;
+ }
+
+ public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+ this.ignoreParseErrors = ignoreParseErrors;
+ return this;
+ }
+
+ public Builder setTimestampFormat(TimestampFormat timestampFormat) {
+ this.timestampFormat = timestampFormat;
+ return this;
+ }
+
+ public CanalJsonEnhancedDeserializationSchema build() {
+ return new CanalJsonEnhancedDeserializationSchema(
+ physicalDataType,
+ requestedMetadata,
+ producedTypeInfo,
+ database,
+ table,
+ ignoreParseErrors,
+ timestampFormat);
+ }
+ }
}
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
index 2595fe477..a285a292e 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
+++
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
@@ -206,6 +206,25 @@ public class CanalJsonEnhancedEncodingFormat implements
EncodingFormat<Serializa
}
}),
// additional metadata
+ TYPE(
+ "type",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("type", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getString(pos);
+ }
+ }),
+ /**
+     * Deprecated, please use {@link #TYPE} instead.
+ */
+ @Deprecated
OP_TYPE(
"op-type",
DataTypes.STRING().nullable(),
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
index 2e8c65abd..ca3c80e1e 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
+++
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
@@ -55,18 +55,17 @@ public class CanalJsonEnhancedSerializationSchema
implements SerializationSchema
private static final StringData OP_INSERT =
StringData.fromString("INSERT");
private static final StringData OP_DELETE =
StringData.fromString("DELETE");
-
- private transient GenericRowData reuse;
-
- /** The serializer to serialize Canal JSON data. */
+ /**
+ * The serializer to serialize Canal JSON data.
+ */
private final JsonRowDataSerializationSchema jsonSerializer;
-
private final RowData.FieldGetter[] physicalFieldGetter;
-
private final RowData.FieldGetter[] wirteableMetadataFieldGetter;
-
- /** row schema that json serializer can parse output row to json format */
+ /**
+ * row schema that json serializer can parse output row to json format
+ */
private final RowType jsonRowType;
+ private transient GenericRowData reuse;
/**
* Constructor of CanalJsonEnhancedSerializationSchema.
@@ -105,6 +104,23 @@ public class CanalJsonEnhancedSerializationSchema
implements SerializationSchema
encodeDecimalAsPlainNumber);
}
+ private static RowType createJsonRowType(DataType physicalDataType,
List<WriteableMetadata> writeableMetadata) {
+ // Canal JSON contains other information, e.g. "database", "ts"
+ // but we don't need them
+ // and we don't need "old" , because can not support
UPDATE_BEFORE,UPDATE_AFTER
+ DataType root =
+ DataTypes.ROW(
+ DataTypes.FIELD("data",
DataTypes.ARRAY(physicalDataType)),
+ WriteableMetadata.TYPE.requiredJsonField);
+ // append fields that are required for reading metadata in the root
+ final List<DataTypes.Field> metadataFields =
+ writeableMetadata.stream().filter(m -> m !=
WriteableMetadata.TYPE)
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ return (RowType) DataTypeUtils.appendRowFields(root,
metadataFields).getLogicalType();
+ }
+
@Override
public void open(InitializationContext context) {
reuse = new GenericRowData(2 + wirteableMetadataFieldGetter.length);
@@ -118,7 +134,7 @@ public class CanalJsonEnhancedSerializationSchema
implements SerializationSchema
IntStream.range(0, physicalFieldGetter.length)
.forEach(targetField ->
physicalData.setField(targetField,
physicalFieldGetter[targetField].getFieldOrNull(row)));
- ArrayData arrayData = new GenericArrayData(new RowData[]
{physicalData});
+ ArrayData arrayData = new GenericArrayData(new
RowData[]{physicalData});
reuse.setField(0, arrayData);
// mete data injection
@@ -165,23 +181,6 @@ public class CanalJsonEnhancedSerializationSchema
implements SerializationSchema
}
}
- private static RowType createJsonRowType(DataType physicalDataType,
List<WriteableMetadata> writeableMetadata) {
- // Canal JSON contains other information, e.g. "database", "ts"
- // but we don't need them
- // and we don't need "old" , because can not support
UPDATE_BEFORE,UPDATE_AFTER
- DataType root =
- DataTypes.ROW(
- DataTypes.FIELD("data",
DataTypes.ARRAY(physicalDataType)),
- DataTypes.FIELD("type", DataTypes.STRING()));
- // append fields that are required for reading metadata in the root
- final List<DataTypes.Field> metadataFields =
- writeableMetadata.stream()
- .map(m -> m.requiredJsonField)
- .distinct()
- .collect(Collectors.toList());
- return (RowType) DataTypeUtils.appendRowFields(root,
metadataFields).getLogicalType();
- }
-
//
--------------------------------------------------------------------------------------------
/**
@@ -189,6 +188,7 @@ public class CanalJsonEnhancedSerializationSchema
implements SerializationSchema
* Finally all metadata field will splice into a GenericRowData, then json
Serializer serialize it into json string.
*/
interface MetadataConverter extends Serializable {
+
Object convert(RowData inputRow, int pos);
}
}