This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 094e92510 Compaction Operator parallelism in paimon MUST be one (#3758)
094e92510 is described below

commit 094e92510af79a3f64a65ccf197d889c675f2f12
Author: HeavenZH <[email protected]>
AuthorDate: Wed Jul 24 16:32:15 2024 +0800

    Compaction Operator parallelism in paimon MUST be one (#3758)
    
    This closes #3758.
---
 .../flink/source/BucketUnawareCompactSource.java    | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index e6c301ded..7926fa60a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
@@ -74,6 +75,9 @@ public class BucketUnawareCompactSource extends RichSourceFunction<AppendOnlyCom
     @Override
     public void open(Configuration parameters) throws Exception {
         compactionCoordinator = new 
AppendOnlyTableCompactionCoordinator(table, streaming, filter);
+        Preconditions.checkArgument(
+                this.getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+                "Compaction Operator parallelism in paimon MUST be one.");
     }
 
     @Override
@@ -120,12 +124,15 @@ public class BucketUnawareCompactSource extends RichSourceFunction<AppendOnlyCom
             String tableIdentifier) {
         final StreamSource<AppendOnlyCompactionTask, 
BucketUnawareCompactSource> sourceOperator =
                 new StreamSource<>(source);
-        return new DataStreamSource<>(
-                env,
-                new CompactionTaskTypeInfo(),
-                sourceOperator,
-                false,
-                COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier,
-                streaming ? Boundedness.CONTINUOUS_UNBOUNDED : 
Boundedness.BOUNDED);
+        return (DataStreamSource<AppendOnlyCompactionTask>)
+                new DataStreamSource<>(
+                                env,
+                                new CompactionTaskTypeInfo(),
+                                sourceOperator,
+                                false,
+                                COMPACTION_COORDINATOR_NAME + " : " + 
tableIdentifier,
+                                streaming ? Boundedness.CONTINUOUS_UNBOUNDED : 
Boundedness.BOUNDED)
+                        .setParallelism(1)
+                        .setMaxParallelism(1);
     }
 }

Reply via email to