This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dfcf1e1e7 [cdc] Fix that kafka message value might be null which
causes NPE (#3113)
dfcf1e1e7 is described below
commit dfcf1e1e72a46c73adf2f789ed97bd2293a32637
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 29 14:58:05 2024 +0800
[cdc] Fix that kafka message value might be null which causes NPE (#3113)
---
.../flink/action/cdc/kafka/KafkaActionUtils.java | 9 ++-
...KafkaValueOnlyDeserializationSchemaWrapper.java | 71 ++++++++++++++++++++++
.../kafka/KafkaDebeziumSyncTableActionITCase.java | 59 ++++++++++++++++++
.../debezium/table/nullvalue/debezium-data-1.txt | 19 ++++++
.../debezium/table/nullvalue/debezium-data-2.txt | 19 ++++++
5 files changed, 174 insertions(+), 3 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index fcd0eeb88..64a543beb 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -83,9 +83,12 @@ public class KafkaActionUtils {
Pattern.compile(kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
}
- kafkaSourceBuilder
- .setValueOnlyDeserializer(new SimpleStringSchema())
- .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+ KafkaValueOnlyDeserializationSchemaWrapper<String> schema =
+ new KafkaValueOnlyDeserializationSchemaWrapper<>(new
SimpleStringSchema());
+ kafkaSourceBuilder.setDeserializer(schema);
+
+ kafkaSourceBuilder.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+
Properties properties = createKafkaProperties(kafkaConfig);
StartupMode startupMode =
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
new file mode 100644
index 000000000..5e6b96670
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A class that wraps a {@link DeserializationSchema} as the value
deserializer for a {@link
+ * ConsumerRecord}.
+ *
+ * @param <T> the return type of the deserialization.
+ */
+class KafkaValueOnlyDeserializationSchemaWrapper<T> implements
KafkaRecordDeserializationSchema<T> {
+ private static final long serialVersionUID = 1L;
+ private final DeserializationSchema<T> deserializationSchema;
+ private static final Logger LOG =
+
LoggerFactory.getLogger(KafkaValueOnlyDeserializationSchemaWrapper.class);
+
+ KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T>
deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public void open(DeserializationSchema.InitializationContext context)
throws Exception {
+ deserializationSchema.open(context);
+ }
+
+ @Override
+ public void deserialize(ConsumerRecord<byte[], byte[]> message,
Collector<T> out)
+ throws IOException {
+ if (message.value() != null) {
+ deserializationSchema.deserialize(message.value(), out);
+ } else {
+ // see
+ //
https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events
+ LOG.info(
+ "Found null message value:\n{}\nThis message will be
ignored. It might be produced by tombstone-event, "
+ + "please check your Debezium and Kafka
configuration.",
+ message);
+ }
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 7aba174d3..ba9633162 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -18,9 +18,24 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+
/** IT cases for {@link KafkaSyncTableAction}. */
public class KafkaDebeziumSyncTableActionITCase extends
KafkaSyncTableActionITCase {
@@ -103,4 +118,48 @@ public class KafkaDebeziumSyncTableActionITCase extends
KafkaSyncTableActionITCa
public void testAllTypesWithSchema() throws Exception {
testAllTypesWithSchemaImpl(DEBEZIUM);
}
+
+ @Test
+ @Timeout(60)
+ public void testMessageWithNullValue() throws Exception {
+ final String topic = "test_null_value";
+ createTestTopic(topic, 1, 1);
+
+ List<String> lines =
readLines("kafka/debezium/table/nullvalue/debezium-data-1.txt");
+ writeRecordsToKafka(topic, lines);
+
+ // write null value
+ Properties producerProperties = getStandardProps();
+ producerProperties.setProperty("retries", "0");
+ producerProperties.put(
+ "key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ producerProperties.put(
+ "value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(producerProperties);
+ kafkaProducer.send(new ProducerRecord<>(topic, null));
+ kafkaProducer.close();
+
+ lines =
readLines("kafka/debezium/table/nullvalue/debezium-data-2.txt");
+ writeRecordsToKafka(topic, lines);
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.STRING().notNull(),
DataTypes.STRING()},
+ new String[] {"id", "value"});
+ waitForResult(
+ Arrays.asList("+I[1, A]", "+I[2, B]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Collections.singletonList("id"));
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt
new file mode 100644
index 000000000..fda10c2c1
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 1, "value": "A"}, "source": {"version":
"1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms":
1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table":
"test", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread":
null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt
new file mode 100644
index 000000000..d8de79c29
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 2, "value": "B"}, "source": {"version":
"1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms":
1596684884000, "snapshot": "false", "db": "test", "sequence": null, "table":
"test", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread":
null, "query": null}, "op": "c", "ts_ms": 1596684884000, "transaction": null}