This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9eeb37e29f [core] add an option to group delta files by partition for
row tracking table (#7794)
9eeb37e29f is described below
commit 9eeb37e29f3385628b9925790b25f33069257993
Author: Faiz <[email protected]>
AuthorDate: Thu May 14 22:35:06 2026 +0800
[core] add an option to group delta files by partition for row tracking
table (#7794)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 13 ++++
.../paimon/operation/FileStoreCommitImpl.java | 10 +++
.../paimon/table/DataEvolutionTableTest.java | 84 ++++++++++++++++++++++
4 files changed, 113 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 1928fde9dd..c3575cd37d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1169,6 +1169,12 @@ This config option does not affect the default
filesystem metastore.</td>
<td>Boolean</td>
<td>Whether enable unique row id for append table.</td>
</tr>
+ <tr>
+ <td><h5>row-tracking.partition-group-on-commit</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>When row-tracking is enabled, whether to group new file metas
by partition before commit, so that assigned row IDs are contiguous within each
partition. This is useful if you want to build global indices on this table.
</td>
+ </tr>
<tr>
<td><h5>rowkind.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 8ab03c868c..e15a86d8ff 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2199,6 +2199,15 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether enable unique row id for append
table.");
+    // Read at commit time when row tracking is enabled: if true, the new delta files are
+    // regrouped by partition before row IDs are assigned to them.
+    public static final ConfigOption<Boolean> ROW_TRACKING_PARTITION_GROUP_ON_COMMIT =
+            key("row-tracking.partition-group-on-commit")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "When row-tracking is enabled, whether to group new file metas "
+                                    + "by partition before commit, so that assigned row IDs "
+                                    + "are contiguous within each partition. "
+                                    + "This is useful if you want to build global indices "
+                                    + "on this table. ");
+
@Immutable
public static final ConfigOption<Boolean> DATA_EVOLUTION_ENABLED =
key("data-evolution.enabled")
@@ -3650,6 +3659,10 @@ public class CoreOptions implements Serializable {
return options.get(ROW_TRACKING_ENABLED);
}
+ /** Returns the configured value of {@link #ROW_TRACKING_PARTITION_GROUP_ON_COMMIT}. */
+ public boolean rowTrackingPartitionGroupOnCommit() {
+ return options.get(ROW_TRACKING_PARTITION_GROUP_ON_COMMIT);
+ }
+
public boolean dataEvolutionEnabled() {
return options.get(DATA_EVOLUTION_ENABLED);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 8448a53fd9..ffdfe8523d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -81,6 +81,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -963,6 +964,15 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
baseManifestList = manifestList.write(mergeAfterManifests);
if (options.rowTrackingEnabled()) {
+ if (options.rowTrackingPartitionGroupOnCommit()) {
+ Map<BinaryRow, List<ManifestEntry>> deltaFilesByPart =
+ deltaFiles.stream()
+
.collect(Collectors.groupingBy(ManifestEntry::partition));
+ deltaFiles =
+ deltaFilesByPart.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
RowTrackingAssigned assigned =
assignRowTracking(newSnapshotId, firstRowIdStart,
deltaFiles);
nextRowIdStart = assigned.nextRowIdStart;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 118a4fdefe..14b7f7a5c7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
@@ -54,8 +55,11 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -846,6 +850,75 @@ public class DataEvolutionTableTest extends
DataEvolutionTestBase {
assertThat(file2.rowCount()).isEqualTo(2L);
}
+    /**
+     * Writes 18 rows interleaved across three partitions ("pt0", "pt1", "pt2") in a single
+     * commit, with partition grouping on commit enabled, and checks per partition that the one
+     * regular data file and the blob files each cover a gap-free row ID range of six rows and
+     * that the two ranges are equal.
+     */
+    @Test
+    public void testPartitionGroupedRowIdsWithBlobFiles() throws Exception {
+        Schema.Builder schema = Schema.newBuilder();
+        schema.column("pt", DataTypes.STRING());
+        schema.column("f0", DataTypes.INT());
+        schema.column("f1", DataTypes.STRING());
+        schema.column("f2", DataTypes.BLOB());
+        schema.partitionKeys("pt");
+        schema.option(CoreOptions.TARGET_FILE_SIZE.key(), "128 MB");
+        // NOTE(review): the tiny blob target size presumably forces one blob file per row;
+        // the assertions below only require more than one blob file per partition.
+        schema.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "1 b");
+        schema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schema.option(CoreOptions.ROW_TRACKING_PARTITION_GROUP_ON_COMMIT.key(), "true");
+        schema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
+        catalog.createTable(identifier(), schema.build(), true);
+        FileStoreTable table = getTableDefault();
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        byte[] blobPayload = {(byte) 1};
+        // Each partition appears six times, deliberately interleaved with the others.
+        String[] partitions = {
+            "pt1", "pt0", "pt2", "pt1", "pt2", "pt0",
+            "pt0", "pt2", "pt1", "pt2", "pt0", "pt1",
+            "pt1", "pt2", "pt0", "pt0", "pt1", "pt2"
+        };
+
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            int rowIndex = 0;
+            for (String partition : partitions) {
+                write.write(
+                        GenericRow.of(
+                                BinaryString.fromString(partition),
+                                rowIndex,
+                                BinaryString.fromString("v" + rowIndex),
+                                new BlobData(blobPayload)));
+                rowIndex++;
+            }
+
+            try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Bucket every committed file under the string value of its partition column.
+        Map<String, List<DataFileMeta>> filesByPartition = new HashMap<>();
+        for (ManifestEntry entry : table.store().newScan().plan().files()) {
+            String partitionName = entry.partition().getString(0).toString();
+            List<DataFileMeta> bucket = filesByPartition.get(partitionName);
+            if (bucket == null) {
+                bucket = new ArrayList<>();
+                filesByPartition.put(partitionName, bucket);
+            }
+            bucket.add(entry.file());
+        }
+
+        assertThat(filesByPartition).hasSize(3);
+        for (List<DataFileMeta> partitionFiles : filesByPartition.values()) {
+            // Split the partition's files into blob files and everything else.
+            List<DataFileMeta> regularFiles = new ArrayList<>();
+            List<DataFileMeta> blobFiles = new ArrayList<>();
+            for (DataFileMeta file : partitionFiles) {
+                if ("blob".equals(file.fileFormat())) {
+                    blobFiles.add(file);
+                } else {
+                    regularFiles.add(file);
+                }
+            }
+
+            assertThat(regularFiles).hasSize(1);
+            assertThat(blobFiles.size()).isGreaterThan(1);
+
+            Range regularRange = assertContinuousRowIdRange(regularFiles);
+            Range blobRange = assertContinuousRowIdRange(blobFiles);
+            assertThat(regularRange.count()).isEqualTo(6L);
+            assertThat(blobRange).isEqualTo(regularRange);
+        }
+    }
+
@Test
public void testNonNullColumn() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -996,4 +1069,15 @@ public class DataEvolutionTableTest extends
DataEvolutionTestBase {
null));
assertThat(path4.toString()).isEqualTo(testExternalpath2);
}
+
+    /**
+     * Asserts that the given files, once ordered by first row ID, follow each other without
+     * gaps or overlaps: each file's first row ID must equal the previous file's first row ID
+     * plus its row count.
+     *
+     * <p>The check runs on a sorted copy, so the caller's list is never reordered and may be
+     * unmodifiable. An empty list fails the assertion instead of throwing an index error.
+     *
+     * @param files data files to check; must contain at least one file
+     * @return a range starting at the smallest first row ID and ending at the last row ID
+     *     covered by the files (the next expected first row ID minus one)
+     */
+    private Range assertContinuousRowIdRange(List<DataFileMeta> files) {
+        assertThat(files).isNotEmpty();
+
+        // Sort a private copy: the argument may be immutable and should not be reordered.
+        List<DataFileMeta> sorted = new ArrayList<>(files);
+        sorted.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
+
+        long start = sorted.get(0).nonNullFirstRowId();
+        long expectedStart = start;
+        for (DataFileMeta file : sorted) {
+            assertThat(file.nonNullFirstRowId()).isEqualTo(expectedStart);
+            expectedStart += file.rowCount();
+        }
+        return new Range(start, expectedStart - 1);
+    }
}