This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 55e04ead0b [core] DataEvolution table supports concurrent updates to 
different columns (#7867)
55e04ead0b is described below

commit 55e04ead0be4dfe51fa67aafe2de69471dd445c3
Author: Faiz <[email protected]>
AuthorDate: Sat May 23 22:21:36 2026 +0800

    [core] DataEvolution table supports concurrent updates to different columns 
(#7867)
---
 .../paimon/operation/FileStoreCommitImpl.java      |  11 +
 .../paimon/operation/commit/ConflictDetection.java |  44 ++--
 .../commit/RowIdColumnConflictChecker.java         | 232 +++++++++++++++++++++
 .../commit/RowIdColumnConflictCheckerTest.java     | 176 ++++++++++++++++
 .../flink/action/DataEvolutionMergeIntoAction.java |  15 +-
 .../DataEvolutionPartialWriteOperator.java         |   6 +-
 .../action/DataEvolutionMergeIntoActionITCase.java |   4 -
 .../paimon/spark/sql/RowTrackingTestBase.scala     |  42 ++++
 8 files changed, 499 insertions(+), 31 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 0032d69adf..3f9fdb9f1c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -49,6 +49,7 @@ import org.apache.paimon.operation.commit.ConflictDetection;
 import org.apache.paimon.operation.commit.ManifestEntryChanges;
 import org.apache.paimon.operation.commit.RetryCommitResult;
 import 
org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult;
+import org.apache.paimon.operation.commit.RowIdColumnConflictChecker;
 import 
org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
 import org.apache.paimon.operation.commit.StrictModeChecker;
 import org.apache.paimon.operation.commit.SuccessCommitResult;
@@ -906,12 +907,22 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                                 .filter(entry -> 
!baseIdentifiers.contains(entry.identifier()))
                                 .collect(Collectors.toList());
             }
+            RowIdColumnConflictChecker rowIdColumnConflictChecker = null;
+            if (conflictDetection.hasRowIdCheckFromSnapshot()) {
+                rowIdColumnConflictChecker =
+                        RowIdColumnConflictChecker.fromDataFiles(
+                                schemaManager,
+                                deltaFiles.stream()
+                                        .map(ManifestEntry::file)
+                                        .collect(Collectors.toList()));
+            }
             Optional<RuntimeException> exception =
                     conflictDetection.checkConflicts(
                             latestSnapshot,
                             baseDataFiles,
                             SimpleFileEntry.from(deltaFiles),
                             indexFiles,
+                            rowIdColumnConflictChecker,
                             commitKind);
             if (exception.isPresent()) {
                 if (allowRollback && rollback != null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 46d5c43619..29ee045818 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -37,7 +37,6 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RangeHelper;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -64,6 +63,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static 
org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
+import static org.apache.paimon.types.VectorType.isVectorStoreFile;
 import static 
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -160,6 +160,7 @@ public class ConflictDetection {
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
             List<IndexManifestEntry> deltaIndexEntries,
+            @Nullable RowIdColumnConflictChecker rowIdColumnConflictChecker,
             CommitKind commitKind) {
         String baseCommitUser = latestSnapshot.commitUser();
         if (deletionVectorsEnabled && 
bucketMode.equals(BucketMode.BUCKET_UNAWARE)) {
@@ -228,7 +229,8 @@ public class ConflictDetection {
             return exception;
         }
 
-        return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, 
deltaIndexEntries);
+        return checkForRowIdFromSnapshot(
+                latestSnapshot, deltaEntries, deltaIndexEntries, 
rowIdColumnConflictChecker);
     }
 
     public <T extends FileEntry> Map<BinaryRow, Integer> 
collectUncheckedBucketPartitions(
@@ -473,7 +475,7 @@ public class ConflictDetection {
         for (List<SimpleFileEntry> group : merged) {
             List<SimpleFileEntry> dataFiles = new ArrayList<>();
             for (SimpleFileEntry f : group) {
-                if (!isBlobFile(f.fileName())) {
+                if (!dedicatedStorageFile(f.fileName())) {
                     dataFiles.add(f);
                 }
             }
@@ -491,24 +493,19 @@ public class ConflictDetection {
     private Optional<RuntimeException> checkForRowIdFromSnapshot(
             Snapshot latestSnapshot,
             List<SimpleFileEntry> deltaEntries,
-            List<IndexManifestEntry> deltaIndexEntries) {
+            List<IndexManifestEntry> deltaIndexEntries,
+            @Nullable RowIdColumnConflictChecker columnChecker) {
         if (!dataEvolutionEnabled) {
             return Optional.empty();
         }
         if (rowIdCheckFromSnapshot == null) {
             return Optional.empty();
         }
+        if (columnChecker == null || columnChecker.isEmpty()) {
+            return Optional.empty();
+        }
 
         List<BinaryRow> changedPartitions = changedPartitions(deltaEntries, 
deltaIndexEntries);
-        // collect history row id ranges
-        List<Range> historyIdRanges = new ArrayList<>();
-        for (SimpleFileEntry entry : deltaEntries) {
-            Long firstRowId = entry.firstRowId();
-            long rowCount = entry.rowCount();
-            if (firstRowId != null) {
-                historyIdRanges.add(new Range(firstRowId, firstRowId + 
rowCount - 1));
-            }
-        }
 
         // check history row id ranges
         Long checkNextRowId = 
snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId();
@@ -525,16 +522,13 @@ public class ConflictDetection {
                     commitScanner.readIncrementalEntries(snapshot, 
changedPartitions);
             for (ManifestEntry entry : changes) {
                 DataFileMeta file = entry.file();
-                Range fileRange = file.nonNullRowIdRange();
-                if (fileRange.from < checkNextRowId) {
-                    for (Range range : historyIdRanges) {
-                        if (range.hasIntersection(fileRange)) {
-                            return Optional.of(
-                                    new RuntimeException(
-                                            "For Data Evolution table, 
multiple 'MERGE INTO' operations have encountered conflicts,"
-                                                    + " updating the same 
file, which can render some updates ineffective."));
-                        }
-                    }
+                if (file.firstRowId() != null
+                        && file.nonNullRowIdRange().from < checkNextRowId
+                        && columnChecker.conflictsWith(file)) {
+                    return Optional.of(
+                            new RuntimeException(
+                                    "For Data Evolution table, multiple 'MERGE 
INTO' operations have encountered conflicts,"
+                                            + " updating the same file, which 
can render some updates ineffective."));
                 }
             }
         }
@@ -542,6 +536,10 @@ public class ConflictDetection {
         return Optional.empty();
     }
 
+    private static boolean dedicatedStorageFile(String fileName) { // true for blob or vector store file names
+        return isBlobFile(fileName) || isVectorStoreFile(fileName);
+    }
+
     static List<SimpleFileEntry> buildBaseEntriesWithDV(
             List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> 
baseIndexEntries) {
         if (baseEntries.isEmpty()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java
new file mode 100644
index 0000000000..b2f8740f52
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RangeHelper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Detects conflicts between the delta files of the commit in progress and already committed
+ * files. A conflict requires both overlapping row-id ranges and overlapping written field ids.
+ * The detection process is as below:
+ *
+ * <ol>
+ *   <li>Merge delta files by row range and calculate updated columns.
+ *   <li>Sort those items by range.
+ *   <li>For each file being checked, binary search for overlapping ranges. If their updated
+ *       columns also overlap, report a conflict.
+ * </ol>
+ *
+ * <p>Columns are compared by field id, resolved through the schema id recorded on each file, so
+ * the same column is matched even when its name differs between schemas.
+ */
+public class RowIdColumnConflictChecker {
+
+    private final SchemaManager schemaManager; // used to look up a schema by its id
+    private final List<WriteRange> writeRanges; // merged delta ranges, sorted by range.from
+    private final Map<Long, Map<String, Integer>> fieldIdByNameCache = new 
HashMap<>(); // schema id -> (field name -> field id), filled on first use
+
+    private RowIdColumnConflictChecker(SchemaManager schemaManager, 
List<DataFileMeta> deltaFiles) {
+        this.schemaManager = schemaManager;
+        this.writeRanges = buildWriteRanges(deltaFiles); // reads schemaManager, so assign it first
+    }
+
+    public static RowIdColumnConflictChecker fromDataFiles(
+            SchemaManager schemaManager, List<DataFileMeta> deltaFiles) {
+        return new RowIdColumnConflictChecker(schemaManager, deltaFiles);
+    }
+
+    private List<WriteRange> buildWriteRanges(List<DataFileMeta> deltaFiles) {
+        List<DataFileMeta> rowIdFiles =
+                deltaFiles.stream()
+                        .filter(file -> file.firstRowId() != null) // ignore files without a first row id
+                        .collect(Collectors.toList());
+
+        if (rowIdFiles.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        // 1. merge overlapping ranges and calculate [Range, Set<FieldId>] 
tuples.
+        RangeHelper<DataFileMeta> rangeHelper = new 
RangeHelper<>(DataFileMeta::nonNullRowIdRange);
+        List<WriteRange> writeRanges = new ArrayList<>();
+        for (List<DataFileMeta> group : 
rangeHelper.mergeOverlappingRanges(rowIdFiles)) {
+            Range range = mergeRange(group);
+            Set<Integer> fieldIds = new HashSet<>(); // union of the field ids written by the group
+            for (DataFileMeta file : group) {
+                addWriteFieldIds(fieldIds, file);
+            }
+
+            writeRanges.add(new WriteRange(range, fieldIds)); // the union applies to the whole merged range
+        }
+
+        // 2. sort by range (from, then to) for the binary search in firstPossibleRange
+        writeRanges.sort(
+                Comparator.comparingLong((WriteRange writeRange) -> 
writeRange.range.from)
+                        .thenComparingLong(writeRange -> writeRange.range.to));
+
+        return writeRanges;
+    }
+
+    private void addWriteFieldIds(Set<Integer> fieldIds, DataFileMeta file) {
+        List<String> writeCols = file.writeCols();
+        if (writeCols == null) { // full-schema write: add every field id of the file's schema
+            fieldIds.addAll(
+                    fieldIdByNameCache
+                            .computeIfAbsent(file.schemaId(), 
this::fieldIdByName)
+                            .values());
+            return;
+        }
+
+        for (String writeCol : writeCols) {
+            Integer fieldId = fieldId(file, writeCol);
+            if (fieldId != null) { // null means a system field that is not in the schema; skip it
+                fieldIds.add(fieldId);
+            }
+        }
+    }
+
+    private static Range mergeRange(List<DataFileMeta> files) { // smallest range covering all files
+        long from = Long.MAX_VALUE;
+        long to = Long.MIN_VALUE;
+        for (DataFileMeta file : files) {
+            Range range = file.nonNullRowIdRange();
+            from = Math.min(from, range.from);
+            to = Math.max(to, range.to);
+        }
+        return new Range(from, to);
+    }
+
+    boolean isEmpty() { // true when there is no write range to check against
+        return writeRanges.isEmpty();
+    }
+
+    /**
+     * Check whether a committed incremental file entry conflicts with the delta files of the
+     * commit in progress. The file conflicts if it has both an overlapping row range and
+     * overlapping write fields. A file without a first row id never conflicts, and a file whose
+     * write columns are null is treated as writing every column.
+     *
+     * @param file committed incremental data file
+     * @return true if conflict
+     * @throws RuntimeException if, while comparing columns, a non-system write column of {@code
+     *     file} cannot be found in the schema recorded on the file
+     */
+    boolean conflictsWith(DataFileMeta file) {
+        Long firstRowId = file.firstRowId();
+        if (firstRowId == null) {
+            return false;
+        }
+
+        Range range = new Range(firstRowId, firstRowId + file.rowCount() - 1); // first..last row id of the file
+        int index = firstPossibleRange(range);
+        while (index < writeRanges.size()) {
+            WriteRange writeRange = writeRanges.get(index);
+            if (writeRange.range.from > range.to) { // sorted by from: nothing further can overlap
+                return false;
+            }
+            // overlapping row range and overlapping write fields
+            if (writeRange.range.hasIntersection(range)
+                    && containsAnyWriteField(writeRange.fieldIds, file)) {
+                return true;
+            }
+            index++;
+        }
+        return false;
+    }
+
+    /**
+     * Binary search to find the first range whose {@code to} >= target range's {@code from}.
+     *
+     * <p>NOTE(review): this relies on {@code writeRanges} being ordered by {@code to} as well as by
+     * {@code from}; presumably that holds because overlapping ranges were merged beforehand -
+     * confirm against RangeHelper.
+     *
+     * @param range querying range
+     * @return index of the first such range, or {@code writeRanges.size()} if there is none
+     */
+    private int firstPossibleRange(Range range) {
+        int low = 0;
+        int high = writeRanges.size();
+        while (low < high) {
+            int mid = (low + high) >>> 1;
+            if (writeRanges.get(mid).range.to < range.from) { // mid ends before the target starts
+                low = mid + 1;
+            } else {
+                high = mid;
+            }
+        }
+        return low;
+    }
+
+    private boolean containsAnyWriteField(Set<Integer> fieldIds, DataFileMeta 
file) {
+        List<String> writeCols = file.writeCols();
+        // If write cols == null, it's a full-schema write, so it overlaps any set of fields
+        if (writeCols == null) {
+            return true;
+        }
+
+        for (String writeCol : writeCols) {
+            Integer fieldId = fieldId(file, writeCol);
+            if (fieldId != null && fieldIds.contains(fieldId)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private Integer fieldId(DataFileMeta file, String writeCol) {
+        Integer fieldId =
+                fieldIdByNameCache
+                        .computeIfAbsent(file.schemaId(), this::fieldIdByName)
+                        .get(writeCol);
+        if (fieldId == null) {
+            if (SpecialFields.isSystemField(writeCol)) { // system field missing from the schema: callers skip null
+                return null;
+            }
+            throw new RuntimeException(
+                    String.format(
+                            "Cannot find write column '%s' in schema %s.",
+                            writeCol, file.schemaId()));
+        }
+        return fieldId;
+    }
+
+    private Map<String, Integer> fieldIdByName(long schemaId) { // name -> id over the schema's logicalRowType() fields
+        Map<String, Integer> fieldIdByName = new HashMap<>();
+        for (DataField field : 
schemaManager.schema(schemaId).logicalRowType().getFields()) {
+            fieldIdByName.put(field.name(), field.id());
+        }
+        return fieldIdByName;
+    }
+
+    /** A merged row-id range together with the ids of the fields written within it. */
+    private static class WriteRange {
+
+        private final Range range;
+        private final Set<Integer> fieldIds;
+
+        private WriteRange(Range range, Set<Integer> fieldIds) {
+            this.range = range;
+            this.fieldIds = fieldIds;
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java
new file mode 100644
index 0000000000..8c45bfbb33
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.stats.SimpleStats;
+import 
org.apache.paimon.table.SchemaEvolutionTableTestBase.TestingSchemaManager;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class RowIdColumnConflictCheckerTest {
+
+    @Test
+    void testAllowsDisjointWriteColumns() { // same rows 0-9, but column b vs column c
+        RowIdColumnConflictChecker checker =
+                checker(file("current", 0L, 10L, 0L, Arrays.asList("b")));
+
+        assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, 
Arrays.asList("c"))))
+                .isFalse();
+    }
+
+    @Test
+    void testDetectsSameWriteColumns() { // same rows 0-9 and same column b
+        RowIdColumnConflictChecker checker =
+                checker(file("current", 0L, 10L, 0L, Arrays.asList("b")));
+
+        assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, 
Arrays.asList("b"))))
+                .isTrue();
+    }
+
+    @Test
+    void testUsesFieldIdAcrossRename() { // b (schema 0) and b_renamed (schema 1) share field id 1
+        RowIdColumnConflictChecker checker =
+                checker(file("current", 0L, 10L, 1L, 
Arrays.asList("b_renamed")));
+
+        assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, 
Arrays.asList("b"))))
+                .isTrue();
+    }
+
+    @Test
+    void testTreatsNullWriteColumnsAsFullSchemaWrite() { // null writeCols on the delta file covers column b
+        RowIdColumnConflictChecker checker = checker(file("current", 0L, 10L, 
0L, null));
+
+        assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, 
Arrays.asList("b"))))
+                .isTrue();
+    }
+
+    @Test
+    void testMergesOverlappedDeltaRangesAndWriteColumns() { // rows 0-10 (b) and 5-15 (c) merge into 0-15 with b and c
+        RowIdColumnConflictChecker checker =
+                checker(
+                        file("current-b", 0L, 11L, 0L, Arrays.asList("b")),
+                        file("current-c", 5L, 11L, 0L, Arrays.asList("c")));
+
+        assertThat(checker.conflictsWith(file("historical-b", 12L, 1L, 0L, 
Arrays.asList("b"))))
+                .isTrue();
+        assertThat(checker.conflictsWith(file("historical-c", 12L, 1L, 0L, 
Arrays.asList("c"))))
+                .isTrue();
+    }
+
+    @Test
+    void testScansAllOverlappedRangesAfterBinarySearch() { // rows 3-12 overlap both delta ranges; only the second writes c
+        RowIdColumnConflictChecker checker =
+                checker(
+                        file("current-b", 0L, 5L, 0L, Arrays.asList("b")),
+                        file("current-c", 10L, 5L, 0L, Arrays.asList("c")));
+
+        assertThat(checker.conflictsWith(file("historical", 3L, 10L, 0L, 
Arrays.asList("c"))))
+                .isTrue();
+    }
+
+    @Test
+    void testFailsOnUnknownNonSystemWriteColumn() { // the column name used below is in neither test schema
+        RowIdColumnConflictChecker checker =
+                checker(file("current", 0L, 10L, 0L, Arrays.asList("b")));
+
+        assertThatThrownBy(
+                        () ->
+                                checker.conflictsWith(
+                                        file("historical", 0L, 10L, 0L, 
Arrays.asList("missing"))))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Cannot find write column 'missing'");
+    }
+
+    private RowIdColumnConflictChecker checker(DataFileMeta... files) { // files play the role of the delta files
+        return RowIdColumnConflictChecker.fromDataFiles(
+                createSchemaManager(), Arrays.asList(files));
+    }
+
+    private DataFileMeta file( // only name, first row id, row count, schema id and writeCols vary per test
+            String fileName,
+            @Nullable Long firstRowId,
+            long rowCount,
+            long schemaId,
+            @Nullable List<String> writeCols) {
+        return DataFileMeta.forAppend(
+                fileName,
+                0L,
+                rowCount,
+                SimpleStats.EMPTY_STATS,
+                0L,
+                0L,
+                schemaId,
+                Collections.emptyList(),
+                null,
+                null,
+                null,
+                null,
+                firstRowId,
+                writeCols);
+    }
+
+    private SchemaManager createSchemaManager() { // schema 0: (id, b, c); schema 1: (id, b_renamed, c) with the same field ids
+        Map<Long, org.apache.paimon.schema.TableSchema> schemas = new 
HashMap<>();
+        schemas.put(
+                0L,
+                org.apache.paimon.schema.TableSchema.create(
+                        0L,
+                        new Schema(
+                                Arrays.asList(
+                                        new DataField(0, "id", 
DataTypes.INT()),
+                                        new DataField(1, "b", DataTypes.INT()),
+                                        new DataField(2, "c", 
DataTypes.INT())),
+                                Collections.emptyList(),
+                                Collections.singletonList("id"),
+                                Collections.emptyMap(),
+                                "")));
+        schemas.put(
+                1L,
+                org.apache.paimon.schema.TableSchema.create(
+                        1L,
+                        new Schema(
+                                Arrays.asList(
+                                        new DataField(0, "id", 
DataTypes.INT()),
+                                        new DataField(1, "b_renamed", 
DataTypes.INT()),
+                                        new DataField(2, "c", 
DataTypes.INT())),
+                                Collections.emptyList(),
+                                Collections.singletonList("id"),
+                                Collections.emptyMap(),
+                                "")));
+        return new TestingSchemaManager(
+                new Path("/tmp/row-id-column-conflict-checker-test"), schemas);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index 019bf6484b..40c8911380 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -126,6 +126,9 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
 
     private int sinkParallelism;
 
+    // the snapshot id this action is based on
+    private long baseSnapshotId;
+
     public DataEvolutionMergeIntoAction(
             String databaseName, String tableName, Map<String, String> 
catalogConfig) {
         super(databaseName, tableName, catalogConfig);
@@ -142,6 +145,7 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
             throw new UnsupportedOperationException(
                     "merge-into action doesn't support updating an empty 
table.");
         }
+        this.baseSnapshotId = latestSnapshotId;
         table =
                 table.copy(
                         Collections.singletonMap(
@@ -324,7 +328,7 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
         Transformation<RowData> sourceTransformation = 
source.getTransformation();
         List<Long> firstRowIds =
                 ((FileStoreTable) table)
-                        .store().newScan()
+                        .store().newScan().withSnapshot(baseSnapshotId)
                                 .withManifestEntryFilter(
                                         entry ->
                                                 entry.file().firstRowId() != 
null
@@ -384,13 +388,16 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
         return sorted.transform(
                         "PARTIAL WRITE COLUMNS",
                         new CommittableTypeInfo(),
-                        new DataEvolutionPartialWriteOperator((FileStoreTable) 
table, rowType))
+                        new DataEvolutionPartialWriteOperator(
+                                (FileStoreTable) table, rowType, 
baseSnapshotId))
                 .setParallelism(sinkParallelism);
     }
 
     public DataStream<Committable> commit(
             DataStream<Committable> written, Set<String> updatedColumns) {
         FileStoreTable storeTable = (FileStoreTable) table;
+        // copy to a local so the lambda below captures the value, not this (avoids a serialization issue)
+        long baseSnapshotId = this.baseSnapshotId;
 
         // Check if some global-indexed columns are updated
         DataStream<Committable> checked =
@@ -409,7 +416,9 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
                         context ->
                                 new StoreCommitter(
                                         storeTable,
-                                        
storeTable.newCommit(context.commitUser()),
+                                        storeTable
+                                                
.newCommit(context.commitUser())
+                                                
.rowIdCheckConflict(baseSnapshotId),
                                         context),
                         new NoopCommittableStateManager());
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
index 278d58306c..f57393b7ca 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
@@ -70,6 +70,7 @@ public class DataEvolutionPartialWriteOperator
             LoggerFactory.getLogger(DataEvolutionPartialWriteOperator.class);
 
     private final FileStoreTable table;
+    private final Long baseSnapshotId;
 
     // dataType
     private final RowType dataType;
@@ -91,9 +92,11 @@ public class DataEvolutionPartialWriteOperator
     private transient AbstractFileStoreWrite<InternalRow> tableWrite;
     private transient Writer writer;
 
-    public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType 
dataType) {
+    public DataEvolutionPartialWriteOperator(
+            FileStoreTable table, RowType dataType, Long baseSnapshotId) {
         this.table =
                 
table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 
G"));
+        this.baseSnapshotId = baseSnapshotId;
         List<String> fieldNames =
                 dataType.getFieldNames().stream()
                         .filter(name -> 
!SpecialFields.ROW_ID.name().equals(name))
@@ -121,6 +124,7 @@ public class DataEvolutionPartialWriteOperator
                                         entry.file().firstRowId() != null
                                                 && 
!isBlobFile(entry.file().fileName())
                                                 && 
!isVectorStoreFile(entry.file().fileName()))
+                        .withSnapshot(baseSnapshotId)
                         .plan()
                         .files();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 53ecb5b034..06b08d2401 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -743,10 +743,6 @@ public class DataEvolutionMergeIntoActionITCase extends 
ActionITCaseBase {
                 "(1, 'name1', X'48656C6C6F')",
                 "(2, 'name2', X'5945')",
                 "(3, 'name3', X'414243')");
-        testBatchRead(
-                "SELECT COUNT(*) FROM `RAW_BLOB_SPLIT_T$files` "
-                        + "WHERE file_path NOT LIKE '%.blob'",
-                Collections.singletonList(changelogRow("+I", 1L)));
         testBatchRead(
                 "SELECT COUNT(*) > 1 FROM `RAW_BLOB_SPLIT_T$files` "
                         + "WHERE file_path LIKE '%.blob'",
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index e012656cc8..ff54322b92 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -140,6 +140,48 @@ abstract class RowTrackingTestBase extends 
PaimonSparkTestBase {
     }
   }
 
+  test("Data Evolution: concurrent merge with disjoint update columns") {
+    withTable("sb", "sc", "t") {
+      sql(s"""
+            CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
+                 'row-tracking.enabled' = 'true',
+                 'data-evolution.enabled' = 'true')
+          """)
+      sql("INSERT INTO t VALUES (1, 0, 0)") // single row; both counters start at 0
+      Seq((1, 1)).toDF("id", "b").createOrReplaceTempView("sb") // source for the merges on column b
+      Seq((1, 1)).toDF("id", "c").createOrReplaceTempView("sc") // source for the merges on column c
+
+      val mergeB = Future { // 10 sequential merges, each adding 1 to column b only
+        for (_ <- 1 to 10) {
+          sql(s"""
+                 |MERGE INTO t
+                 |USING sb
+                 |ON t.id = sb.id
+                 |WHEN MATCHED THEN
+                 |UPDATE SET t.b = sb.b + t.b
+                 |""".stripMargin).collect()
+        }
+      }
+
+      val mergeC = Future { // runs concurrently with mergeB, each merge adding 1 to column c only
+        for (_ <- 1 to 10) {
+          sql(s"""
+                 |MERGE INTO t
+                 |USING sc
+                 |ON t.id = sc.id
+                 |WHEN MATCHED THEN
+                 |UPDATE SET t.c = sc.c + t.c
+                 |""".stripMargin).collect()
+        }
+      }
+
+      Await.result(mergeB, 60.seconds) // rethrows a failure from the future, or times out after 60 s
+      Await.result(mergeC, 60.seconds)
+
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 10, 10))) // all 10 increments on each column must be present
+    }
+  }
+
   test("Data Evolution: concurrent merge and small files compact") {
     withTable("s", "t") {
       sql(s"""

Reply via email to