This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 44532b168 [flink][bug] Fix incorrect partitioning result from CdcBucketStreamPartitioner#selectChannel (#726)
44532b168 is described below

commit 44532b168ff0f6a6d4348b802ab43f5381e3a11e
Author: tsreaper <[email protected]>
AuthorDate: Tue Mar 28 19:53:55 2023 +0800

    [flink][bug] Fix incorrect partitioning result from CdcBucketStreamPartitioner#selectChannel (#726)
---
 .../flink/sink/cdc/CdcBucketStreamPartitioner.java | 15 ++++++---
 .../sink/cdc/SchemaAwareStoreWriteOperator.java    |  8 +++++
 .../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java  | 36 ++++++++++++++--------
 .../flink/sink/cdc/TestCdcSourceFunction.java      | 14 +++++++++
 4 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
index ef852be7f..3b4699c84 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
@@ -44,9 +45,13 @@ import java.util.stream.IntStream;
 /**
  * A {@link StreamPartitioner} which partitions {@link CdcRecord}s according 
to the hash value of
  * bucket keys (or primary keys if bucket keys are not specified).
+ *
+ * <p>TODO: merge this class with {@link 
org.apache.paimon.flink.sink.BucketStreamPartitioner} and
+ * refactor {@link BucketComputer} if possible.
  */
 public class CdcBucketStreamPartitioner extends StreamPartitioner<CdcRecord> {
 
+    private final int numBuckets;
     private final List<String> bucketKeys;
     private final DataType[] bucketTypes;
     private final List<String> partitionKeys;
@@ -60,11 +65,12 @@ public class CdcBucketStreamPartitioner extends 
StreamPartitioner<CdcRecord> {
     public CdcBucketStreamPartitioner(TableSchema schema, boolean 
shuffleByPartitionEnable) {
         List<String> bucketKeys = schema.originalBucketKeys();
         if (bucketKeys.isEmpty()) {
-            bucketKeys = schema.primaryKeys();
+            bucketKeys = schema.trimmedPrimaryKeys();
         }
         Preconditions.checkArgument(
                 bucketKeys.size() > 0, "Either bucket keys or primary keys 
must be defined");
 
+        this.numBuckets = new CoreOptions(schema.options()).bucket();
         this.bucketKeys = bucketKeys;
         this.bucketTypes = getTypes(this.bucketKeys, schema);
         this.partitionKeys = schema.partitionKeys();
@@ -100,15 +106,14 @@ public class CdcBucketStreamPartitioner extends 
StreamPartitioner<CdcRecord> {
         CdcRecord record = 
streamRecordSerializationDelegate.getInstance().getValue();
         BinaryRow bucketKeyRow =
                 toBinaryRow(record.fields(), bucketKeys, bucketTypes, 
bucketProjection);
+        int bucket = BucketComputer.bucket(bucketKeyRow.hashCode(), 
numBuckets);
         if (shuffleByPartitionEnable) {
             BinaryRow partitionKeyRow =
                     toBinaryRow(
                             record.fields(), partitionKeys, partitionTypes, 
partitionProjection);
-            return BucketComputer.bucket(
-                    Objects.hash(bucketKeyRow.hashCode(), 
partitionKeyRow.hashCode()),
-                    numberOfChannels);
+            return Math.abs(Objects.hash(bucket, partitionKeyRow.hashCode())) 
% numberOfChannels;
         } else {
-            return BucketComputer.bucket(bucketKeyRow.hashCode(), 
numberOfChannels);
+            return bucket % numberOfChannels;
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
index 261a0f9dc..6988f2782 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
@@ -31,6 +31,8 @@ import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.TypeUtils;
 
+import org.apache.flink.runtime.state.StateInitializationContext;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -61,6 +63,12 @@ public class SchemaAwareStoreWriteOperator extends 
AbstractStoreWriteOperator<Cd
         retrySleepMillis = 
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
     }
 
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        table = table.copyWithLatestSchema();
+        super.initializeState(context);
+    }
+
     @Override
     protected SinkRecord processRecord(CdcRecord record) throws Exception {
         Map<String, Object> convertedFields = tryConvert(record.fields());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index d912119bc..b5645404f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.cdc;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -50,6 +51,7 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -67,13 +69,16 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
     @TempDir java.nio.file.Path tempDir;
 
     @Test
-    @Timeout(60)
+    @Timeout(120)
     public void testRandomCdcEvents() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
-        int numEvents = random.nextInt(20000) + 1;
-        int numSchemaChanges = random.nextInt(20) + 1;
-        int numKeys = random.nextInt(2000) + 1;
+        int numEvents = random.nextInt(1500) + 1;
+        int numSchemaChanges = random.nextInt(10) + 1;
+        int numPartitions = random.nextInt(3) + 1;
+        int numKeys = random.nextInt(150) + 1;
+        int numBucket = random.nextInt(5) + 1;
+        boolean shuffleByPartitionEnable = random.nextBoolean();
         boolean enableFailure = random.nextBoolean();
 
         TestCdcEvent[] events = new TestCdcEvent[numEvents];
@@ -113,6 +118,8 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
                 Map<String, String> fields = new HashMap<>();
                 int key = random.nextInt(numKeys);
                 fields.put("k", String.valueOf(key));
+                int pt = key % numPartitions;
+                fields.put("pt", String.valueOf(pt));
                 for (int j = 0; j < fieldNames.size(); j++) {
                     String fieldName = fieldNames.get(j);
                     if (isBigInt.get(j)) {
@@ -151,13 +158,15 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
                         tablePath,
                         fileIO,
                         RowType.of(
-                                new DataType[] {DataTypes.INT(), 
DataTypes.INT()},
-                                new String[] {"k", "v0"}),
-                        Collections.emptyList(),
-                        Collections.singletonList("k"));
+                                new DataType[] {DataTypes.INT(), 
DataTypes.INT(), DataTypes.INT()},
+                                new String[] {"pt", "k", "v0"}),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "k"),
+                        numBucket,
+                        shuffleByPartitionEnable);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getCheckpointConfig().setCheckpointInterval(1000);
+        env.getCheckpointConfig().setCheckpointInterval(100);
         TestCdcSourceFunction sourceFunction =
                 new TestCdcSourceFunction(
                         events, record -> 
Integer.valueOf(record.fields().get("k")));
@@ -171,7 +180,7 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
                 .build();
 
         // enable failure when running jobs if needed
-        FailingFileIO.reset(failingName, 100, 10000);
+        FailingFileIO.reset(failingName, 10, 10000);
 
         env.execute();
 
@@ -210,12 +219,15 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
             FileIO fileIO,
             RowType rowType,
             List<String> partitions,
-            List<String> primaryKeys)
+            List<String> primaryKeys,
+            int numBucket,
+            boolean shuffleByPartitionEnable)
             throws Exception {
         Options conf = new Options();
-        conf.set(CoreOptions.BUCKET, 3);
+        conf.set(CoreOptions.BUCKET, numBucket);
         conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
         conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+        conf.set(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION, 
shuffleByPartitionEnable);
 
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
index caa7e46f8..fc952af49 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
@@ -31,6 +31,8 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 
 import java.util.Arrays;
 import java.util.LinkedList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -46,6 +48,8 @@ public class TestCdcSourceFunction extends 
RichParallelSourceFunction<TestCdcEve
     private final SerializableFunction<CdcRecord, Integer> getKeyHash;
 
     private volatile boolean isRunning = true;
+    private transient int numRecordsPerCheckpoint;
+    private transient AtomicInteger recordsThisCheckpoint;
     private transient ListState<Integer> remainingEventsCount;
 
     public TestCdcSourceFunction(
@@ -56,6 +60,9 @@ public class TestCdcSourceFunction extends 
RichParallelSourceFunction<TestCdcEve
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        numRecordsPerCheckpoint = events.size() / 
ThreadLocalRandom.current().nextInt(10, 20);
+        recordsThisCheckpoint = new AtomicInteger(0);
+
         remainingEventsCount =
                 context.getOperatorStateStore()
                         .getListState(new ListStateDescriptor<>("count", 
Integer.class));
@@ -73,6 +80,7 @@ public class TestCdcSourceFunction extends 
RichParallelSourceFunction<TestCdcEve
 
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        recordsThisCheckpoint.set(0);
         remainingEventsCount.clear();
         remainingEventsCount.add(events.size());
     }
@@ -80,6 +88,11 @@ public class TestCdcSourceFunction extends 
RichParallelSourceFunction<TestCdcEve
     @Override
     public void run(SourceContext<TestCdcEvent> ctx) throws Exception {
         while (isRunning && !events.isEmpty()) {
+            if (recordsThisCheckpoint.get() >= numRecordsPerCheckpoint) {
+                Thread.sleep(10);
+                continue;
+            }
+
             synchronized (ctx.getCheckpointLock()) {
                 TestCdcEvent event = events.poll();
                 if (event.records() != null) {
@@ -99,6 +112,7 @@ public class TestCdcSourceFunction extends 
RichParallelSourceFunction<TestCdcEve
                     }
                 }
                 ctx.collect(event);
+                recordsThisCheckpoint.incrementAndGet();
             }
         }
     }

Reply via email to