This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.3 by this push:
new ef998c2b6c [hotfix][flink] fix the partition error for IncrementalClusterSplitSource (#6416)
ef998c2b6c is described below
commit ef998c2b6c1f5538016fbb36286f8db204b4a164
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Oct 16 21:15:28 2025 +0800
[hotfix][flink] fix the partition error for IncrementalClusterSplitSource (#6416)
---
.../apache/paimon/flink/action/CompactAction.java | 18 +++++----
.../cluster/IncrementalClusterSplitSource.java | 25 ++++++++-----
...writeIncrementalClusterCommittableOperator.java | 5 +++
.../action/IncrementalClusterActionITCase.java | 43 +++++++++++++++++++++-
4 files changed, 72 insertions(+), 19 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index ce30ed5685..21e92512e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -247,9 +247,10 @@ public class CompactAction extends TableActionBase {
Map<BinaryRow, CompactUnit> compactUnits =
incrementalClusterManager.prepareForCluster(fullCompaction);
if (compactUnits.isEmpty()) {
-            LOGGER.info(
+            LOGGER.warn(
                     "No partition needs to be incrementally clustered. "
-                            + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster.");
+                            + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster. "
+                            + "Please set '--force_start_flink_job true' if you need to forcibly start a Flink job.");
return false;
}
Map<BinaryRow, DataSplit[]> partitionSplits =
@@ -300,11 +301,13 @@ public class CompactAction extends TableActionBase {
// 2.3 write and then reorganize the committable
            // set parallelism to null, and it'll forward the parallelism in doWrite()
            RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
-            DataStream<Committable> clusterCommittable =
+            DataStream<Committable> written =
                     sink.doWrite(
-                            FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()),
-                            commitUser,
-                            null)
+                            FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()),
+                            commitUser,
+                            null);
+            DataStream<Committable> clusterCommittable =
+                    written.forward()
.transform(
"Rewrite cluster committable",
new CommittableTypeInfo(),
@@ -316,7 +319,8 @@ public class CompactAction extends TableActionBase {
Map.Entry::getKey,
unit ->
unit.getValue()
-                                        .outputLevel()))));
+                                        .outputLevel()))))
+ .setParallelism(written.getParallelism());
dataStreams.add(clusterCommittable);
dataStreams.add(sourcePair.getRight());
}
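
For context on the hunks above: Flink only accepts a forward() edge when the upstream and downstream operators run at the same parallelism, which is why the rewrite operator now explicitly copies written.getParallelism(). A minimal standalone sketch of that rule (class and variable names are illustrative, not taken from this patch):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ForwardParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Upstream operator pinned to parallelism 2 (illustrative stand-in
            // for the write operator created by sink.doWrite()).
            DataStream<Long> written =
                    env.fromSequence(0, 99).map(x -> x + 1).returns(Types.LONG).setParallelism(2);

            // A forward() edge keeps records on the same subtask, so Flink rejects
            // it if the two sides disagree on parallelism; mirroring the upstream
            // value keeps the edge valid whatever parallelism was chosen above.
            written.forward()
                    .map(x -> x * 2)
                    .returns(Types.LONG)
                    .setParallelism(written.getParallelism())
                    .print();

            env.execute("forward-parallelism-sketch");
        }
    }
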
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
index eae1d3b8e9..6d18181242 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
@@ -38,8 +38,9 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -86,21 +87,25 @@ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<
Map<String, String> partitionSpec,
DataSplit[] splits,
@Nullable Integer parallelism) {
- DataStreamSource<Split> source =
+ DataStream<Split> source =
env.fromSource(
- new IncrementalClusterSplitSource(splits),
- WatermarkStrategy.noWatermarks(),
- String.format(
- "Incremental-cluster split generator: %s - %s",
- table.fullName(), partitionSpec),
- new JavaTypeInfo<>(Split.class));
+ new IncrementalClusterSplitSource(splits),
+ WatermarkStrategy.noWatermarks(),
+ String.format(
+ "Incremental-cluster split generator:
%s - %s",
+ table.fullName(), partitionSpec),
+ new JavaTypeInfo<>(Split.class))
+ .forceNonParallel();
+ PartitionTransformation<Split> partitioned =
+ new PartitionTransformation<>(
+                        source.getTransformation(), new RebalancePartitioner<>());
if (parallelism != null) {
- source.setParallelism(parallelism);
+ partitioned.setParallelism(parallelism);
}
return Pair.of(
-                new DataStream<>(source.getExecutionEnvironment(), source.getTransformation())
+ new DataStream<>(source.getExecutionEnvironment(), partitioned)
.transform(
String.format(
"Incremental-cluster reader: %s - %s",
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
index 6b0ef47f3f..01c2deba64 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
@@ -29,6 +29,8 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,6 +43,9 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Rewrite committable for new files written after clustered. */
public class RewriteIncrementalClusterCommittableOperator
extends BoundedOneInputOperator<Committable, Committable> {
+
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(RewriteIncrementalClusterCommittableOperator.class);
private static final long serialVersionUID = 1L;
private final FileStoreTable table;
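
The change to this operator just adds a logger; for reference, the standard SLF4J declaration pattern it follows (a generic sketch, not the Paimon operator itself):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
        // One static logger per class, keyed by the class itself so output
        // is attributed to the right component.
        private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            LOG.info("rewrote {} committables", 3);
        }
    }
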
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 29f02adfd5..ff59915ccf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -325,6 +325,47 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
        assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException();
}
+ @Test
+ public void testMultiParallelism() throws Exception {
+ FileStoreTable table = createTable(null, 2);
+
+ BinaryString randomStr = BinaryString.fromString(randomString(150));
+ List<CommitMessage> messages = new ArrayList<>();
+
+ // first write
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
+ }
+ }
+ commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
+ List<String> result1 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ List<String> expected1 =
+ Lists.newArrayList(
+ "+I[0, 0]",
+ "+I[0, 1]",
+ "+I[0, 2]",
+ "+I[1, 0]",
+ "+I[1, 1]",
+ "+I[1, 2]",
+ "+I[2, 0]",
+ "+I[2, 1]",
+ "+I[2, 2]");
+ assertThat(result1).containsExactlyElementsOf(expected1);
+
+ runAction(Lists.newArrayList("--table_conf", "scan.parallelism=2"));
+ checkSnapshot(table);
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isGreaterThanOrEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ }
+
    protected FileStoreTable createTable(String partitionKeys, int sinkParallelism)
throws Exception {
catalog.createDatabase(database, true);
@@ -405,8 +446,6 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
baseArgs.addAll(extra);
        CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0]));
- // action.withStreamExecutionEnvironment(env).build();
- // env.execute();
action.withStreamExecutionEnvironment(env);
action.run();
}