This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f7ded227d3 [core] Respect rowsPerShard as hard constraint in global index build (#7530)
f7ded227d3 is described below
commit f7ded227d37d22d0f96b3eff325fc47b15610934
Author: jerry <[email protected]>
AuthorDate: Thu Mar 26 11:02:27 2026 +0800
[core] Respect rowsPerShard as hard constraint in global index build (#7530)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 8 +--
.../globalindex/GlobalIndexBuilderUtils.java | 17 -----
.../globalindex/GlobalIndexBuilderUtilsTest.java | 81 ----------------------
.../flink/globalindex/GenericIndexTopoBuilder.java | 7 --
.../globalindex/DefaultGlobalIndexTopoBuilder.java | 9 ---
6 files changed, 5 insertions(+), 119 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 220ca4cdc4..6dd621fcac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -666,7 +666,7 @@ under the License.
<td><h5>global-index.build.max-shard</h5></td>
<td style="word-wrap: break-word;">32</td>
<td>Integer</td>
- <td>The max number of shards for building global index. If the number of shards calculated by 'global-index.row-count-per-shard' exceeds this value, 'global-index.row-count-per-shard' will be recalculated as ceil(total-row-count / max-shard) to guarantee the shard count does not exceed max-shard.</td>
+ <td>The preferred max number of shards for building global index. If the number of shards calculated by 'global-index.row-count-per-shard' exceeds this value, max-shard will be automatically increased to accommodate the data volume while keeping 'global-index.row-count-per-shard' unchanged.</td>
</tr>
<tr>
<td><h5>global-index.column-update-action</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 4dc495c12b..7c054b8c8a 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2322,11 +2322,11 @@ public class CoreOptions implements Serializable {
.intType()
.defaultValue(32)
.withDescription(
- "The max number of shards for building global index. "
+ "The preferred max number of shards for building global index. "
+ "If the number of shards calculated by 'global-index.row-count-per-shard' "
- + "exceeds this value, 'global-index.row-count-per-shard' will be "
- + "recalculated as ceil(total-row-count / max-shard) to guarantee "
- + "the shard count does not exceed max-shard.");
+ + "exceeds this value, max-shard will be automatically increased "
+ + "to accommodate the data volume while keeping "
+ + "'global-index.row-count-per-shard' unchanged.");
public static final ConfigOption<Integer> GLOBAL_INDEX_BUILD_MAX_PARALLELISM =
key("global-index.build.max-parallelism")
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
index 19ffc3bb40..085423efa8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
@@ -71,23 +71,6 @@ public class GlobalIndexBuilderUtils {
return results;
}
- /**
- * Adjust rowsPerShard if the estimated shard count exceeds maxShard. Returns the adjusted
- * rowsPerShard value.
- */
- public static long adjustRowsPerShard(long rowsPerShard, long totalRowCount, int maxShard) {
- long estimatedShards = ceilDivision(totalRowCount, rowsPerShard);
- if (estimatedShards > maxShard) {
- return ceilDivision(totalRowCount, maxShard);
- }
- return rowsPerShard;
- }
-
- /** Integer ceiling division: returns ceil(a / b) for positive a and b. */
- private static long ceilDivision(long a, long b) {
- return (a + b - 1) / b;
- }
-
public static GlobalIndexWriter createIndexWriter(
FileStoreTable table, String indexType, DataField indexField, Options options)
throws IOException {
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java
deleted file mode 100644
index 612f41e0e9..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.globalindex;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link GlobalIndexBuilderUtils#adjustRowsPerShard}. */
-public class GlobalIndexBuilderUtilsTest {
-
- @Test
- void testAdjustRowsPerShardNoAdjustmentNeeded() {
- // 1000 rows, 100 per shard = 10 shards, maxShard = 20 -> no adjustment
- long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 20);
- assertThat(result).isEqualTo(100);
- }
-
- @Test
- void testAdjustRowsPerShardExactMatch() {
- // 1000 rows, 100 per shard = 10 shards, maxShard = 10 -> no adjustment
- long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 10);
- assertThat(result).isEqualTo(100);
- }
-
- @Test
- void testAdjustRowsPerShardExceedsMaxShard() {
- // 1000 rows, 100 per shard = 10 shards, maxShard = 3 -> adjust to ceil(1000/3) = 334
- long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 3);
- assertThat(result).isEqualTo(334);
- // Verify: ceil(1000/334) = 3 shards
- assertThat((1000 + result - 1) / result).isEqualTo(3);
- }
-
- @Test
- void testAdjustRowsPerShardMaxShardOne() {
- // 1000 rows, 100 per shard = 10 shards, maxShard = 1 -> all in one shard
- long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 1);
- assertThat(result).isEqualTo(1000);
- assertThat((1000 + result - 1) / result).isEqualTo(1);
- }
-
- @Test
- void testAdjustRowsPerShardEvenDivision() {
- // 1000 rows, 100 per shard = 10 shards, maxShard = 5 -> adjust to 200
- long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 5);
- assertThat(result).isEqualTo(200);
- assertThat((1000 + result - 1) / result).isEqualTo(5);
- }
-
- @Test
- void testAdjustRowsPerShardLargeRowCount() {
- // 10M rows, 100K per shard = 100 shards, maxShard = 10 -> adjust to 1M
- long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100000, 10000000, 10);
- assertThat(result).isEqualTo(1000000);
- assertThat((10000000 + result - 1) / result).isEqualTo(10);
- }
-
- @Test
- void testAdjustRowsPerShardTotalRowsLessThanRowsPerShard() {
- // 50 rows, 100 per shard = 1 shard, maxShard = 3 -> no adjustment
- long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 50, 3);
- assertThat(result).isEqualTo(100);
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index 44239b3876..865280763c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -28,7 +28,6 @@ import org.apache.paimon.flink.sink.NoopCommittableStateManager;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
-import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.index.IndexFileMeta;
@@ -159,12 +158,6 @@ public class GenericIndexTopoBuilder {
rowsPerShard > 0,
"Option 'global-index.row-count-per-shard' must be greater than 0.");
- int maxShard = mergedOptions.get(CoreOptions.GLOBAL_INDEX_BUILD_MAX_SHARD);
- checkArgument(
- maxShard > 0, "Option 'global-index.build.max-shard' must be greater than 0.");
- rowsPerShard =
- GlobalIndexBuilderUtils.adjustRowsPerShard(rowsPerShard, totalRowCount, maxShard);
-
// Compute shard tasks at file level from the provided entries
List<ShardTask> shardTasks = computeShardTasks(table, entries, rowsPerShard);
if (shardTasks.isEmpty()) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java
index 04593c55a3..afd954c39a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java
@@ -21,7 +21,6 @@ package org.apache.paimon.spark.globalindex;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
import org.apache.paimon.globalindex.IndexedSplit;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
@@ -56,7 +55,6 @@ import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_BUILD_MAX_SHARD;
import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ROW_COUNT_PER_SHARD;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -88,15 +86,8 @@ public class DefaultGlobalIndexTopoBuilder implements GlobalIndexTopologyBuilder
rowsPerShard > 0,
"Option 'global-index.row-count-per-shard' must be greater than 0.");
- int maxShard = tableOptions.get(GLOBAL_INDEX_BUILD_MAX_SHARD);
- checkArgument(
- maxShard > 0, "Option 'global-index.build.max-shard' must be greater than 0.");
List<ManifestEntry> entries =
table.store().newScan().withPartitionFilter(partitionPredicate).plan().files();
- long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum();
- rowsPerShard =
- GlobalIndexBuilderUtils.adjustRowsPerShard(rowsPerShard, totalRowCount, maxShard);
-
// generate splits for each partition && shard
Map<BinaryRow, List<IndexedSplit>> splits = split(table, entries, rowsPerShard);