This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 48a5165cd [flink] Add Shuffle by partition Option to ContinuousFileSplitEnumerator (#3877)
48a5165cd is described below
commit 48a5165cd85bf3f7c1b472e5394948ec376d1d28
Author: 吴祥平 <[email protected]>
AuthorDate: Tue Aug 6 23:53:24 2024 +0800
[flink] Add Shuffle by partition Option to ContinuousFileSplitEnumerator (#3877)
---
.../generated/flink_connector_configuration.html | 6 ++++++
.../org/apache/paimon/flink/FlinkConnectorOptions.java | 7 +++++++
.../paimon/flink/source/ContinuousFileSplitEnumerator.java | 14 ++++++++++++--
.../paimon/flink/source/ContinuousFileStoreSource.java | 9 ++++++---
.../org/apache/paimon/flink/source/FlinkSourceBuilder.java | 3 ++-
.../source/align/AlignedContinuousFileSplitEnumerator.java | 6 ++++--
.../source/align/AlignedContinuousFileStoreSource.java | 3 ++-
.../paimon/flink/source/operator/MonitorFunction.java | 11 ++++++++---
.../flink/source/ContinuousFileSplitEnumeratorTest.java | 2 +-
.../align/AlignedContinuousFileSplitEnumeratorTest.java | 10 +++++++++-
10 files changed, 57 insertions(+), 14 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index f5c98df17..550254f0d 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -254,6 +254,12 @@ under the License.
<td>Duration</td>
<td>If the new snapshot has not been generated when the checkpoint
starts to trigger, the enumerator will block the checkpoint and wait for the
new snapshot. Set the maximum waiting time to avoid infinite waiting, if
timeout, the checkpoint will fail. Note that it should be set smaller than the
checkpoint timeout.</td>
</tr>
+ <tr>
+ <td><h5>streaming-read.shuffle-by-partition</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether shuffle by partition and bucket when streaming
read.</td>
+ </tr>
<tr>
<td><h5>unaware-bucket.compaction.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 57f337739..cac13c8de 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -217,6 +217,13 @@ public class FlinkConnectorOptions {
+ " Note: This is dangerous and is likely
to cause data errors if downstream"
+ " is used to calculate aggregation and
the input is not complete changelog.");
+ public static final ConfigOption<Boolean>
STREAMING_READ_SHUFFLE_BY_PARTITION =
+ key("streaming-read.shuffle-by-partition")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether shuffle by partition and bucket when
streaming read.");
+
/**
* Weight of writer buffer in managed memory, Flink will compute the
memory size for writer
* according to the weight, the actual memory used depends on the running
environment.
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 4ec3cd5f3..b01542a0e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -23,6 +23,7 @@ import
org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
@@ -75,6 +76,8 @@ public class ContinuousFileSplitEnumerator
private final int splitMaxNum;
+ private final boolean shuffleByPartition;
+
@Nullable protected Long nextSnapshotId;
protected boolean finished = false;
@@ -88,7 +91,8 @@ public class ContinuousFileSplitEnumerator
long discoveryInterval,
StreamTableScan scan,
BucketMode bucketMode,
- int splitMaxPerTask) {
+ int splitMaxPerTask,
+ boolean shuffleByPartition) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
@@ -98,6 +102,7 @@ public class ContinuousFileSplitEnumerator
this.scan = scan;
this.splitAssigner = createSplitAssigner(bucketMode);
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+ this.shuffleByPartition = shuffleByPartition;
addSplits(remainSplits);
this.consumerProgressCalculator =
@@ -275,7 +280,12 @@ public class ContinuousFileSplitEnumerator
}
protected int assignSuggestedTask(FileStoreSourceSplit split) {
- return ((DataSplit) split.split()).bucket() %
context.currentParallelism();
+ DataSplit dataSplit = ((DataSplit) split.split());
+ if (shuffleByPartition) {
+ return ChannelComputer.select(
+ dataSplit.partition(), dataSplit.bucket(),
context.currentParallelism());
+ }
+ return ChannelComputer.select(dataSplit.bucket(),
context.currentParallelism());
}
protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index ca61ab4eb..d7df4fede 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -19,7 +19,9 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamDataTableScan;
@@ -99,14 +101,15 @@ public class ContinuousFileStoreSource extends FlinkSource
{
Collection<FileStoreSourceSplit> splits,
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
- CoreOptions coreOptions = CoreOptions.fromMap(options);
+ Options options = Options.fromMap(this.options);
return new ContinuousFileSplitEnumerator(
context,
splits,
nextSnapshotId,
- coreOptions.continuousDiscoveryInterval().toMillis(),
+
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
bucketMode,
- coreOptions.scanSplitMaxPerTask());
+ options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
+
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 4fe6b3ec1..c81b6f39b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -305,7 +305,8 @@ public class FlinkSourceBuilder {
produceTypeInfo(),
createReadBuilder(),
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
- watermarkStrategy == null);
+ watermarkStrategy == null,
+
conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index e3e0aabb6..a038a2e02 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -94,7 +94,8 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
StreamTableScan scan,
BucketMode bucketMode,
long alignTimeout,
- int splitPerTaskMax) {
+ int splitPerTaskMax,
+ boolean shuffleByPartition) {
super(
context,
remainSplits,
@@ -102,7 +103,8 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
discoveryInterval,
scan,
bucketMode,
- splitPerTaskMax);
+ splitPerTaskMax,
+ shuffleByPartition);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index c00b7ea51..846d3774c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -91,6 +91,7 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
scan,
bucketMode,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
- options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK));
+ options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
+
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index 213488d4b..2f8ce1ea5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -229,15 +229,20 @@ public class MonitorFunction extends
RichSourceFunction<Split>
TypeInformation<RowData> typeInfo,
ReadBuilder readBuilder,
long monitorInterval,
- boolean emitSnapshotWatermark) {
+ boolean emitSnapshotWatermark,
+ boolean shuffleByPartition) {
return env.addSource(
new MonitorFunction(readBuilder, monitorInterval,
emitSnapshotWatermark),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel()
.partitionCustom(
- (key, numPartitions) ->
- ChannelComputer.select(key.f0, key.f1,
numPartitions),
+ (key, numPartitions) -> {
+ if (shuffleByPartition) {
+ return ChannelComputer.select(key.f0, key.f1,
numPartitions);
+ }
+ return ChannelComputer.select(key.f1,
numPartitions);
+ },
split -> {
DataSplit dataSplit = (DataSplit) split;
return Tuple2.of(dataSplit.partition(),
dataSplit.bucket());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index b0077ec1b..3a10f9c8d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -878,7 +878,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval, scan,
bucketMode, 10);
+ context, initialSplits, null, discoveryInterval, scan,
bucketMode, 10, false);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index c164e00da..a5edc2804 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -245,7 +245,15 @@ public class AlignedContinuousFileSplitEnumeratorTest
extends FileSplitEnumerato
public AlignedContinuousFileSplitEnumerator build() {
return new AlignedContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval, scan,
bucketMode, timeout, 10);
+ context,
+ initialSplits,
+ null,
+ discoveryInterval,
+ scan,
+ bucketMode,
+ timeout,
+ 10,
+ false);
}
}
}