This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aa3598986 [core] Extract RowTrackingCommitUtils from 
FileStoreCommitImpl
4aa3598986 is described below

commit 4aa35989869c1ca35c0cdf80d8aa417e75ccdf58
Author: JingsongLi <[email protected]>
AuthorDate: Tue Dec 30 15:57:44 2025 +0800

    [core] Extract RowTrackingCommitUtils from FileStoreCommitImpl
---
 .../paimon/operation/FileStoreCommitImpl.java      |  68 ++-----------
 .../operation/commit/RowTrackingCommitUtils.java   | 108 +++++++++++++++++++++
 2 files changed, 114 insertions(+), 62 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index b446078932..aa47e69d5e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -30,7 +30,6 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestCommittable;
@@ -50,6 +49,7 @@ import org.apache.paimon.operation.commit.ConflictDetection;
 import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck;
 import org.apache.paimon.operation.commit.ManifestEntryChanges;
 import org.apache.paimon.operation.commit.RetryCommitResult;
+import 
org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
 import org.apache.paimon.operation.commit.SuccessCommitResult;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.operation.metrics.CommitStats;
@@ -62,7 +62,6 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -95,7 +94,6 @@ import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static org.apache.paimon.manifest.ManifestEntry.nullableRecordCount;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
@@ -103,6 +101,7 @@ import static 
org.apache.paimon.operation.commit.ConflictDetection.hasConflictCh
 import static 
org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
 import static 
org.apache.paimon.operation.commit.ConflictDetection.noConflictCheck;
 import static 
org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
+import static 
org.apache.paimon.operation.commit.RowTrackingCommitUtils.assignRowTracking;
 import static 
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -971,14 +970,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             baseManifestList = manifestList.write(mergeAfterManifests);
 
             if (rowTrackingEnabled) {
-                // assigned snapshot id to delta files
-                List<ManifestEntry> snapshotAssigned = new ArrayList<>();
-                assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
-                // assign row id for new files
-                List<ManifestEntry> rowIdAssigned = new ArrayList<>();
-                nextRowIdStart =
-                        assignRowTrackingMeta(firstRowIdStart, 
snapshotAssigned, rowIdAssigned);
-                deltaFiles = rowIdAssigned;
+                RowTrackingAssigned assigned =
+                        assignRowTracking(newSnapshotId, firstRowIdStart, 
deltaFiles);
+                nextRowIdStart = assigned.nextRowIdStart;
+                deltaFiles = assigned.assignedEntries;
             }
 
             // the added records subtract the deleted records from
@@ -1132,57 +1127,6 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return commitSnapshotImpl(newSnapshot, emptyList());
     }
 
-    private long assignRowTrackingMeta(
-            long firstRowIdStart,
-            List<ManifestEntry> deltaFiles,
-            List<ManifestEntry> rowIdAssigned) {
-        if (deltaFiles.isEmpty()) {
-            return firstRowIdStart;
-        }
-        // assign row id for new files
-        long start = firstRowIdStart;
-        long blobStart = firstRowIdStart;
-        for (ManifestEntry entry : deltaFiles) {
-            checkArgument(
-                    entry.file().fileSource().isPresent(),
-                    "This is a bug, file source field for row-tracking table 
must present.");
-            boolean containsRowId =
-                    entry.file().writeCols() != null
-                            && 
entry.file().writeCols().contains(SpecialFields.ROW_ID.name());
-            if (entry.file().fileSource().get().equals(FileSource.APPEND)
-                    && entry.file().firstRowId() == null
-                    && !containsRowId) {
-                if (isBlobFile(entry.file().fileName())) {
-                    if (blobStart >= start) {
-                        throw new IllegalStateException(
-                                String.format(
-                                        "This is a bug, blobStart %d should be 
less than start %d when assigning a blob entry file.",
-                                        blobStart, start));
-                    }
-                    long rowCount = entry.file().rowCount();
-                    rowIdAssigned.add(entry.assignFirstRowId(blobStart));
-                    blobStart += rowCount;
-                } else {
-                    long rowCount = entry.file().rowCount();
-                    rowIdAssigned.add(entry.assignFirstRowId(start));
-                    blobStart = start;
-                    start += rowCount;
-                }
-            } else {
-                // for compact file, do not assign first row id.
-                rowIdAssigned.add(entry);
-            }
-        }
-        return start;
-    }
-
-    private void assignSnapshotId(
-            long snapshotId, List<ManifestEntry> deltaFiles, 
List<ManifestEntry> snapshotAssigned) {
-        for (ManifestEntry entry : deltaFiles) {
-            snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, 
snapshotId));
-        }
-    }
-
     public void compactManifest() {
         int retryCount = 0;
         long startMillis = System.currentTimeMillis();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
new file mode 100644
index 0000000000..d6eeae17f0
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.SpecialFields;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Assigns the new snapshot id and first row ids to the delta files of a row-tracking commit. */
+public class RowTrackingCommitUtils {
+
+    public static RowTrackingAssigned assignRowTracking(
+            long newSnapshotId, long firstRowIdStart, List<ManifestEntry> 
deltaFiles) {
+        // assign the new snapshot id to every delta file
+        List<ManifestEntry> snapshotAssigned = new ArrayList<>();
+        assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
+        // then assign first row ids to newly appended files
+        List<ManifestEntry> rowIdAssigned = new ArrayList<>();
+        long nextRowIdStart =
+                assignRowTrackingMeta(firstRowIdStart, snapshotAssigned, 
rowIdAssigned);
+        return new RowTrackingAssigned(nextRowIdStart, rowIdAssigned);
+    }
+
+    private static void assignSnapshotId(
+            long snapshotId, List<ManifestEntry> deltaFiles, 
List<ManifestEntry> snapshotAssigned) {
+        for (ManifestEntry entry : deltaFiles) {
+            snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, 
snapshotId));
+        }
+    }
+
+    private static long assignRowTrackingMeta(
+            long firstRowIdStart,
+            List<ManifestEntry> deltaFiles,
+            List<ManifestEntry> rowIdAssigned) {
+        if (deltaFiles.isEmpty()) {
+            return firstRowIdStart;
+        }
+        // blob files reuse the row id range of the preceding non-blob file via blobStart
+        long start = firstRowIdStart;
+        long blobStart = firstRowIdStart;
+        for (ManifestEntry entry : deltaFiles) {
+            Optional<FileSource> fileSource = entry.file().fileSource();
+            checkArgument(
+                    fileSource.isPresent(),
+                    "This is a bug, file source field for row-tracking table 
must present.");
+            List<String> writeCols = entry.file().writeCols();
+            boolean containsRowId =
+                    writeCols != null && 
writeCols.contains(SpecialFields.ROW_ID.name());
+            if (fileSource.get().equals(FileSource.APPEND)
+                    && entry.file().firstRowId() == null
+                    && !containsRowId) {
+                long rowCount = entry.file().rowCount();
+                if (isBlobFile(entry.file().fileName())) {
+                    if (blobStart >= start) {
+                        throw new IllegalStateException(
+                                String.format(
+                                        "This is a bug, blobStart %d should be 
less than start %d when assigning a blob entry file.",
+                                        blobStart, start));
+                    }
+                    rowIdAssigned.add(entry.assignFirstRowId(blobStart));
+                    blobStart += rowCount;
+                } else {
+                    rowIdAssigned.add(entry.assignFirstRowId(start));
+                    blobStart = start;
+                    start += rowCount;
+                }
+            } else {
+                // non-append (e.g. compact) files or files already carrying row ids: keep as-is.
+                rowIdAssigned.add(entry);
+            }
+        }
+        return start;
+    }
+
+    /** Result of assignRowTracking: the next row id start and the rewritten entries. */
+    public static class RowTrackingAssigned {
+        public final long nextRowIdStart;
+        public final List<ManifestEntry> assignedEntries;
+
+        public RowTrackingAssigned(long nextRowIdStart, List<ManifestEntry> 
assignedEntries) {
+            this.nextRowIdStart = nextRowIdStart;
+            this.assignedEntries = assignedEntries;
+        }
+    }
+}

Reply via email to