This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4defb60e3 [flink] Fix numberFormatException thrown by CDC writing
empty partition data (#1978)
4defb60e3 is described below
commit 4defb60e38db966031d270905e4b220066394b66
Author: ehui <[email protected]>
AuthorDate: Mon Sep 11 10:05:49 2023 +0800
[flink] Fix numberFormatException thrown by CDC writing empty partition
data (#1978)
---
.../paimon/flink/sink/cdc/CdcRecordUtils.java | 10 ++++---
.../cdc/CdcRecordKeyAndBucketExtractorTest.java | 34 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 4 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index 1c9322b1d..769c41a19 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TypeUtils;
import org.slf4j.Logger;
@@ -54,10 +55,11 @@ public class CdcRecordUtils {
GenericRow genericRow = new GenericRow(dataFields.size());
for (int i = 0; i < dataFields.size(); i++) {
DataField dataField = dataFields.get(i);
- genericRow.setField(
- i,
- TypeUtils.castFromCdcValueString(
- record.fields().get(dataField.name()),
dataField.type()));
+ String fieldValue = record.fields().get(dataField.name());
+ if (!StringUtils.isEmpty(fieldValue)) {
+ genericRow.setField(
+ i, TypeUtils.castFromCdcValueString(fieldValue,
dataField.type()));
+ }
}
return genericRow;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
index bdf63ea77..4ec0215ed 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
@@ -104,6 +104,40 @@ public class CdcRecordKeyAndBucketExtractorTest {
}
}
+ @Test
+ public void testNullPartition() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ TableSchema schema = createTableSchema();
+ RowDataKeyAndBucketExtractor expected = new
RowDataKeyAndBucketExtractor(schema);
+ CdcRecordKeyAndBucketExtractor actual = new
CdcRecordKeyAndBucketExtractor(schema);
+
+ long k1 = random.nextLong();
+ int v1 = random.nextInt();
+ String k2 = UUID.randomUUID().toString();
+ String v2 = UUID.randomUUID().toString();
+
+ GenericRowData rowData =
+ GenericRowData.of(
+ null, null, k1, v1, StringData.fromString(k2),
StringData.fromString(v2));
+ expected.setRecord(rowData);
+
+ Map<String, String> fields = new HashMap<>();
+ fields.put("pt1", null);
+ fields.put("pt2", null);
+ fields.put("k1", String.valueOf(k1));
+ fields.put("v1", String.valueOf(v1));
+ fields.put("k2", k2);
+ fields.put("v2", v2);
+
+ actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+ assertThat(actual.partition()).isEqualTo(expected.partition());
+ assertThat(actual.bucket()).isEqualTo(expected.bucket());
+
+ actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+ assertThat(actual.partition()).isEqualTo(expected.partition());
+ assertThat(actual.bucket()).isEqualTo(expected.bucket());
+ }
+
private TableSchema createTableSchema() throws Exception {
return SchemaUtils.forceCommit(
new SchemaManager(LocalFileIO.create(), new
Path(tempDir.toString())),