This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bb45659e1 [INLONG-7958][Sort] Fix MongoDB's schema becomes unordered
after extracting the row data (#7960)
bb45659e1 is described below
commit bb45659e1b0d300d8d306f6cf47ab849aa8f906e
Author: emhui <[email protected]>
AuthorDate: Sun May 7 08:31:22 2023 +0800
[INLONG-7958][Sort] Fix MongoDB's schema becomes unordered after extracting
the row data (#7960)
---
.../sort/cdc/mongodb/debezium/DebeziumJson.java | 49 ++++++
.../MongoDBConnectorDeserializationSchema.java | 5 +-
.../source/reader/MongoDBRecordEmitter.java | 3 +-
.../source/reader/fetch/MongoDBScanFetchTask.java | 3 +-
.../reader/fetch/MongoDBStreamFetchTask.java | 3 +-
.../cdc/mongodb/source/utils/MetaDataUtils.java | 177 +++++++++++++++++++++
.../sort/cdc/mongodb/source/utils/MongoUtils.java | 3 +-
.../cdc/mongodb/table/MongoDBReadableMetadata.java | 152 +-----------------
8 files changed, 241 insertions(+), 154 deletions(-)
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java
new file mode 100644
index 000000000..f64fc5e2a
--- /dev/null
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.debezium;
+
+import io.debezium.relational.history.TableChanges;
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class DebeziumJson {
+
+ private Map<String, String> before;
+ private Map<String, Object> after;
+ private Source source;
+ private TableChanges.TableChange tableChange;
+ private long tsMs;
+ private String op;
+
+ @Builder
+ @Data
+ public static class Source {
+
+ private String name;
+ private String db;
+ private String table;
+ private List<String> pkNames;
+ private Map<String, Integer> sqlType;
+ private Map<String, String> mysqlType;
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index 98e5cf4e9..b8a0a86cc 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mongodb.debezium.table;
import com.mongodb.client.model.changestream.OperationType;
import com.mongodb.internal.HexUtils;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
+import java.util.LinkedHashMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
@@ -809,8 +810,8 @@ public class MongoDBConnectorDeserializationSchema
}
return row;
} else {
- Map<String, Object> data = new HashMap<>();
- Map<String, String> dataType = new HashMap<>();
+ Map<String, Object> data = new LinkedHashMap<>();
+ Map<String, String> dataType = new LinkedHashMap<>();
document.forEach((key, value) -> {
try {
LogicalType logicalType =
RecordUtils.convertLogicType(value);
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
index b9e17a624..5f647031e 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
@@ -55,8 +55,7 @@ import org.slf4j.LoggerFactory;
*/
public final class MongoDBRecordEmitter<T> extends
IncrementalSourceRecordEmitter<T> {
- private static final Logger LOG = LoggerFactory.getLogger(
-
com.ververica.cdc.connectors.mongodb.source.reader.MongoDBRecordEmitter.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(MongoDBRecordEmitter.class);
public MongoDBRecordEmitter(
DebeziumDeserializationSchema<T> deserializationSchema,
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
index 88f4c2c2d..963461ea9 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
@@ -65,8 +65,7 @@ import org.slf4j.LoggerFactory;
*/
public class MongoDBScanFetchTask implements FetchTask<SourceSplitBase> {
- private static final Logger LOG = LoggerFactory.getLogger(
-
com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(MongoDBScanFetchTask.class);
private final SnapshotSplit snapshotSplit;
private volatile boolean taskRunning = false;
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
index 69fa89772..420132b87 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
@@ -82,8 +82,7 @@ import org.slf4j.LoggerFactory;
*/
public class MongoDBStreamFetchTask implements FetchTask<SourceSplitBase> {
- private static final Logger LOG = LoggerFactory.getLogger(
-
com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(MongoDBStreamFetchTask.class);
private final StreamSplit streamSplit;
private volatile boolean taskRunning = false;
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java
new file mode 100644
index 000000000..28cc14d58
--- /dev/null
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.source.utils;
+
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumJson;
+import org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumJson.Source;
+import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+public class MetaDataUtils {
+
+ private static final String MONGODB_DEFAULT_PRIMARY_KEY = "_id";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * get collection name from record
+ */
+ public static String getMetaData(SourceRecord record, String metaDataKey) {
+ Struct value = (Struct) record.value();
+ Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
+ return to.getString(metaDataKey);
+ }
+
+ /**
+ * get sql type from row data, represents the jdbc data type
+ */
+ public static Map<String, Integer> getSqlType(@Nullable RowData rowData) {
+ if (rowData == null) {
+ return null;
+ }
+ GenericRowData data = (GenericRowData) rowData;
+ Map<String, String> mongoDbType = (Map<String, String>)
data.getField(1);
+ Map<String, Integer> sqlType = new LinkedHashMap<>();
+ mongoDbType.forEach((name, value) -> sqlType.put(name,
RecordUtils.getSqlType(value)));
+ return sqlType;
+ }
+
+ private static String getDebeziumOpType(RowData rowData) {
+ String opType;
+ switch (rowData.getRowKind()) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ opType = "d";
+ break;
+ case INSERT:
+ case UPDATE_AFTER:
+ opType = "c";
+ break;
+ default:
+ throw new IllegalStateException("the record only have states
in DELETE, "
+ + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+ }
+ return opType;
+ }
+
+ private static String getCanalOpType(RowData rowData) {
+ String opType;
+ switch (rowData.getRowKind()) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ opType = "DELETE";
+ break;
+ case INSERT:
+ case UPDATE_AFTER:
+ opType = "INSERT";
+ break;
+ default:
+ throw new IllegalStateException("the record only have states
in DELETE, "
+ + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+ }
+ return opType;
+ }
+
+ public static StringData getCanalData(SourceRecord record, RowData rowData,
+ TableChange tableSchema) {
+ // construct canal json
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct =
messageStruct.getStruct(Envelope.FieldName.SOURCE);
+ GenericRowData data = (GenericRowData) rowData;
+ Map<String, Object> field = (Map<String, Object>) data.getField(0);
+ Map<String, String> mongoDbType = (Map<String, String>)
data.getField(1);
+
+ String database = getMetaData(record,
MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+ String table = getMetaData(record,
MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+ Long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+ long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
+
+ List<Map<String, Object>> dataList = new ArrayList<>();
+ dataList.add(field);
+ CanalJson canalJson = CanalJson.builder()
+ .data(dataList)
+ .database(database)
+ .sql("")
+ .es(opTs)
+ .isDdl(false)
+
.pkNames(Collections.singletonList(MONGODB_DEFAULT_PRIMARY_KEY))
+ .mysqlType(mongoDbType)
+ .table(table)
+ .ts(ts)
+ .type(getCanalOpType(rowData))
+ .sqlType(getSqlType(data))
+ .build();
+ try {
+ return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
+ } catch (Exception e) {
+ throw new IllegalStateException("exception occurs when get meta
data", e);
+ }
+ }
+
+ public static StringData getDebeziumData(SourceRecord record, TableChange
tableSchema,
+ RowData rowData) {
+ // construct debezium json
+ Struct messageStruct = (Struct) record.value();
+ GenericRowData data = (GenericRowData) rowData;
+ Map<String, Object> field = (Map<String, Object>) data.getField(0);
+ Map<String, String> mongoDbType = (Map<String, String>)
data.getField(1);
+
+ String database = getMetaData(record,
MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+ String table = getMetaData(record,
MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+ long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
+ String debeziumOp = getDebeziumOpType(rowData);
+
+ Source source = Source.builder()
+ .db(database)
+ .table(table)
+ .name("mongodb_cdc_source")
+ .mysqlType(mongoDbType)
+ .sqlType(getSqlType(rowData))
+
.pkNames(Collections.singletonList(MONGODB_DEFAULT_PRIMARY_KEY))
+ .build();
+ DebeziumJson debeziumJson = DebeziumJson.builder()
+ .source(source)
+ .after(field)
+ .tsMs(ts)
+ .op(debeziumOp)
+ .tableChange(tableSchema)
+ .build();
+ try {
+ return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
+ } catch (Exception e) {
+ throw new IllegalStateException("exception occurs when get meta
data", e);
+ }
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
index 874cc0a59..979493809 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
@@ -70,8 +70,7 @@ import org.slf4j.LoggerFactory;
*/
public class MongoUtils {
- private static final Logger LOG = LoggerFactory.getLogger(
-
com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(MongoUtils.class);
public static final BsonDouble COMMAND_SUCCEED_FLAG = new BsonDouble(1.0d);
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
index 19db352eb..0fa28fe37 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
@@ -17,31 +17,22 @@
package org.apache.inlong.sort.cdc.mongodb.table;
+import static
org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getCanalData;
+import static
org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getDebeziumData;
+import static
org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getMetaData;
+
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
-import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.commons.lang3.StringUtils;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.inlong.sort.cdc.mongodb.debezium.table.MetadataConverter;
-import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils;
-import org.apache.inlong.sort.formats.json.canal.CanalJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -57,10 +48,7 @@ public enum MongoDBReadableMetadata {
@Override
public Object read(SourceRecord record) {
- Struct value = (Struct) record.value();
- Struct to =
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
- return StringData.fromString(
-
to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD));
+ return getMetaData(record,
MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
}
}),
@@ -73,10 +61,7 @@ public enum MongoDBReadableMetadata {
@Override
public Object read(SourceRecord record) {
- Struct value = (Struct) record.value();
- Struct to =
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
- return StringData.fromString(
-
to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD));
+ return getMetaData(record,
MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
}
}),
@@ -111,40 +96,8 @@ public enum MongoDBReadableMetadata {
@Override
public Object read(SourceRecord record,
@Nullable TableChanges.TableChange tableSchema,
RowData rowData) {
-
// construct debezium json
- Struct messageStruct = (Struct) record.value();
- Struct to =
messageStruct.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
- Struct sourceStruct =
messageStruct.getStruct(Envelope.FieldName.SOURCE);
- GenericRowData data = (GenericRowData) rowData;
- Map<String, Object> field = (Map<String, Object>)
data.getField(0);
- Map<String, String> mysqlType = (Map<String, String>)
data.getField(1);
- Map<String, Integer> sqlType = new HashMap<>();
- mysqlType.forEach((name, value) -> sqlType.put(name,
RecordUtils.getSqlType(value)));
- String debeziumOp = getDebeziumOpType(rowData);
- if (StringUtils.isBlank(debeziumOp)) {
- return null;
- }
- DebeziumJson.Source source = DebeziumJson.Source.builder()
-
.db(to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD))
-
.table(to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD))
- .name("mongo_binlog_source")
- .mysqlType(mysqlType)
- .sqlType(sqlType)
- .pkNames(null)
- .build();
- DebeziumJson debeziumJson = DebeziumJson.builder()
- .source(source)
- .after(field)
- .tsMs((Long)
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
- .op(debeziumOp)
- .tableChange(tableSchema)
- .build();
- try {
- return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
- } catch (Exception e) {
- throw new IllegalStateException("exception occurs when
get meta data", e);
- }
+ return getDebeziumData(record, tableSchema, rowData);
}
}),
@@ -164,45 +117,13 @@ public enum MongoDBReadableMetadata {
public Object read(SourceRecord record,
@Nullable TableChanges.TableChange tableSchema,
RowData rowData) {
// construct canal json
- Struct messageStruct = (Struct) record.value();
- Struct to =
messageStruct.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
- Struct sourceStruct =
messageStruct.getStruct(Envelope.FieldName.SOURCE);
- String canalOp = getCanalOpType(rowData);
- if (StringUtils.isBlank(canalOp)) {
- return null;
- }
- GenericRowData data = (GenericRowData) rowData;
- Map<String, Object> field = (Map<String, Object>)
data.getField(0);
- Map<String, String> mysqlType = (Map<String, String>)
data.getField(1);
- Map<String, Integer> sqlType = new HashMap<>();
- mysqlType.forEach((name, value) -> sqlType.put(name,
RecordUtils.getSqlType(value)));
- List<Map<String, Object>> dataList = new ArrayList<>();
- dataList.add(field);
- CanalJson canalJson = CanalJson.builder()
- .data(dataList)
-
.database(to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD))
- .sql("")
- .es((Long)
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
- .isDdl(false)
- .pkNames(null)
- .mysqlType(getMysqlType(tableSchema))
-
.table(to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD))
- .ts((Long)
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
- .type(canalOp)
- .sqlType(sqlType)
- .build();
- try {
- return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
- } catch (Exception e) {
- throw new IllegalStateException("exception occurs when
get meta data", e);
- }
+ return getCanalData(record, rowData, tableSchema);
}
});
private final String key;
private final DataType dataType;
private final MetadataConverter converter;
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
MongoDBReadableMetadata(String key, DataType dataType, MetadataConverter
converter) {
this.key = key;
@@ -210,63 +131,6 @@ public enum MongoDBReadableMetadata {
this.converter = converter;
}
- public static Map<String, String> getMysqlType(@Nullable
TableChanges.TableChange tableSchema) {
- if (tableSchema == null) {
- return null;
- }
- Map<String, String> mysqlType = new LinkedHashMap<>();
- final Table table = tableSchema.getTable();
- table.columns()
- .forEach(
- column -> {
- mysqlType.put(
- column.name(),
- String.format(
- "%s(%d)",
- column.typeName(),
- column.length()));
- });
- return mysqlType;
- }
-
- private static String getDebeziumOpType(RowData rowData) {
- String opType = null;
- switch (rowData.getRowKind()) {
- case INSERT:
- opType = "c";
- break;
- case DELETE:
- opType = "d";
- break;
- case UPDATE_AFTER:
- case UPDATE_BEFORE:
- opType = "u";
- break;
- default:
- return null;
- }
- return opType;
- }
-
- private static String getCanalOpType(RowData rowData) {
- String opType = null;
- switch (rowData.getRowKind()) {
- case INSERT:
- opType = "INSERT";
- break;
- case DELETE:
- opType = "DELETE";
- break;
- case UPDATE_AFTER:
- case UPDATE_BEFORE:
- opType = "UPDATE";
- break;
- default:
- return null;
- }
- return opType;
- }
-
public String getKey() {
return key;
}