This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dfcf1e1e7 [cdc] Fix that kafka message value might be null which causes NPE (#3113)
dfcf1e1e7 is described below

commit dfcf1e1e72a46c73adf2f789ed97bd2293a32637
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 29 14:58:05 2024 +0800

    [cdc] Fix that kafka message value might be null which causes NPE (#3113)
---
 .../flink/action/cdc/kafka/KafkaActionUtils.java   |  9 ++-
 ...KafkaValueOnlyDeserializationSchemaWrapper.java | 71 ++++++++++++++++++++++
 .../kafka/KafkaDebeziumSyncTableActionITCase.java  | 59 ++++++++++++++++++
 .../debezium/table/nullvalue/debezium-data-1.txt   | 19 ++++++
 .../debezium/table/nullvalue/debezium-data-2.txt   | 19 ++++++
 5 files changed, 174 insertions(+), 3 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index fcd0eeb88..64a543beb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -83,9 +83,12 @@ public class KafkaActionUtils {
                     Pattern.compile(kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
         }
 
-        kafkaSourceBuilder
-                .setValueOnlyDeserializer(new SimpleStringSchema())
-                .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+        KafkaValueOnlyDeserializationSchemaWrapper<String> schema =
+                new KafkaValueOnlyDeserializationSchemaWrapper<>(new SimpleStringSchema());
+        kafkaSourceBuilder.setDeserializer(schema);
+
+        kafkaSourceBuilder.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+
         Properties properties = createKafkaProperties(kafkaConfig);
 
         StartupMode startupMode =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
new file mode 100644
index 000000000..5e6b96670
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A class that wraps a {@link DeserializationSchema} as the value deserializer for a {@link
+ * ConsumerRecord}.
+ *
+ * @param <T> the return type of the deserialization.
+ */
+class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializationSchema<T> {
+    private static final long serialVersionUID = 1L;
+    private final DeserializationSchema<T> deserializationSchema;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KafkaValueOnlyDeserializationSchemaWrapper.class);
+
+    KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        deserializationSchema.open(context);
+    }
+
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
+            throws IOException {
+        if (message.value() != null) {
+            deserializationSchema.deserialize(message.value(), out);
+        } else {
+            // see
+            // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events
+            LOG.info(
+                    "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, "
+                            + "please check your Debezium and Kafka configuration.",
+                    message);
+        }
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 7aba174d3..ba9633162 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -18,9 +18,24 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+
 /** IT cases for {@link KafkaSyncTableAction}. */
 public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCase {
 
@@ -103,4 +118,48 @@ public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCa
     public void testAllTypesWithSchema() throws Exception {
         testAllTypesWithSchemaImpl(DEBEZIUM);
     }
+
+    @Test
+    @Timeout(60)
+    public void testMessageWithNullValue() throws Exception {
+        final String topic = "test_null_value";
+        createTestTopic(topic, 1, 1);
+
+        List<String> lines = readLines("kafka/debezium/table/nullvalue/debezium-data-1.txt");
+        writeRecordsToKafka(topic, lines);
+
+        // write null value
+        Properties producerProperties = getStandardProps();
+        producerProperties.setProperty("retries", "0");
+        producerProperties.put(
+                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProperties.put(
+                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProperties);
+        kafkaProducer.send(new ProducerRecord<>(topic, null));
+        kafkaProducer.close();
+
+        lines = readLines("kafka/debezium/table/nullvalue/debezium-data-2.txt");
+        writeRecordsToKafka(topic, lines);
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.STRING().notNull(), DataTypes.STRING()},
+                        new String[] {"id", "value"});
+        waitForResult(
+                Arrays.asList("+I[1, A]", "+I[2, B]"),
+                getFileStoreTable(tableName),
+                rowType,
+                Collections.singletonList("id"));
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt
new file mode 100644
index 000000000..fda10c2c1
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 1, "value": "A"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "test", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt
new file mode 100644
index 000000000..d8de79c29
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 2, "value": "B"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684884000, "snapshot": "false", "db": "test", "sequence": null, "table": "test", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684884000, "transaction": null}

Reply via email to