This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new df9a90b67a Revert "[flink] support infer parallelism for incremental clustering (#6624)" (#6637)
df9a90b67a is described below
commit df9a90b67a76d4a244a2829ffd152665f2eb0188
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Nov 20 13:12:51 2025 +0800
Revert "[flink] support infer parallelism for incremental clustering
(#6624)" (#6637)
---
.../shortcodes/generated/core_configuration.html | 6 --
.../main/java/org/apache/paimon/CoreOptions.java | 10 ---
.../apache/paimon/flink/action/CompactAction.java | 35 ++-------
.../action/IncrementalClusterActionITCase.java | 88 +++++-----------------
tools/maven/suppressions.xml | 2 -
5 files changed, 23 insertions(+), 118 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d1c46f2ff6..c3251aa979 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -182,12 +182,6 @@ under the License.
<td>Boolean</td>
<td>Whether enable incremental clustering.</td>
</tr>
- <tr>
- <td><h5>clustering.per-subtask.data-size</h5></td>
- <td style="word-wrap: break-word;">1 gb</td>
- <td>MemorySize</td>
- <td>The data size processed by single parallelism.</td>
- </tr>
<tr>
<td><h5>clustering.strategy</h5></td>
<td style="word-wrap: break-word;">"auto"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e702aceace..ccb490ec41 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1967,12 +1967,6 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether enable incremental clustering.");
- public static final ConfigOption<MemorySize> CLUSTERING_PER_TASK_DATA_SIZE =
- key("clustering.per-subtask.data-size")
- .memoryType()
- .defaultValue(MemorySize.ofMebiBytes(1024))
- .withDescription("The data size processed by single parallelism.");
-
public static final ConfigOption<Integer> CLUSTERING_HISTORY_PARTITION_LIMIT =
key("clustering.history-partition.limit")
.intType()
@@ -3155,10 +3149,6 @@ public class CoreOptions implements Serializable {
return options.get(CLUSTERING_INCREMENTAL);
}
- public MemorySize clusteringPerTaskDataSize() {
- return options.get(CLUSTERING_PER_TASK_DATA_SIZE);
- }
-
public Duration clusteringHistoryPartitionIdleTime() {
return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index e77574c7e1..694b67d705 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -41,7 +41,6 @@ import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
@@ -80,7 +79,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE;
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -245,9 +243,6 @@ public class CompactAction extends TableActionBase {
table.partitionKeys().toArray(new String[0]),
table.coreOptions().legacyPartitionName());
- long perSubtaskDataSize = table.coreOptions().clusteringPerTaskDataSize().getBytes();
- LOGGER.info("{} is {} bytes.", CLUSTERING_PER_TASK_DATA_SIZE.key(), perSubtaskDataSize);
-
// 1. pick cluster files for each partition
Map<BinaryRow, CompactUnit> compactUnits =
incrementalClusterManager.prepareForCluster(fullCompaction);
@@ -281,32 +276,14 @@ public class CompactAction extends TableActionBase {
partitionComputer.generatePartValues(partition);
// 2.1 generate source for current partition
- long partitionFileSize = 0L;
- int partitionFileCount = 0;
- for (DataSplit split : splits) {
- for (DataFileMeta fileMeta : split.dataFiles()) {
- partitionFileSize += fileMeta.fileSize();
- partitionFileCount++;
- }
- }
- int inferParallelism =
- Math.min(
- partitionFileCount,
- Math.max(1, (int) (partitionFileSize / perSubtaskDataSize)));
- LOGGER.info(
- "For partition {}, the total data size is {} bytes, total
file count is {}, infer parallelism is {}.",
- partitionSpec,
- partitionFileSize,
- partitionFileCount,
- inferParallelism);
-
- Integer scanParallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
- if (scanParallelism == null) {
- scanParallelism = inferParallelism;
- }
Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
IncrementalClusterSplitSource.buildSource(
- env, table, partitionSpec, splits, dvCommitMessage, scanParallelism);
+ env,
+ table,
+ partitionSpec,
+ splits,
+ dvCommitMessage,
+ options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
// 2.2 cluster in partition
Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 1abc5e43bb..bf7087e77e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -67,7 +67,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterUnpartitionedTable() throws Exception {
- FileStoreTable table = createTable(null);
+ FileStoreTable table = createTable(null, 1);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -204,7 +204,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterPartitionedTable() throws Exception {
- FileStoreTable table = createTable("pt");
+ FileStoreTable table = createTable("pt", 1);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -336,7 +336,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterSpecifyPartition() throws Exception {
- FileStoreTable table = createTable("pt");
+ FileStoreTable table = createTable("pt", 1);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -378,9 +378,9 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterHistoryPartition() throws Exception {
- Map<String, String> options = commonOptions();
+ Map<String, String> options = new HashMap<>();
options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "3s");
- FileStoreTable table = createTable("pt", options);
+ FileStoreTable table = createTable("pt", 1, options);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -530,15 +530,13 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterOnEmptyData() throws Exception {
- createTable("pt");
+ createTable("pt", 1);
assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException();
}
@Test
public void testMultiParallelism() throws Exception {
- Map<String, String> options = commonOptions();
- options.put("scan.parallelism", "2");
- FileStoreTable table = createTable(null, options);
+ FileStoreTable table = createTable(null, 2);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -579,9 +577,9 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterWithDeletionVector() throws Exception {
- Map<String, String> dynamicOptions = commonOptions();
+ Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
- FileStoreTable table = createTable(null, dynamicOptions);
+ FileStoreTable table = createTable(null, 1, dynamicOptions);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -683,43 +681,16 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
}
- @Test
- public void testClusterWithInferParallelism() throws Exception {
- Map<String, String> options = commonOptions();
- options.remove("scan.parallelism");
- options.remove("sink.parallelism");
- options.put(CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE.key(), "50kb");
- FileStoreTable table = createTable(null, options);
-
- BinaryString randomStr = BinaryString.fromString(randomString(150));
- List<CommitMessage> messages = new ArrayList<>();
-
- // first write, generate 100 files, total size 173kb
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
- }
- }
- commit(messages);
- ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
-
- // first cluster
- runAction(Collections.emptyList());
- checkSnapshot(table);
- List<Split> splits = readBuilder.newScan().plan().splits();
- assertThat(splits.size()).isEqualTo(1);
- assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(3);
- assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
- }
-
- protected FileStoreTable createTable(String partitionKeys) throws Exception {
- return createTable(partitionKeys, commonOptions());
+ protected FileStoreTable createTable(String partitionKeys, int sinkParallelism)
+ throws Exception {
+ return createTable(partitionKeys, sinkParallelism, Collections.emptyMap());
}
- protected FileStoreTable createTable(String partitionKeys, Map<String, String> options)
+ protected FileStoreTable createTable(
+ String partitionKeys, int sinkParallelism, Map<String, String> options)
throws Exception {
catalog.createDatabase(database, true);
- catalog.createTable(identifier(), buildSchema(partitionKeys, options), true);
+ catalog.createTable(identifier(), schema(partitionKeys, sinkParallelism, options), true);
return (FileStoreTable) catalog.getTable(identifier());
}
@@ -747,33 +718,8 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
commit.close();
}
- private static Schema buildSchema(String partitionKeys, Map<String, String> options) {
- Schema.Builder schemaBuilder = Schema.newBuilder();
- schemaBuilder.column("a", DataTypes.INT());
- schemaBuilder.column("b", DataTypes.INT());
- schemaBuilder.column("c", DataTypes.STRING());
- schemaBuilder.column("pt", DataTypes.INT());
- for (String key : options.keySet()) {
- schemaBuilder.option(key, options.get(key));
- }
- if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) {
- schemaBuilder.partitionKeys(partitionKeys);
- }
- return schemaBuilder.build();
- }
-
- private static Map<String, String> commonOptions() {
- Map<String, String> options = new HashMap<>();
- options.put("bucket", "-1");
- options.put("num-levels", "6");
- options.put("num-sorted-run.compaction-trigger", "2");
- options.put("scan.plan-sort-partition", "true");
- options.put("clustering.columns", "a,b");
- options.put("clustering.strategy", "zorder");
- options.put("clustering.incremental", "true");
- options.put("scan.parallelism", "1");
- options.put("sink.parallelism", "1");
- return options;
+ private static Schema schema(String partitionKeys, int sinkParallelism) {
+ return schema(partitionKeys, sinkParallelism, Collections.emptyMap());
}
private static Schema schema(
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 12490f9c20..42525cd46d 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,8 +25,6 @@ under the License.
<suppressions>
<suppress files="DataTypes.java" checks="MethodNameCheck"/>
- <suppress files="CoreOptions.java" checks="FileLength"/>
-
<!-- target directory is not relevant for checkstyle -->
<suppress
files="[\\/]target[\\/]"