This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 44532b168 [flink][bug] Fix incorrect partitioning result from CdcBucketStreamPartitioner#selectChannel (#726)
44532b168 is described below
commit 44532b168ff0f6a6d4348b802ab43f5381e3a11e
Author: tsreaper <[email protected]>
AuthorDate: Tue Mar 28 19:53:55 2023 +0800
[flink][bug] Fix incorrect partitioning result from CdcBucketStreamPartitioner#selectChannel (#726)
---
.../flink/sink/cdc/CdcBucketStreamPartitioner.java | 15 ++++++---
.../sink/cdc/SchemaAwareStoreWriteOperator.java | 8 +++++
.../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java | 36 ++++++++++++++--------
.../flink/sink/cdc/TestCdcSourceFunction.java | 14 +++++++++
4 files changed, 56 insertions(+), 17 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
index ef852be7f..3b4699c84 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
@@ -44,9 +45,13 @@ import java.util.stream.IntStream;
/**
* A {@link StreamPartitioner} which partitions {@link CdcRecord}s according
to the hash value of
* bucket keys (or primary keys if bucket keys are not specified).
+ *
+ * <p>TODO: merge this class with {@link
org.apache.paimon.flink.sink.BucketStreamPartitioner} and
+ * refactor {@link BucketComputer} if possible.
*/
public class CdcBucketStreamPartitioner extends StreamPartitioner<CdcRecord> {
+ private final int numBuckets;
private final List<String> bucketKeys;
private final DataType[] bucketTypes;
private final List<String> partitionKeys;
@@ -60,11 +65,12 @@ public class CdcBucketStreamPartitioner extends
StreamPartitioner<CdcRecord> {
public CdcBucketStreamPartitioner(TableSchema schema, boolean
shuffleByPartitionEnable) {
List<String> bucketKeys = schema.originalBucketKeys();
if (bucketKeys.isEmpty()) {
- bucketKeys = schema.primaryKeys();
+ bucketKeys = schema.trimmedPrimaryKeys();
}
Preconditions.checkArgument(
bucketKeys.size() > 0, "Either bucket keys or primary keys
must be defined");
+ this.numBuckets = new CoreOptions(schema.options()).bucket();
this.bucketKeys = bucketKeys;
this.bucketTypes = getTypes(this.bucketKeys, schema);
this.partitionKeys = schema.partitionKeys();
@@ -100,15 +106,14 @@ public class CdcBucketStreamPartitioner extends
StreamPartitioner<CdcRecord> {
CdcRecord record =
streamRecordSerializationDelegate.getInstance().getValue();
BinaryRow bucketKeyRow =
toBinaryRow(record.fields(), bucketKeys, bucketTypes,
bucketProjection);
+ int bucket = BucketComputer.bucket(bucketKeyRow.hashCode(),
numBuckets);
if (shuffleByPartitionEnable) {
BinaryRow partitionKeyRow =
toBinaryRow(
record.fields(), partitionKeys, partitionTypes,
partitionProjection);
- return BucketComputer.bucket(
- Objects.hash(bucketKeyRow.hashCode(),
partitionKeyRow.hashCode()),
- numberOfChannels);
+ return Math.abs(Objects.hash(bucket, partitionKeyRow.hashCode()))
% numberOfChannels;
} else {
- return BucketComputer.bucket(bucketKeyRow.hashCode(),
numberOfChannels);
+ return bucket % numberOfChannels;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
index 261a0f9dc..6988f2782 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
@@ -31,6 +31,8 @@ import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.TypeUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -61,6 +63,12 @@ public class SchemaAwareStoreWriteOperator extends
AbstractStoreWriteOperator<Cd
retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ table = table.copyWithLatestSchema();
+ super.initializeState(context);
+ }
+
@Override
protected SinkRecord processRecord(CdcRecord record) throws Exception {
Map<String, Object> convertedFields = tryConvert(record.fields());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index d912119bc..b5645404f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -50,6 +51,7 @@ import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -67,13 +69,16 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
@TempDir java.nio.file.Path tempDir;
@Test
- @Timeout(60)
+ @Timeout(120)
public void testRandomCdcEvents() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
- int numEvents = random.nextInt(20000) + 1;
- int numSchemaChanges = random.nextInt(20) + 1;
- int numKeys = random.nextInt(2000) + 1;
+ int numEvents = random.nextInt(1500) + 1;
+ int numSchemaChanges = random.nextInt(10) + 1;
+ int numPartitions = random.nextInt(3) + 1;
+ int numKeys = random.nextInt(150) + 1;
+ int numBucket = random.nextInt(5) + 1;
+ boolean shuffleByPartitionEnable = random.nextBoolean();
boolean enableFailure = random.nextBoolean();
TestCdcEvent[] events = new TestCdcEvent[numEvents];
@@ -113,6 +118,8 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
Map<String, String> fields = new HashMap<>();
int key = random.nextInt(numKeys);
fields.put("k", String.valueOf(key));
+ int pt = key % numPartitions;
+ fields.put("pt", String.valueOf(pt));
for (int j = 0; j < fieldNames.size(); j++) {
String fieldName = fieldNames.get(j);
if (isBigInt.get(j)) {
@@ -151,13 +158,15 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
tablePath,
fileIO,
RowType.of(
- new DataType[] {DataTypes.INT(),
DataTypes.INT()},
- new String[] {"k", "v0"}),
- Collections.emptyList(),
- Collections.singletonList("k"));
+ new DataType[] {DataTypes.INT(),
DataTypes.INT(), DataTypes.INT()},
+ new String[] {"pt", "k", "v0"}),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ numBucket,
+ shuffleByPartitionEnable);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.getCheckpointConfig().setCheckpointInterval(1000);
+ env.getCheckpointConfig().setCheckpointInterval(100);
TestCdcSourceFunction sourceFunction =
new TestCdcSourceFunction(
events, record ->
Integer.valueOf(record.fields().get("k")));
@@ -171,7 +180,7 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
.build();
// enable failure when running jobs if needed
- FailingFileIO.reset(failingName, 100, 10000);
+ FailingFileIO.reset(failingName, 10, 10000);
env.execute();
@@ -210,12 +219,15 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
FileIO fileIO,
RowType rowType,
List<String> partitions,
- List<String> primaryKeys)
+ List<String> primaryKeys,
+ int numBucket,
+ boolean shuffleByPartitionEnable)
throws Exception {
Options conf = new Options();
- conf.set(CoreOptions.BUCKET, 3);
+ conf.set(CoreOptions.BUCKET, numBucket);
conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+ conf.set(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION,
shuffleByPartitionEnable);
TableSchema tableSchema =
SchemaUtils.forceCommit(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
index caa7e46f8..fc952af49 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
@@ -31,6 +31,8 @@ import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import java.util.Arrays;
import java.util.LinkedList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@@ -46,6 +48,8 @@ public class TestCdcSourceFunction extends
RichParallelSourceFunction<TestCdcEve
private final SerializableFunction<CdcRecord, Integer> getKeyHash;
private volatile boolean isRunning = true;
+ private transient int numRecordsPerCheckpoint;
+ private transient AtomicInteger recordsThisCheckpoint;
private transient ListState<Integer> remainingEventsCount;
public TestCdcSourceFunction(
@@ -56,6 +60,9 @@ public class TestCdcSourceFunction extends
RichParallelSourceFunction<TestCdcEve
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
+ numRecordsPerCheckpoint = events.size() /
ThreadLocalRandom.current().nextInt(10, 20);
+ recordsThisCheckpoint = new AtomicInteger(0);
+
remainingEventsCount =
context.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("count",
Integer.class));
@@ -73,6 +80,7 @@ public class TestCdcSourceFunction extends
RichParallelSourceFunction<TestCdcEve
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
+ recordsThisCheckpoint.set(0);
remainingEventsCount.clear();
remainingEventsCount.add(events.size());
}
@@ -80,6 +88,11 @@ public class TestCdcSourceFunction extends
RichParallelSourceFunction<TestCdcEve
@Override
public void run(SourceContext<TestCdcEvent> ctx) throws Exception {
while (isRunning && !events.isEmpty()) {
+ if (recordsThisCheckpoint.get() >= numRecordsPerCheckpoint) {
+ Thread.sleep(10);
+ continue;
+ }
+
synchronized (ctx.getCheckpointLock()) {
TestCdcEvent event = events.poll();
if (event.records() != null) {
@@ -99,6 +112,7 @@ public class TestCdcSourceFunction extends
RichParallelSourceFunction<TestCdcEve
}
}
ctx.collect(event);
+ recordsThisCheckpoint.incrementAndGet();
}
}
}