This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 094e92510 Compaction Operator parallelism in paimon MUST be one (#3758)
094e92510 is described below
commit 094e92510af79a3f64a65ccf197d889c675f2f12
Author: HeavenZH <[email protected]>
AuthorDate: Wed Jul 24 16:32:15 2024 +0800
Compaction Operator parallelism in paimon MUST be one (#3758)
This closes #3758.
---
.../flink/source/BucketUnawareCompactSource.java | 21 ++++++++++++++-------
1 file changed, 14 insertions(+), 7 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index e6c301ded..7926fa60a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
@@ -74,6 +75,9 @@ public class BucketUnawareCompactSource extends
RichSourceFunction<AppendOnlyCom
@Override
public void open(Configuration parameters) throws Exception {
compactionCoordinator = new
AppendOnlyTableCompactionCoordinator(table, streaming, filter);
+ Preconditions.checkArgument(
+ this.getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+ "Compaction Operator parallelism in paimon MUST be one.");
}
@Override
@@ -120,12 +124,15 @@ public class BucketUnawareCompactSource extends
RichSourceFunction<AppendOnlyCom
String tableIdentifier) {
final StreamSource<AppendOnlyCompactionTask,
BucketUnawareCompactSource> sourceOperator =
new StreamSource<>(source);
- return new DataStreamSource<>(
- env,
- new CompactionTaskTypeInfo(),
- sourceOperator,
- false,
- COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier,
- streaming ? Boundedness.CONTINUOUS_UNBOUNDED :
Boundedness.BOUNDED);
+ return (DataStreamSource<AppendOnlyCompactionTask>)
+ new DataStreamSource<>(
+ env,
+ new CompactionTaskTypeInfo(),
+ sourceOperator,
+ false,
+ COMPACTION_COORDINATOR_NAME + " : " +
tableIdentifier,
+ streaming ? Boundedness.CONTINUOUS_UNBOUNDED :
Boundedness.BOUNDED)
+ .setParallelism(1)
+ .setMaxParallelism(1);
}
}