This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 37e26f301e [flink] Supports debezium-bson formats of kafka data which
collected from mongodb via debezium (#4870)
37e26f301e is described below
commit 37e26f301e91d23a8df961e65cb505b7781af5a7
Author: chuangchuang <[email protected]>
AuthorDate: Fri Jan 10 19:10:27 2025 +0800
[flink] Supports debezium-bson formats of kafka data which collected from
mongodb via debezium (#4870)
---
paimon-flink/paimon-flink-cdc/pom.xml | 8 +
.../format/debezium/DebeziumBsonDataFormat.java | 34 +++
.../debezium/DebeziumBsonDataFormatFactory.java | 38 ++++
.../format/debezium/DebeziumBsonRecordParser.java | 131 +++++++++++
.../format/debezium/DebeziumJsonRecordParser.java | 2 +-
.../action/cdc/mongodb/BsonValueConvertor.java | 242 +++++++++++++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
7 files changed, 455 insertions(+), 1 deletion(-)
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml
b/paimon-flink/paimon-flink-cdc/pom.xml
index bb7c0d8655..6d4ecb3c47 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -44,6 +44,7 @@ under the License.
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<confluent.platform.version>7.5.0</confluent.platform.version>
<flink.connector.kafka.version>3.3.0-1.20</flink.connector.kafka.version>
+ <bson.version>5.2.1</bson.version>
</properties>
<repositories>
@@ -165,6 +166,13 @@ under the License.
<version>${json-path.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>bson</artifactId>
+ <version>${bson.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java
new file mode 100644
index 0000000000..e9b6a573f0
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
+
+/**
+ * Data format for Debezium BSON messages coming from a message queue. It supplies the
+ * definitions for the queue's record JSON deserialization class and uses {@link
+ * DebeziumBsonRecordParser} as the parsing class.
+ */
+public class DebeziumBsonDataFormat extends AbstractJsonDataFormat {
+
+    /**
+     * Provides the parser factory for this format.
+     *
+     * @return a factory that constructs a {@link DebeziumBsonRecordParser} from the arguments it
+     *     is handed
+     */
+    @Override
+    protected RecordParserFactory parser() {
+        return (typeMapping, computedColumns) ->
+                new DebeziumBsonRecordParser(typeMapping, computedColumns);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java
new file mode 100644
index 0000000000..2d55cb9832
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
+
+/** {@link DataFormatFactory} implementation that produces {@link DebeziumBsonDataFormat}. */
+public class DebeziumBsonDataFormatFactory implements DataFormatFactory {
+
+    /** Identifier string reported by {@link #identifier()}. */
+    public static final String IDENTIFIER = "debezium-bson";
+
+    /**
+     * Builds the data format handled by this factory.
+     *
+     * @return a new {@link DebeziumBsonDataFormat} instance
+     */
+    @Override
+    public DataFormat create() {
+        return new DebeziumBsonDataFormat();
+    }
+
+    /**
+     * Names this factory.
+     *
+     * @return the value of {@link #IDENTIFIER}
+     */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
new file mode 100644
index 0000000000..10dcd56b80
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.TypeUtils;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import org.bson.BsonDocument;
+import org.bson.BsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
+import static
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
+import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
+
+/**
+ * The {@code DebeziumBsonRecordParser} class extends the {@link DebeziumJsonRecordParser} and is
+ * designed to parse records from MongoDB's BSON change data capture (CDC) format via Debezium.
+ *
+ * <p>The class supports the following features:
+ *
+ * <ul>
+ *   <li>Converts the BSON string found in the before/after field into Java objects.
+ *   <li>Derives the schema from the BSON document: every field is typed as string, and the
+ *       {@code _id} field is the primary key.
+ * </ul>
+ */
+public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumBsonRecordParser.class);
+
+    /** Key looked up in the Debezium source field to obtain the table name. */
+    private static final String FIELD_COLLECTION = "collection";
+
+    /** Name of the document field used as the primary key. */
+    private static final String FIELD_OBJECT_ID = "_id";
+
+    private static final List<String> PRIMARY_KEYS = Collections.singletonList(FIELD_OBJECT_ID);
+
+    public DebeziumBsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+        super(typeMapping, computedColumns);
+    }
+
+    /**
+     * Selects the JSON node that the rest of the parser works on: the payload node when the
+     * message carries a schema field, otherwise the message value itself.
+     *
+     * @param record source record whose value is a {@link JsonNode}
+     */
+    @Override
+    protected void setRoot(CdcSourceRecord record) {
+        JsonNode node = (JsonNode) record.getValue();
+        if (node.has(FIELD_SCHEMA)) {
+            root = node.get(FIELD_PAYLOAD);
+        } else {
+            root = node;
+        }
+    }
+
+    /**
+     * Parses the textual BSON document held by {@code record} and flattens its top-level fields
+     * into a field-name to string-value map, registering every field as STRING on the row type.
+     *
+     * @param record JSON node that must be textual and contain a BSON document as JSON text
+     * @param rowTypeBuilder builder that receives one STRING field per top-level document field
+     * @return insertion-ordered map of field name to rendered value, after computed columns have
+     *     been evaluated
+     * @throws IllegalArgumentException if {@code record} is not a textual node
+     */
+    @Override
+    protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+        // bson record should be a string
+        Preconditions.checkArgument(
+                record.isTextual(),
+                "debezium bson record expected to be STRING type, but actual is %s",
+                record.getNodeType());
+
+        BsonDocument document = BsonDocument.parse(record.asText());
+        LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
+        for (Map.Entry<String, BsonValue> entry : document.entrySet()) {
+            String fieldName = entry.getKey();
+            resultMap.put(fieldName, toJsonString(BsonValueConvertor.convert(entry.getValue())));
+            rowTypeBuilder.field(fieldName, DataTypes.STRING());
+        }
+
+        evalComputedColumns(resultMap, rowTypeBuilder);
+
+        return resultMap;
+    }
+
+    /**
+     * Renders a converted BSON value as a string.
+     *
+     * <p>{@code null} stays {@code null}. Values that {@link TypeUtils#isBasicType} does not accept
+     * are written as JSON; everything else, and any value whose JSON serialization fails, is
+     * rendered with {@link Objects#toString(Object)}.
+     *
+     * @param entry converted value, may be {@code null}
+     * @return string form of the value, or {@code null} for a {@code null} input
+     */
+    private static String toJsonString(Object entry) {
+        if (entry == null) {
+            return null;
+        } else if (!TypeUtils.isBasicType(entry)) {
+            try {
+                return writeValueAsString(entry);
+            } catch (JsonProcessingException e) {
+                // Best effort: keep the record flowing and fall through to toString() below.
+                LOG.error("Failed to serialize value to JSON, falling back to toString().", e);
+            }
+        }
+        return Objects.toString(entry);
+    }
+
+    /** Returns the fixed primary key list containing only {@code _id}. */
+    @Override
+    protected List<String> extractPrimaryKeys() {
+        return PRIMARY_KEYS;
+    }
+
+    /** Returns the {@code collection} entry of the source field, or {@code null} if absent. */
+    @Nullable
+    @Override
+    protected String getTableName() {
+        return getFromSourceField(FIELD_COLLECTION);
+    }
+
+    /** Returns the name of this format, {@code debezium-bson}. */
+    @Override
+    protected String format() {
+        return "debezium-bson";
+    }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 6349fc6a95..66ddc2e1ca 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -244,7 +244,7 @@ public class DebeziumJsonRecordParser extends
AbstractJsonRecordParser {
}
@Nullable
- private String getFromSourceField(String key) {
+ protected String getFromSourceField(String key) {
JsonNode node = root.get(FIELD_SOURCE);
if (isNull(node)) {
return null;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
new file mode 100644
index 0000000000..16d43821bd
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.util.CollectionUtil;
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBinarySubType;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDbPointer;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonJavaScript;
+import org.bson.BsonJavaScriptWithScope;
+import org.bson.BsonMaxKey;
+import org.bson.BsonMinKey;
+import org.bson.BsonNull;
+import org.bson.BsonObjectId;
+import org.bson.BsonRegularExpression;
+import org.bson.BsonString;
+import org.bson.BsonSymbol;
+import org.bson.BsonTimestamp;
+import org.bson.BsonUndefined;
+import org.bson.BsonValue;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@code BsonValueConvertor} class is designed to convert {@link
BsonValue} to Java objects.
+ */
+public class BsonValueConvertor {
+ private static Integer convert(BsonTimestamp bsonTimestamp) {
+ return bsonTimestamp.getTime();
+ }
+
+ private static BigDecimal convert(BsonDecimal128 bsonValue) {
+ return convert(bsonValue.decimal128Value());
+ }
+
+ private static BigDecimal convert(Decimal128 bsonValue) {
+ if (bsonValue.isNaN() || bsonValue.isInfinite()) {
+ return null;
+ }
+ return bsonValue.bigDecimalValue();
+ }
+
+ private static String convert(BsonObjectId objectId) {
+ return convert(objectId.getValue());
+ }
+
+ private static String convert(ObjectId objectId) {
+ return objectId.toHexString();
+ }
+
+ private static String convert(BsonBinary bsonBinary) {
+ if (BsonBinarySubType.isUuid(bsonBinary.getType())) {
+ return bsonBinary.asUuid().toString();
+ } else {
+ return toHex(bsonBinary.getData());
+ }
+ }
+
+ private static String convert(BsonUndefined bsonUndefined) {
+ return null;
+ }
+
+ private static String convert(BsonRegularExpression regex) {
+ String escaped =
+ regex.getPattern().isEmpty() ? "(?:)" :
regex.getPattern().replace("/", "\\/");
+ return String.format("/%s/%s", escaped, regex.getOptions());
+ }
+
+ private static Double convert(BsonDouble bsonDouble) {
+ double value = bsonDouble.getValue();
+ if (Double.isNaN(value) || Double.isInfinite(value)) {
+ return null;
+ }
+ return value;
+ }
+
+ private static String convert(BsonString string) {
+ return string.getValue();
+ }
+
+ private static Integer convert(BsonInt32 int32) {
+ return int32.getValue();
+ }
+
+ private static Long convert(BsonInt64 int64) {
+ return int64.getValue();
+ }
+
+ private static Boolean convert(BsonBoolean bool) {
+ return bool.getValue();
+ }
+
+ private static Long convert(BsonDateTime dateTime) {
+ return dateTime.getValue();
+ }
+
+ private static String convert(BsonSymbol symbol) {
+ return symbol.getSymbol();
+ }
+
+ private static String convert(BsonJavaScript javascript) {
+ return javascript.getCode();
+ }
+
+ private static Map<String, Object> convert(BsonJavaScriptWithScope
javascriptWithScope) {
+ return CollectionUtil.map(
+ Pair.of("code", javascriptWithScope.getCode()),
+ Pair.of("scope", convert(javascriptWithScope.getScope())));
+ }
+
+ private static String convert(BsonNull bsonNull) {
+ return null;
+ }
+
+ private static String convert(BsonDbPointer dbPointer) {
+ return dbPointer.toString();
+ }
+
+ private static String convert(BsonMaxKey maxKey) {
+ return maxKey.toString();
+ }
+
+ private static String convert(BsonMinKey minKey) {
+ return minKey.toString();
+ }
+
+ private static Map<String, Object> convert(BsonDocument document) {
+ LinkedHashMap<String, Object> map = new
LinkedHashMap<>(document.size());
+ for (Map.Entry<String, BsonValue> entry : document.entrySet()) {
+ map.put(entry.getKey(), convert(entry.getValue()));
+ }
+ return map;
+ }
+
+ private static List<Object> convert(BsonArray array) {
+ ArrayList<Object> objects = new ArrayList<>(array.size());
+ for (BsonValue bsonValue : array) {
+ objects.add(convert(bsonValue));
+ }
+ return objects;
+ }
+
+ public static Object convert(BsonValue bsonValue) {
+ if (bsonValue == null) {
+ return null;
+ }
+ switch (bsonValue.getBsonType()) {
+ case TIMESTAMP:
+ return convert(bsonValue.asTimestamp());
+ case DECIMAL128:
+ return convert(bsonValue.asDecimal128());
+ case OBJECT_ID:
+ return convert(bsonValue.asObjectId());
+ case BINARY:
+ return convert(bsonValue.asBinary());
+ case UNDEFINED:
+ return convert((BsonUndefined) bsonValue);
+ case REGULAR_EXPRESSION:
+ return convert(bsonValue.asRegularExpression());
+ case DOUBLE:
+ return convert(bsonValue.asDouble());
+ case STRING:
+ return convert(bsonValue.asString());
+ case INT32:
+ return convert(bsonValue.asInt32());
+ case INT64:
+ return convert(bsonValue.asInt64());
+ case BOOLEAN:
+ return convert(bsonValue.asBoolean());
+ case DATE_TIME:
+ return convert(bsonValue.asDateTime());
+ case SYMBOL:
+ return convert(bsonValue.asSymbol());
+ case JAVASCRIPT:
+ return convert(bsonValue.asJavaScript());
+ case JAVASCRIPT_WITH_SCOPE:
+ return convert(bsonValue.asJavaScriptWithScope());
+ case NULL:
+ return convert((BsonNull) bsonValue);
+ case DB_POINTER:
+ return convert(bsonValue.asDBPointer());
+ case MAX_KEY:
+ return convert((BsonMaxKey) bsonValue);
+ case MIN_KEY:
+ return convert((BsonMinKey) bsonValue);
+ case DOCUMENT:
+ return convert(bsonValue.asDocument());
+ case ARRAY:
+ return convert(bsonValue.asArray());
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported BSON type: " + bsonValue.getBsonType());
+ }
+ }
+
+ public static String toHex(byte[] bytes) {
+ StringBuilder sb = new StringBuilder();
+
+ for (byte b : bytes) {
+ String s = Integer.toHexString(255 & b);
+ if (s.length() < 2) {
+ sb.append("0");
+ }
+
+ sb.append(s);
+ }
+
+ return sb.toString();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 1b30c7ab63..1f44ef5cdd 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -31,6 +31,7 @@
org.apache.paimon.flink.action.cdc.format.aliyun.AliyunDataFormatFactory
org.apache.paimon.flink.action.cdc.format.canal.CanalDataFormatFactory
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroDataFormatFactory
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory
+org.apache.paimon.flink.action.cdc.format.debezium.DebeziumBsonDataFormatFactory
org.apache.paimon.flink.action.cdc.format.json.JsonDataFormatFactory
org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellDataFormatFactory
org.apache.paimon.flink.action.cdc.format.ogg.OggDataFormatFactory