This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 37e26f301e [flink] Supports debezium-bson formats of kafka data which 
collected from mongodb via debezium (#4870)
37e26f301e is described below

commit 37e26f301e91d23a8df961e65cb505b7781af5a7
Author: chuangchuang <[email protected]>
AuthorDate: Fri Jan 10 19:10:27 2025 +0800

    [flink] Supports debezium-bson formats of kafka data which collected from 
mongodb via debezium (#4870)
---
 paimon-flink/paimon-flink-cdc/pom.xml              |   8 +
 .../format/debezium/DebeziumBsonDataFormat.java    |  34 +++
 .../debezium/DebeziumBsonDataFormatFactory.java    |  38 ++++
 .../format/debezium/DebeziumBsonRecordParser.java  | 131 +++++++++++
 .../format/debezium/DebeziumJsonRecordParser.java  |   2 +-
 .../action/cdc/mongodb/BsonValueConvertor.java     | 242 +++++++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 7 files changed, 455 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-cdc/pom.xml 
b/paimon-flink/paimon-flink-cdc/pom.xml
index bb7c0d8655..6d4ecb3c47 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -44,6 +44,7 @@ under the License.
         
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
         <confluent.platform.version>7.5.0</confluent.platform.version>
         
<flink.connector.kafka.version>3.3.0-1.20</flink.connector.kafka.version>
+        <bson.version>5.2.1</bson.version>
     </properties>
 
     <repositories>
@@ -165,6 +166,13 @@ under the License.
             <version>${json-path.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>${bson.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependencies -->
 
         <dependency>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java
new file mode 100644
index 0000000000..e9b6a573f0
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
+
+/**
+ * Supports the message queue's debezium bson data format and provides 
definitions for the message
+ * queue's record json deserialization class and parsing class {@link 
DebeziumBsonRecordParser}.
+ */
+public class DebeziumBsonDataFormat extends AbstractJsonDataFormat {
+
+    @Override
+    protected RecordParserFactory parser() {
+        return DebeziumBsonRecordParser::new;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java
new file mode 100644
index 0000000000..2d55cb9832
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
+
+/** Factory to create {@link DebeziumBsonDataFormat}. */
+public class DebeziumBsonDataFormatFactory implements DataFormatFactory {
+
+    public static final String IDENTIFIER = "debezium-bson";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public DataFormat create() {
+        return new DebeziumBsonDataFormat();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
new file mode 100644
index 0000000000..10dcd56b80
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.TypeUtils;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import org.bson.BsonDocument;
+import org.bson.BsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
+import static 
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
+import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
+
+/**
+ * The {@code DebeziumBsonRecordParser} class extends the {@link 
DebeziumJsonRecordParser} and is
+ * designed to parse records from MongoDB's BSON change data capture (CDC) 
format via Debezium.
+ *
+ * <p>The class supports the following features:
+ *
+ * <p>Converts the BSON string in the before/after field to Java objects
+ *
+ * <p>Parses the schema from the BSON document: every field is typed as STRING, and the 
{@code _id} field is the primary key
+ */
+public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DebeziumBsonRecordParser.class);
+
+    private static final String FIELD_COLLECTION = "collection";
+    private static final String FIELD_OBJECT_ID = "_id";
+    private static final List<String> PRIMARY_KEYS = 
Collections.singletonList(FIELD_OBJECT_ID);
+
+    public DebeziumBsonRecordParser(TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
+        super(typeMapping, computedColumns);
+    }
+
+    @Override
+    protected void setRoot(CdcSourceRecord record) {
+        JsonNode node = (JsonNode) record.getValue();
+        if (node.has(FIELD_SCHEMA)) {
+            root = node.get(FIELD_PAYLOAD);
+        } else {
+            root = node;
+        }
+    }
+
+    @Override
+    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
+        // bson record should be a string
+        Preconditions.checkArgument(
+                record.isTextual(),
+                "debezium bson record expected to be STRING type, but actual 
is %s",
+                record.getNodeType());
+
+        BsonDocument document = BsonDocument.parse(record.asText());
+        LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
+        for (Map.Entry<String, BsonValue> entry : document.entrySet()) {
+            String fieldName = entry.getKey();
+            resultMap.put(fieldName, 
toJsonString(BsonValueConvertor.convert(entry.getValue())));
+            rowTypeBuilder.field(fieldName, DataTypes.STRING());
+        }
+
+        evalComputedColumns(resultMap, rowTypeBuilder);
+
+        return resultMap;
+    }
+
+    private static String toJsonString(Object entry) {
+        if (entry == null) {
+            return null;
+        } else if (!TypeUtils.isBasicType(entry)) {
+            try {
+                return writeValueAsString(entry);
+            } catch (JsonProcessingException e) {
+                LOG.error("Failed to deserialize record.", e);
+            }
+        }
+        return Objects.toString(entry);
+    }
+
+    @Override
+    protected List<String> extractPrimaryKeys() {
+        return PRIMARY_KEYS;
+    }
+
+    @Nullable
+    @Override
+    protected String getTableName() {
+        return getFromSourceField(FIELD_COLLECTION);
+    }
+
+    @Override
+    protected String format() {
+        return "debezium-bson";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 6349fc6a95..66ddc2e1ca 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -244,7 +244,7 @@ public class DebeziumJsonRecordParser extends 
AbstractJsonRecordParser {
     }
 
     @Nullable
-    private String getFromSourceField(String key) {
+    protected String getFromSourceField(String key) {
         JsonNode node = root.get(FIELD_SOURCE);
         if (isNull(node)) {
             return null;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
new file mode 100644
index 0000000000..16d43821bd
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.util.CollectionUtil;
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBinarySubType;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDbPointer;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonJavaScript;
+import org.bson.BsonJavaScriptWithScope;
+import org.bson.BsonMaxKey;
+import org.bson.BsonMinKey;
+import org.bson.BsonNull;
+import org.bson.BsonObjectId;
+import org.bson.BsonRegularExpression;
+import org.bson.BsonString;
+import org.bson.BsonSymbol;
+import org.bson.BsonTimestamp;
+import org.bson.BsonUndefined;
+import org.bson.BsonValue;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@code BsonValueConvertor} class is designed to convert {@link 
BsonValue} to Java objects.
+ */
+public class BsonValueConvertor {
+    private static Integer convert(BsonTimestamp bsonTimestamp) {
+        return bsonTimestamp.getTime();
+    }
+
+    private static BigDecimal convert(BsonDecimal128 bsonValue) {
+        return convert(bsonValue.decimal128Value());
+    }
+
+    private static BigDecimal convert(Decimal128 bsonValue) {
+        if (bsonValue.isNaN() || bsonValue.isInfinite()) {
+            return null;
+        }
+        return bsonValue.bigDecimalValue();
+    }
+
+    private static String convert(BsonObjectId objectId) {
+        return convert(objectId.getValue());
+    }
+
+    private static String convert(ObjectId objectId) {
+        return objectId.toHexString();
+    }
+
+    private static String convert(BsonBinary bsonBinary) {
+        if (BsonBinarySubType.isUuid(bsonBinary.getType())) {
+            return bsonBinary.asUuid().toString();
+        } else {
+            return toHex(bsonBinary.getData());
+        }
+    }
+
+    private static String convert(BsonUndefined bsonUndefined) {
+        return null;
+    }
+
+    private static String convert(BsonRegularExpression regex) {
+        String escaped =
+                regex.getPattern().isEmpty() ? "(?:)" : 
regex.getPattern().replace("/", "\\/");
+        return String.format("/%s/%s", escaped, regex.getOptions());
+    }
+
+    private static Double convert(BsonDouble bsonDouble) {
+        double value = bsonDouble.getValue();
+        if (Double.isNaN(value) || Double.isInfinite(value)) {
+            return null;
+        }
+        return value;
+    }
+
+    private static String convert(BsonString string) {
+        return string.getValue();
+    }
+
+    private static Integer convert(BsonInt32 int32) {
+        return int32.getValue();
+    }
+
+    private static Long convert(BsonInt64 int64) {
+        return int64.getValue();
+    }
+
+    private static Boolean convert(BsonBoolean bool) {
+        return bool.getValue();
+    }
+
+    private static Long convert(BsonDateTime dateTime) {
+        return dateTime.getValue();
+    }
+
+    private static String convert(BsonSymbol symbol) {
+        return symbol.getSymbol();
+    }
+
+    private static String convert(BsonJavaScript javascript) {
+        return javascript.getCode();
+    }
+
+    private static Map<String, Object> convert(BsonJavaScriptWithScope 
javascriptWithScope) {
+        return CollectionUtil.map(
+                Pair.of("code", javascriptWithScope.getCode()),
+                Pair.of("scope", convert(javascriptWithScope.getScope())));
+    }
+
+    private static String convert(BsonNull bsonNull) {
+        return null;
+    }
+
+    private static String convert(BsonDbPointer dbPointer) {
+        return dbPointer.toString();
+    }
+
+    private static String convert(BsonMaxKey maxKey) {
+        return maxKey.toString();
+    }
+
+    private static String convert(BsonMinKey minKey) {
+        return minKey.toString();
+    }
+
+    private static Map<String, Object> convert(BsonDocument document) {
+        LinkedHashMap<String, Object> map = new 
LinkedHashMap<>(document.size());
+        for (Map.Entry<String, BsonValue> entry : document.entrySet()) {
+            map.put(entry.getKey(), convert(entry.getValue()));
+        }
+        return map;
+    }
+
+    private static List<Object> convert(BsonArray array) {
+        ArrayList<Object> objects = new ArrayList<>(array.size());
+        for (BsonValue bsonValue : array) {
+            objects.add(convert(bsonValue));
+        }
+        return objects;
+    }
+
+    public static Object convert(BsonValue bsonValue) {
+        if (bsonValue == null) {
+            return null;
+        }
+        switch (bsonValue.getBsonType()) {
+            case TIMESTAMP:
+                return convert(bsonValue.asTimestamp());
+            case DECIMAL128:
+                return convert(bsonValue.asDecimal128());
+            case OBJECT_ID:
+                return convert(bsonValue.asObjectId());
+            case BINARY:
+                return convert(bsonValue.asBinary());
+            case UNDEFINED:
+                return convert((BsonUndefined) bsonValue);
+            case REGULAR_EXPRESSION:
+                return convert(bsonValue.asRegularExpression());
+            case DOUBLE:
+                return convert(bsonValue.asDouble());
+            case STRING:
+                return convert(bsonValue.asString());
+            case INT32:
+                return convert(bsonValue.asInt32());
+            case INT64:
+                return convert(bsonValue.asInt64());
+            case BOOLEAN:
+                return convert(bsonValue.asBoolean());
+            case DATE_TIME:
+                return convert(bsonValue.asDateTime());
+            case SYMBOL:
+                return convert(bsonValue.asSymbol());
+            case JAVASCRIPT:
+                return convert(bsonValue.asJavaScript());
+            case JAVASCRIPT_WITH_SCOPE:
+                return convert(bsonValue.asJavaScriptWithScope());
+            case NULL:
+                return convert((BsonNull) bsonValue);
+            case DB_POINTER:
+                return convert(bsonValue.asDBPointer());
+            case MAX_KEY:
+                return convert((BsonMaxKey) bsonValue);
+            case MIN_KEY:
+                return convert((BsonMinKey) bsonValue);
+            case DOCUMENT:
+                return convert(bsonValue.asDocument());
+            case ARRAY:
+                return convert(bsonValue.asArray());
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported BSON type: " + bsonValue.getBsonType());
+        }
+    }
+
+    public static String toHex(byte[] bytes) {
+        StringBuilder sb = new StringBuilder();
+
+        for (byte b : bytes) {
+            String s = Integer.toHexString(255 & b);
+            if (s.length() < 2) {
+                sb.append("0");
+            }
+
+            sb.append(s);
+        }
+
+        return sb.toString();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 1b30c7ab63..1f44ef5cdd 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -31,6 +31,7 @@ 
org.apache.paimon.flink.action.cdc.format.aliyun.AliyunDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.canal.CanalDataFormatFactory
 
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroDataFormatFactory
 
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory
+org.apache.paimon.flink.action.cdc.format.debezium.DebeziumBsonDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.json.JsonDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellDataFormatFactory
 org.apache.paimon.flink.action.cdc.format.ogg.OggDataFormatFactory

Reply via email to