This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.3 by this push:
     new ef998c2b6c [hotfix][flink] fix the partition error for IncrementalClusterSplitSource (#6416)
ef998c2b6c is described below

commit ef998c2b6c1f5538016fbb36286f8db204b4a164
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Oct 16 21:15:28 2025 +0800

    [hotfix][flink] fix the partition error for IncrementalClusterSplitSource (#6416)
---
 .../apache/paimon/flink/action/CompactAction.java  | 18 +++++----
 .../cluster/IncrementalClusterSplitSource.java     | 25 ++++++++-----
 ...writeIncrementalClusterCommittableOperator.java |  5 +++
 .../action/IncrementalClusterActionITCase.java     | 43 +++++++++++++++++++++-
 4 files changed, 72 insertions(+), 19 deletions(-)
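
In short: the split-generator source now runs as a single subtask and explicitly rebalances its splits across the reader subtasks, the rewrite-committable operator is pinned to the writer's parallelism so the forward() edge stays legal, and the "no partition to cluster" message is promoted to WARN with an extra hint about '--force_start_flink_job'.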

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index ce30ed5685..21e92512e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -247,9 +247,10 @@ public class CompactAction extends TableActionBase {
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.prepareForCluster(fullCompaction);
         if (compactUnits.isEmpty()) {
-            LOGGER.info(
+            LOGGER.warn(
                     "No partition needs to be incrementally clustered. "
-                            + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster.");
+                            + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster. "
+                            + "Please set '--force_start_flink_job true' if you need to forcibly start a Flink job.");
             return false;
         }
         Map<BinaryRow, DataSplit[]> partitionSplits =
@@ -300,11 +301,13 @@ public class CompactAction extends TableActionBase {
             // 2.3 write and then reorganize the committable
             // set parallelism to null, and it'll forward parallelism when doWrite()
             RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
-            DataStream<Committable> clusterCommittable =
+            DataStream<Committable> written =
                     sink.doWrite(
-                                    FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()),
-                                    commitUser,
-                                    null)
+                            FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()),
+                            commitUser,
+                            null);
+            DataStream<Committable> clusterCommittable =
+                    written.forward()
                             .transform(
                                     "Rewrite cluster committable",
                                     new CommittableTypeInfo(),
@@ -316,7 +319,8 @@
                                                                     Map.Entry::getKey,
                                                                     unit ->
                                                                             unit.getValue()
-                                                                                    .outputLevel()))));
+                                                                                    .outputLevel()))))
+                            .setParallelism(written.getParallelism());
             dataStreams.add(clusterCommittable);
             dataStreams.add(sourcePair.getRight());
         }
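
For context, the setParallelism(written.getParallelism()) call matters because Flink only allows a forward() edge between operators running at the same parallelism; without it the chained transform could fall back to the default job parallelism and the job graph would be rejected. A minimal sketch of the pattern, with hypothetical names (rewriteOperator stands in for the rewrite-committable operator wiring):

    // Chain a transform() behind an existing stream via forward(); a forward
    // edge requires equal parallelism on both ends, so copy it explicitly.
    DataStream<Committable> written = sink.doWrite(rows, commitUser, null);
    DataStream<Committable> clusterCommittable =
            written.forward()
                    .transform(
                            "Rewrite cluster committable",
                            new CommittableTypeInfo(),
                            rewriteOperator) // OneInputStreamOperator<Committable, Committable>
                    .setParallelism(written.getParallelism());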
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
index eae1d3b8e9..6d18181242 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
@@ -38,8 +38,9 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
@@ -86,21 +87,25 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
             Map<String, String> partitionSpec,
             DataSplit[] splits,
             @Nullable Integer parallelism) {
-        DataStreamSource<Split> source =
+        DataStream<Split> source =
                 env.fromSource(
-                        new IncrementalClusterSplitSource(splits),
-                        WatermarkStrategy.noWatermarks(),
-                        String.format(
-                                "Incremental-cluster split generator: %s - %s",
-                                table.fullName(), partitionSpec),
-                        new JavaTypeInfo<>(Split.class));
+                                new IncrementalClusterSplitSource(splits),
+                                WatermarkStrategy.noWatermarks(),
+                                String.format(
+                                        "Incremental-cluster split generator: %s - %s",
+                                        table.fullName(), partitionSpec),
+                                new JavaTypeInfo<>(Split.class))
+                        .forceNonParallel();
 
+        PartitionTransformation<Split> partitioned =
+                new PartitionTransformation<>(
+                        source.getTransformation(), new RebalancePartitioner<>());
         if (parallelism != null) {
-            source.setParallelism(parallelism);
+            partitioned.setParallelism(parallelism);
         }
 
         return Pair.of(
-                new DataStream<>(source.getExecutionEnvironment(), source.getTransformation())
+                new DataStream<>(source.getExecutionEnvironment(), partitioned)
                         .transform(
                                 String.format(
                                         "Incremental-cluster reader: %s - %s",
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
index 6b0ef47f3f..01c2deba64 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
@@ -29,6 +29,8 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,6 +43,9 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** Rewrite committable for new files written after clustered. */
 public class RewriteIncrementalClusterCommittableOperator
         extends BoundedOneInputOperator<Committable, Committable> {
+
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(RewriteIncrementalClusterCommittableOperator.class);
     private static final long serialVersionUID = 1L;
 
     private final FileStoreTable table;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 29f02adfd5..ff59915ccf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -325,6 +325,47 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException();
     }
 
+    @Test
+    public void testMultiParallelism() throws Exception {
+        FileStoreTable table = createTable(null, 2);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+
+        // first write
+        for (int i = 0; i < 3; i++) {
+            for (int j = 0; j < 3; j++) {
+                messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
+        List<String> result1 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected1 =
+                Lists.newArrayList(
+                        "+I[0, 0]",
+                        "+I[0, 1]",
+                        "+I[0, 2]",
+                        "+I[1, 0]",
+                        "+I[1, 1]",
+                        "+I[1, 2]",
+                        "+I[2, 0]",
+                        "+I[2, 1]",
+                        "+I[2, 2]");
+        assertThat(result1).containsExactlyElementsOf(expected1);
+
+        runAction(Lists.newArrayList("--table_conf", "scan.parallelism=2"));
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isGreaterThanOrEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+    }
+
     protected FileStoreTable createTable(String partitionKeys, int sinkParallelism)
             throws Exception {
         catalog.createDatabase(database, true);
@@ -405,8 +446,6 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         baseArgs.addAll(extra);
 
         CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0]));
-        //        action.withStreamExecutionEnvironment(env).build();
-        //        env.execute();
         action.withStreamExecutionEnvironment(env);
         action.run();
     }
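
For reference, the new testMultiParallelism case drives this path by passing the scan parallelism through the compact action's table options. A hedged sketch of the equivalent invocation, mirroring the test's runAction helper (argument values are illustrative, not from the commit):

    // Hypothetical: run the compact action with a scan parallelism override,
    // which is what '--table_conf scan.parallelism=2' carries into the job.
    String[] args = {
        "--warehouse", "/path/to/warehouse",
        "--database", "default",
        "--table", "T",
        "--table_conf", "scan.parallelism=2"
    };
    CompactAction action = createAction(CompactAction.class, args);
    action.withStreamExecutionEnvironment(env);
    action.run();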
