This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 69e4b0478 [flink] Fix maxwell extractRecords nullPointerException
(#2904)
69e4b0478 is described below
commit 69e4b04789f0a12b4793f45871d2d4df7f6aa3cc
Author: monster <[email protected]>
AuthorDate: Tue Feb 27 09:58:02 2024 +0800
[flink] Fix maxwell extractRecords nullPointerException (#2904)
---
.../flink/action/cdc/format/RecordParser.java | 3 +-
.../kafka/KafkaMaxwellSyncTableActionITCase.java | 6 +++
.../cdc/kafka/KafkaSyncTableActionITCase.java | 45 ++++++++++++++++++++++
.../table/schemaevolution/maxwell-data-4.txt | 20 ++++++++++
4 files changed, 73 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 91bc946c8..4dcbb2a81 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -143,6 +143,7 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
JsonSerdeUtil.convertValue(record, new
TypeReference<Map<String, Object>>() {});
Map<String, String> rowData =
recordMap.entrySet().stream()
+ .filter(entry -> Objects.nonNull(entry.getKey()))
.collect(
Collectors.toMap(
Map.Entry::getKey,
@@ -157,7 +158,7 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
return
Objects.toString(entry.getValue());
}
}
- return
Objects.toString(entry.getValue(), null);
+ return
Objects.toString(entry.getValue());
}));
evalComputedColumns(rowData, paimonFieldTypes);
return rowData;
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
index 8d100111c..f15c06c5e 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
@@ -85,4 +85,10 @@ public class KafkaMaxwellSyncTableActionITCase extends
KafkaSyncTableActionITCas
public void testWaterMarkSyncTable() throws Exception {
testWaterMarkSyncTable(MAXWELL);
}
+
+ @Test
+ @Timeout(60)
+ public void testFieldValNullSyncTable() throws Exception {
+ testTableFiledValNull(MAXWELL);
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 47ab426c2..8bc5aaea2 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -948,4 +948,49 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
waitForResult(expected, table, rowType, primaryKeys);
}
+
+ protected void testTableFiledValNull(String format) throws Exception {
+ final String topic = "table_filed_val_null";
+ createTestTopic(topic, 1, 1);
+ // ---------- Write the data into Kafka -------------------
+ List<String> lines =
+ readLines(
+ String.format(
+
"kafka/%s/table/schemaevolution/%s-data-4.txt", format, format));
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception(String.format("Failed to write %s data to
Kafka.", format), e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), EARLIEST_OFFSET.toString());
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withPrimaryKeys("id")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ Thread.sleep(5000);
+ FileStoreTable table = getFileStoreTable(tableName);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"id", "name", "description", "weight"});
+ List<String> primaryKeys = Collections.singletonList("id");
+ // topic has four records we read two
+ List<String> expected =
+ Arrays.asList(
+ "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, null]",
+ "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-4.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-4.txt
new file mode 100644
index 000000000..8d241ba19
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-4.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack
drill bits","description":"12-pack of drill bits with sizes ranging from #40
to #3","weight":null},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz
carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}