This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 731580b9b [flink][bug] Partition and bucket should be reset when a new record is set in CdcRecordKeyAndBucketExtractor (#895)
731580b9b is described below

commit 731580b9bbc8e8fe4d92f6b791a92983d6a3d7c1
Author: tsreaper <[email protected]>
AuthorDate: Thu Apr 13 14:55:11 2023 +0800

    [flink][bug] Partition and bucket should be reset when a new record is set in CdcRecordKeyAndBucketExtractor (#895)
---
 .../sink/cdc/CdcRecordKeyAndBucketExtractor.java   |   4 +
 .../cdc/CdcRecordKeyAndBucketExtractorTest.java    | 117 +++++++++++++++++++++
 .../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java  |  14 +--
 3 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
index b1b89f853..cd19e04d8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
@@ -65,6 +65,10 @@ public class CdcRecordKeyAndBucketExtractor implements KeyAndBucketExtractor<Cdc
     @Override
     public void setRecord(CdcRecord record) {
         this.record = record;
+
+        this.partition = null;
+        this.bucketKey = null;
+        this.bucket = null;
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
new file mode 100644
index 000000000..bdf63ea77
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.RowDataKeyAndBucketExtractor;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.types.DataTypesTest.assertThat;
+
+/** Tests for {@link CdcRecordKeyAndBucketExtractor}. */
+public class CdcRecordKeyAndBucketExtractorTest {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new DataType[] {
+                        DataTypes.STRING(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.INT(),
+                        DataTypes.STRING(),
+                        DataTypes.STRING()
+                    },
+                    new String[] {"pt1", "pt2", "k1", "v1", "k2", "v2"});
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testExtract() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        TableSchema schema = createTableSchema();
+        RowDataKeyAndBucketExtractor expected = new RowDataKeyAndBucketExtractor(schema);
+        CdcRecordKeyAndBucketExtractor actual = new CdcRecordKeyAndBucketExtractor(schema);
+
+        int numTests = random.nextInt(1000) + 1;
+        for (int i = 0; i < numTests; i++) {
+            String pt1 = UUID.randomUUID().toString();
+            int pt2 = random.nextInt();
+            long k1 = random.nextLong();
+            int v1 = random.nextInt();
+            String k2 = UUID.randomUUID().toString();
+            String v2 = UUID.randomUUID().toString();
+
+            GenericRowData rowData =
+                    GenericRowData.of(
+                            StringData.fromString(pt1),
+                            pt2,
+                            k1,
+                            v1,
+                            StringData.fromString(k2),
+                            StringData.fromString(v2));
+            expected.setRecord(rowData);
+
+            Map<String, String> fields = new HashMap<>();
+            fields.put("pt1", pt1);
+            fields.put("pt2", String.valueOf(pt2));
+            fields.put("k1", String.valueOf(k1));
+            fields.put("v1", String.valueOf(v1));
+            fields.put("k2", k2);
+            fields.put("v2", v2);
+
+            actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+            assertThat(actual.partition()).isEqualTo(expected.partition());
+            assertThat(actual.bucket()).isEqualTo(expected.bucket());
+
+            actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+            assertThat(actual.partition()).isEqualTo(expected.partition());
+            assertThat(actual.bucket()).isEqualTo(expected.bucket());
+        }
+    }
+
+    private TableSchema createTableSchema() throws Exception {
+        return SchemaUtils.forceCommit(
+                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())),
+                new Schema(
+                        ROW_TYPE.getFields(),
+                        Arrays.asList("pt1", "pt2"),
+                        Arrays.asList("pt1", "pt2", "k1", "k2"),
+                        new HashMap<>(),
+                        ""));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index eaac7448b..7be475e3c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -42,6 +42,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.TraceableFileIO;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Test;
@@ -72,11 +73,10 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
         int numEvents = random.nextInt(1500) + 1;
-        int numSchemaChanges = random.nextInt(10) + 1;
+        int numSchemaChanges = Math.min(numEvents / 2, random.nextInt(10) + 1);
         int numPartitions = random.nextInt(3) + 1;
         int numKeys = random.nextInt(150) + 1;
         int numBucket = random.nextInt(5) + 1;
-        boolean shuffleByPartitionEnable = random.nextBoolean();
         boolean enableFailure = random.nextBoolean();
 
         TestCdcEvent[] events = new TestCdcEvent[numEvents];
@@ -160,11 +160,14 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
                                 new String[] {"pt", "k", "v0"}),
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "k"),
-                        numBucket,
-                        shuffleByPartitionEnable);
+                        numBucket);
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getCheckpointConfig().setCheckpointInterval(100);
+        if (!enableFailure) {
+            env.setRestartStrategy(RestartStrategies.noRestart());
+        }
+
         TestCdcSourceFunction sourceFunction =
                 new TestCdcSourceFunction(
                         events, record -> Integer.valueOf(record.fields().get("k")));
@@ -218,8 +221,7 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
             RowType rowType,
             List<String> partitions,
             List<String> primaryKeys,
-            int numBucket,
-            boolean shuffleByPartitionEnable)
+            int numBucket)
             throws Exception {
         Options conf = new Options();
         conf.set(CoreOptions.BUCKET, numBucket);

Reply via email to