This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9eeb37e29f [core] add an option to group delta files by partition for 
row tracking table (#7794)
9eeb37e29f is described below

commit 9eeb37e29f3385628b9925790b25f33069257993
Author: Faiz <[email protected]>
AuthorDate: Thu May 14 22:35:06 2026 +0800

    [core] add an option to group delta files by partition for row tracking 
table (#7794)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 13 ++++
 .../paimon/operation/FileStoreCommitImpl.java      | 10 +++
 .../paimon/table/DataEvolutionTableTest.java       | 84 ++++++++++++++++++++++
 4 files changed, 113 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 1928fde9dd..c3575cd37d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1169,6 +1169,12 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td>Boolean</td>
             <td>Whether enable unique row id for append table.</td>
         </tr>
+        <tr>
+            <td><h5>row-tracking.partition-group-on-commit</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>When row-tracking is enabled, whether to group new file metas 
by partition before commit, so that assigned row IDs are contiguous within each 
partition. This is useful if you want to build global indices on this table. 
</td>
+        </tr>
         <tr>
             <td><h5>rowkind.field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 8ab03c868c..e15a86d8ff 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2199,6 +2199,15 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether enable unique row id for append 
table.");
 
+    public static final ConfigOption<Boolean> 
ROW_TRACKING_PARTITION_GROUP_ON_COMMIT =
+            key("row-tracking.partition-group-on-commit")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "When row-tracking is enabled, whether to group 
new file metas by partition "
+                                    + "before commit, so that assigned row IDs 
are contiguous within each partition."
+                                    + "This is useful if you want to build 
global indices on this table. ");
+
     @Immutable
     public static final ConfigOption<Boolean> DATA_EVOLUTION_ENABLED =
             key("data-evolution.enabled")
@@ -3650,6 +3659,10 @@ public class CoreOptions implements Serializable {
         return options.get(ROW_TRACKING_ENABLED);
     }
 
+    public boolean rowTrackingPartitionGroupOnCommit() {
+        return options.get(ROW_TRACKING_PARTITION_GROUP_ON_COMMIT);
+    }
+
     public boolean dataEvolutionEnabled() {
         return options.get(DATA_EVOLUTION_ENABLED);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 8448a53fd9..ffdfe8523d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -81,6 +81,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -963,6 +964,15 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             baseManifestList = manifestList.write(mergeAfterManifests);
 
             if (options.rowTrackingEnabled()) {
+                if (options.rowTrackingPartitionGroupOnCommit()) {
+                    Map<BinaryRow, List<ManifestEntry>> deltaFilesByPart =
+                            deltaFiles.stream()
+                                    
.collect(Collectors.groupingBy(ManifestEntry::partition));
+                    deltaFiles =
+                            deltaFilesByPart.values().stream()
+                                    .flatMap(Collection::stream)
+                                    .collect(Collectors.toList());
+                }
                 RowTrackingAssigned assigned =
                         assignRowTracking(newSnapshotId, firstRowIdStart, 
deltaFiles);
                 nextRowIdStart = assigned.nextRowIdStart;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 118a4fdefe..14b7f7a5c7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
 import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
@@ -54,8 +55,11 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -846,6 +850,75 @@ public class DataEvolutionTableTest extends 
DataEvolutionTestBase {
         assertThat(file2.rowCount()).isEqualTo(2L);
     }
 
+    @Test
+    public void testPartitionGroupedRowIdsWithBlobFiles() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("pt", DataTypes.STRING());
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.partitionKeys("pt");
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "128 MB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "1 b");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        
schemaBuilder.option(CoreOptions.ROW_TRACKING_PARTITION_GROUP_ON_COMMIT.key(), 
"true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+        FileStoreTable table = getTableDefault();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+
+        byte[] blobBytes = new byte[1];
+        Arrays.fill(blobBytes, (byte) 1);
+        String[] partitionOrder =
+                new String[] {
+                    "pt1", "pt0", "pt2", "pt1", "pt2", "pt0",
+                    "pt0", "pt2", "pt1", "pt2", "pt0", "pt1",
+                    "pt1", "pt2", "pt0", "pt0", "pt1", "pt2"
+                };
+
+        try (BatchTableWrite write = builder.newWrite()) {
+            for (int i = 0; i < partitionOrder.length; i++) {
+                write.write(
+                        GenericRow.of(
+                                BinaryString.fromString(partitionOrder[i]),
+                                i,
+                                BinaryString.fromString("v" + i),
+                                new BlobData(blobBytes)));
+            }
+
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        Map<String, List<DataFileMeta>> filesByPartition = new HashMap<>();
+        for (ManifestEntry entry : table.store().newScan().plan().files()) {
+            String partition = entry.partition().getString(0).toString();
+            filesByPartition.computeIfAbsent(partition, k -> new 
ArrayList<>()).add(entry.file());
+        }
+
+        assertThat(filesByPartition.size()).isEqualTo(3);
+        for (List<DataFileMeta> files : filesByPartition.values()) {
+            List<DataFileMeta> regularFiles =
+                    files.stream()
+                            .filter(file -> !"blob".equals(file.fileFormat()))
+                            .collect(Collectors.toList());
+            List<DataFileMeta> blobFiles =
+                    files.stream()
+                            .filter(file -> "blob".equals(file.fileFormat()))
+                            .collect(Collectors.toList());
+
+            assertThat(regularFiles.size()).isEqualTo(1);
+            assertThat(blobFiles.size()).isGreaterThan(1);
+
+            Range regularRange = assertContinuousRowIdRange(regularFiles);
+            Range blobRange = assertContinuousRowIdRange(blobFiles);
+            assertThat(regularRange.count()).isEqualTo(6L);
+            assertThat(blobRange).isEqualTo(regularRange);
+        }
+    }
+
     @Test
     public void testNonNullColumn() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -996,4 +1069,15 @@ public class DataEvolutionTableTest extends 
DataEvolutionTestBase {
                                 null));
         assertThat(path4.toString()).isEqualTo(testExternalpath2);
     }
+
+    private Range assertContinuousRowIdRange(List<DataFileMeta> files) {
+        files.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
+        long start = files.get(0).nonNullFirstRowId();
+        long expectedStart = start;
+        for (DataFileMeta file : files) {
+            assertThat(file.nonNullFirstRowId()).isEqualTo(expectedStart);
+            expectedStart += file.rowCount();
+        }
+        return new Range(start, expectedStart - 1);
+    }
 }

Reply via email to