This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4defb60e3 [flink] Fix NumberFormatException thrown by CDC writing 
empty partition data (#1978)
4defb60e3 is described below

commit 4defb60e38db966031d270905e4b220066394b66
Author: ehui <[email protected]>
AuthorDate: Mon Sep 11 10:05:49 2023 +0800

    [flink] Fix NumberFormatException thrown by CDC writing empty partition 
data (#1978)
---
 .../paimon/flink/sink/cdc/CdcRecordUtils.java      | 10 ++++---
 .../cdc/CdcRecordKeyAndBucketExtractorTest.java    | 34 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index 1c9322b1d..769c41a19 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.StringUtils;
 import org.apache.paimon.utils.TypeUtils;
 
 import org.slf4j.Logger;
@@ -54,10 +55,11 @@ public class CdcRecordUtils {
         GenericRow genericRow = new GenericRow(dataFields.size());
         for (int i = 0; i < dataFields.size(); i++) {
             DataField dataField = dataFields.get(i);
-            genericRow.setField(
-                    i,
-                    TypeUtils.castFromCdcValueString(
-                            record.fields().get(dataField.name()), 
dataField.type()));
+            String fieldValue = record.fields().get(dataField.name());
+            if (!StringUtils.isEmpty(fieldValue)) {
+                genericRow.setField(
+                        i, TypeUtils.castFromCdcValueString(fieldValue, 
dataField.type()));
+            }
         }
         return genericRow;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
index bdf63ea77..4ec0215ed 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
@@ -104,6 +104,40 @@ public class CdcRecordKeyAndBucketExtractorTest {
         }
     }
 
+    @Test
+    public void testNullPartition() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        TableSchema schema = createTableSchema();
+        RowDataKeyAndBucketExtractor expected = new 
RowDataKeyAndBucketExtractor(schema);
+        CdcRecordKeyAndBucketExtractor actual = new 
CdcRecordKeyAndBucketExtractor(schema);
+
+        long k1 = random.nextLong();
+        int v1 = random.nextInt();
+        String k2 = UUID.randomUUID().toString();
+        String v2 = UUID.randomUUID().toString();
+
+        GenericRowData rowData =
+                GenericRowData.of(
+                        null, null, k1, v1, StringData.fromString(k2), 
StringData.fromString(v2));
+        expected.setRecord(rowData);
+
+        Map<String, String> fields = new HashMap<>();
+        fields.put("pt1", null);
+        fields.put("pt2", null);
+        fields.put("k1", String.valueOf(k1));
+        fields.put("v1", String.valueOf(v1));
+        fields.put("k2", k2);
+        fields.put("v2", v2);
+
+        actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+        assertThat(actual.partition()).isEqualTo(expected.partition());
+        assertThat(actual.bucket()).isEqualTo(expected.bucket());
+
+        actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+        assertThat(actual.partition()).isEqualTo(expected.partition());
+        assertThat(actual.bucket()).isEqualTo(expected.bucket());
+    }
+
     private TableSchema createTableSchema() throws Exception {
         return SchemaUtils.forceCommit(
                 new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString())),

Reply via email to