This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 69e4b0478 [flink] Fix maxwell extractRecords nullPointerException (#2904)
69e4b0478 is described below

commit 69e4b04789f0a12b4793f45871d2d4df7f6aa3cc
Author: monster <[email protected]>
AuthorDate: Tue Feb 27 09:58:02 2024 +0800

    [flink] Fix maxwell extractRecords nullPointerException (#2904)
---
 .../flink/action/cdc/format/RecordParser.java      |  3 +-
 .../kafka/KafkaMaxwellSyncTableActionITCase.java   |  6 +++
 .../cdc/kafka/KafkaSyncTableActionITCase.java      | 45 ++++++++++++++++++++++
 .../table/schemaevolution/maxwell-data-4.txt       | 20 ++++++++++
 4 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 91bc946c8..4dcbb2a81 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -143,6 +143,7 @@ public abstract class RecordParser implements FlatMapFunction<String, RichCdcMul
                 JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
         Map<String, String> rowData =
                 recordMap.entrySet().stream()
+                        .filter(entry -> Objects.nonNull(entry.getKey()))
                         .collect(
                                 Collectors.toMap(
                                         Map.Entry::getKey,
@@ -157,7 +158,7 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
                                                     return Objects.toString(entry.getValue());
                                                 }
                                             }
-                                            return Objects.toString(entry.getValue(), null);
+                                            return Objects.toString(entry.getValue());
                                         }));
         evalComputedColumns(rowData, paimonFieldTypes);
         return rowData;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
index 8d100111c..f15c06c5e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
@@ -85,4 +85,10 @@ public class KafkaMaxwellSyncTableActionITCase extends KafkaSyncTableActionITCas
     public void testWaterMarkSyncTable() throws Exception {
         testWaterMarkSyncTable(MAXWELL);
     }
+
+    @Test
+    @Timeout(60)
+    public void testFieldValNullSyncTable() throws Exception {
+        testTableFiledValNull(MAXWELL);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 47ab426c2..8bc5aaea2 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -948,4 +948,49 @@ public class KafkaSyncTableActionITCase extends KafkaActionITCaseBase {
 
         waitForResult(expected, table, rowType, primaryKeys);
     }
+
+    protected void testTableFiledValNull(String format) throws Exception {
+        final String topic = "table_filed_val_null";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the data into Kafka -------------------
+        List<String> lines =
+                readLines(
+                        String.format(
+                                "kafka/%s/table/schemaevolution/%s-data-4.txt", format, format));
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        kafkaConfig.put(SCAN_STARTUP_MODE.key(), EARLIEST_OFFSET.toString());
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        Thread.sleep(5000);
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        // topic has four records we read two
+        List<String> expected =
+                Arrays.asList(
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, null]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-4.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-4.txt
new file mode 100644
index 000000000..8d241ba19
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/maxwell/table/schemaevolution/maxwell-data-4.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":null},"primary_key_columns": ["id"]}
+{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"primary_key_columns": ["id"]}

Reply via email to