This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 55e04ead0b [core] DataEvolution table supports concurrent updates to
different columns (#7867)
55e04ead0b is described below
commit 55e04ead0be4dfe51fa67aafe2de69471dd445c3
Author: Faiz <[email protected]>
AuthorDate: Sat May 23 22:21:36 2026 +0800
[core] DataEvolution table supports concurrent updates to different columns
(#7867)
---
.../paimon/operation/FileStoreCommitImpl.java | 11 +
.../paimon/operation/commit/ConflictDetection.java | 44 ++--
.../commit/RowIdColumnConflictChecker.java | 232 +++++++++++++++++++++
.../commit/RowIdColumnConflictCheckerTest.java | 176 ++++++++++++++++
.../flink/action/DataEvolutionMergeIntoAction.java | 15 +-
.../DataEvolutionPartialWriteOperator.java | 6 +-
.../action/DataEvolutionMergeIntoActionITCase.java | 4 -
.../paimon/spark/sql/RowTrackingTestBase.scala | 42 ++++
8 files changed, 499 insertions(+), 31 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 0032d69adf..3f9fdb9f1c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -49,6 +49,7 @@ import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.operation.commit.ManifestEntryChanges;
import org.apache.paimon.operation.commit.RetryCommitResult;
import
org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult;
+import org.apache.paimon.operation.commit.RowIdColumnConflictChecker;
import
org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.operation.commit.SuccessCommitResult;
@@ -906,12 +907,22 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
.filter(entry ->
!baseIdentifiers.contains(entry.identifier()))
.collect(Collectors.toList());
}
+ RowIdColumnConflictChecker rowIdColumnConflictChecker = null;
+ if (conflictDetection.hasRowIdCheckFromSnapshot()) {
+ rowIdColumnConflictChecker =
+ RowIdColumnConflictChecker.fromDataFiles(
+ schemaManager,
+ deltaFiles.stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList()));
+ }
Optional<RuntimeException> exception =
conflictDetection.checkConflicts(
latestSnapshot,
baseDataFiles,
SimpleFileEntry.from(deltaFiles),
indexFiles,
+ rowIdColumnConflictChecker,
commitKind);
if (exception.isPresent()) {
if (allowRollback && rollback != null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 46d5c43619..29ee045818 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -37,7 +37,6 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
import org.apache.paimon.utils.SnapshotManager;
@@ -64,6 +63,7 @@ import java.util.stream.Collectors;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static
org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
+import static org.apache.paimon.types.VectorType.isVectorStoreFile;
import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -160,6 +160,7 @@ public class ConflictDetection {
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
List<IndexManifestEntry> deltaIndexEntries,
+ @Nullable RowIdColumnConflictChecker rowIdColumnConflictChecker,
CommitKind commitKind) {
String baseCommitUser = latestSnapshot.commitUser();
if (deletionVectorsEnabled &&
bucketMode.equals(BucketMode.BUCKET_UNAWARE)) {
@@ -228,7 +229,8 @@ public class ConflictDetection {
return exception;
}
- return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries,
deltaIndexEntries);
+ return checkForRowIdFromSnapshot(
+ latestSnapshot, deltaEntries, deltaIndexEntries,
rowIdColumnConflictChecker);
}
public <T extends FileEntry> Map<BinaryRow, Integer>
collectUncheckedBucketPartitions(
@@ -473,7 +475,7 @@ public class ConflictDetection {
for (List<SimpleFileEntry> group : merged) {
List<SimpleFileEntry> dataFiles = new ArrayList<>();
for (SimpleFileEntry f : group) {
- if (!isBlobFile(f.fileName())) {
+ if (!dedicatedStorageFile(f.fileName())) {
dataFiles.add(f);
}
}
@@ -491,24 +493,19 @@ public class ConflictDetection {
private Optional<RuntimeException> checkForRowIdFromSnapshot(
Snapshot latestSnapshot,
List<SimpleFileEntry> deltaEntries,
- List<IndexManifestEntry> deltaIndexEntries) {
+ List<IndexManifestEntry> deltaIndexEntries,
+ @Nullable RowIdColumnConflictChecker columnChecker) {
if (!dataEvolutionEnabled) {
return Optional.empty();
}
if (rowIdCheckFromSnapshot == null) {
return Optional.empty();
}
+ if (columnChecker == null || columnChecker.isEmpty()) {
+ return Optional.empty();
+ }
List<BinaryRow> changedPartitions = changedPartitions(deltaEntries,
deltaIndexEntries);
- // collect history row id ranges
- List<Range> historyIdRanges = new ArrayList<>();
- for (SimpleFileEntry entry : deltaEntries) {
- Long firstRowId = entry.firstRowId();
- long rowCount = entry.rowCount();
- if (firstRowId != null) {
- historyIdRanges.add(new Range(firstRowId, firstRowId +
rowCount - 1));
- }
- }
// check history row id ranges
Long checkNextRowId =
snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId();
@@ -525,16 +522,13 @@ public class ConflictDetection {
commitScanner.readIncrementalEntries(snapshot,
changedPartitions);
for (ManifestEntry entry : changes) {
DataFileMeta file = entry.file();
- Range fileRange = file.nonNullRowIdRange();
- if (fileRange.from < checkNextRowId) {
- for (Range range : historyIdRanges) {
- if (range.hasIntersection(fileRange)) {
- return Optional.of(
- new RuntimeException(
- "For Data Evolution table,
multiple 'MERGE INTO' operations have encountered conflicts,"
- + " updating the same
file, which can render some updates ineffective."));
- }
- }
+ if (file.firstRowId() != null
+ && file.nonNullRowIdRange().from < checkNextRowId
+ && columnChecker.conflictsWith(file)) {
+ return Optional.of(
+ new RuntimeException(
+ "For Data Evolution table, multiple 'MERGE
INTO' operations have encountered conflicts,"
+ + " updating the same file, which
can render some updates ineffective."));
}
}
}
@@ -542,6 +536,10 @@ public class ConflictDetection {
return Optional.empty();
}
+ private static boolean dedicatedStorageFile(String fileName) {
+ return isBlobFile(fileName) || isVectorStoreFile(fileName);
+ }
+
static List<SimpleFileEntry> buildBaseEntriesWithDV(
List<SimpleFileEntry> baseEntries, List<IndexManifestEntry>
baseIndexEntries) {
if (baseEntries.isEmpty()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java
new file mode 100644
index 0000000000..b2f8740f52
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RangeHelper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Detects row-id range conflicts only when written field ids overlap. The
detection process is as
+ * below:
+ *
+ * <ol>
+ * <li>Merge delta files by row range and calculate updated columns.
+ * <li>Sort those items by range.
+ * <li>For each file being checked, binary search for overlapping ranges.
If their updated
+ * columns also overlap, report a conflict.
+ * </ol>
+ */
+public class RowIdColumnConflictChecker {
+
+ private final SchemaManager schemaManager;
+ private final List<WriteRange> writeRanges;
+ private final Map<Long, Map<String, Integer>> fieldIdByNameCache = new
HashMap<>();
+
+ private RowIdColumnConflictChecker(SchemaManager schemaManager,
List<DataFileMeta> deltaFiles) {
+ this.schemaManager = schemaManager;
+ this.writeRanges = buildWriteRanges(deltaFiles);
+ }
+
+ public static RowIdColumnConflictChecker fromDataFiles(
+ SchemaManager schemaManager, List<DataFileMeta> deltaFiles) {
+ return new RowIdColumnConflictChecker(schemaManager, deltaFiles);
+ }
+
+ private List<WriteRange> buildWriteRanges(List<DataFileMeta> deltaFiles) {
+ List<DataFileMeta> rowIdFiles =
+ deltaFiles.stream()
+ .filter(file -> file.firstRowId() != null)
+ .collect(Collectors.toList());
+
+ if (rowIdFiles.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // 1. merge overlapping ranges and calculate [Range, Set<FieldId>]
tuples.
+ RangeHelper<DataFileMeta> rangeHelper = new
RangeHelper<>(DataFileMeta::nonNullRowIdRange);
+ List<WriteRange> writeRanges = new ArrayList<>();
+ for (List<DataFileMeta> group :
rangeHelper.mergeOverlappingRanges(rowIdFiles)) {
+ Range range = mergeRange(group);
+ Set<Integer> fieldIds = new HashSet<>();
+ for (DataFileMeta file : group) {
+ addWriteFieldIds(fieldIds, file);
+ }
+
+ writeRanges.add(new WriteRange(range, fieldIds));
+ }
+
+ // 2. sort by range for binary search
+ writeRanges.sort(
+ Comparator.comparingLong((WriteRange writeRange) ->
writeRange.range.from)
+ .thenComparingLong(writeRange -> writeRange.range.to));
+
+ return writeRanges;
+ }
+
+ private void addWriteFieldIds(Set<Integer> fieldIds, DataFileMeta file) {
+ List<String> writeCols = file.writeCols();
+ if (writeCols == null) {
+ fieldIds.addAll(
+ fieldIdByNameCache
+ .computeIfAbsent(file.schemaId(),
this::fieldIdByName)
+ .values());
+ return;
+ }
+
+ for (String writeCol : writeCols) {
+ Integer fieldId = fieldId(file, writeCol);
+ if (fieldId != null) {
+ fieldIds.add(fieldId);
+ }
+ }
+ }
+
+ private static Range mergeRange(List<DataFileMeta> files) {
+ long from = Long.MAX_VALUE;
+ long to = Long.MIN_VALUE;
+ for (DataFileMeta file : files) {
+ Range range = file.nonNullRowIdRange();
+ from = Math.min(from, range.from);
+ to = Math.max(to, range.to);
+ }
+ return new Range(from, to);
+ }
+
+ boolean isEmpty() {
+ return writeRanges.isEmpty();
+ }
+
+ /**
+ * Check whether a committed incremental file entry conflicts with current
committing delta
+ * files. If an existing file has both overlapping row range and
overlapping write fields, then
+ * it conflicts.
+ *
+ * @param file committed incremental data file
+ * @return true if conflict
+ */
+ boolean conflictsWith(DataFileMeta file) {
+ Long firstRowId = file.firstRowId();
+ if (firstRowId == null) {
+ return false;
+ }
+
+ Range range = new Range(firstRowId, firstRowId + file.rowCount() - 1);
+ int index = firstPossibleRange(range);
+ while (index < writeRanges.size()) {
+ WriteRange writeRange = writeRanges.get(index);
+ if (writeRange.range.from > range.to) {
+ return false;
+ }
+ // overlapping row range and overlapping write fields
+ if (writeRange.range.hasIntersection(range)
+ && containsAnyWriteField(writeRange.fieldIds, file)) {
+ return true;
+ }
+ index++;
+ }
+ return false;
+ }
+
+ /**
+ * Binary search to find the first range whose `to` >= target range's
`from`.
+ *
+ * @param range querying range
+ * @return index of the first range
+ */
+ private int firstPossibleRange(Range range) {
+ int low = 0;
+ int high = writeRanges.size();
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ if (writeRanges.get(mid).range.to < range.from) {
+ low = mid + 1;
+ } else {
+ high = mid;
+ }
+ }
+ return low;
+ }
+
+ private boolean containsAnyWriteField(Set<Integer> fieldIds, DataFileMeta
file) {
+ List<String> writeCols = file.writeCols();
+ // If write cols == null, it's a full-schema write
+ if (writeCols == null) {
+ return true;
+ }
+
+ for (String writeCol : writeCols) {
+ Integer fieldId = fieldId(file, writeCol);
+ if (fieldId != null && fieldIds.contains(fieldId)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Integer fieldId(DataFileMeta file, String writeCol) {
+ Integer fieldId =
+ fieldIdByNameCache
+ .computeIfAbsent(file.schemaId(), this::fieldIdByName)
+ .get(writeCol);
+ if (fieldId == null) {
+ if (SpecialFields.isSystemField(writeCol)) {
+ return null;
+ }
+ throw new RuntimeException(
+ String.format(
+ "Cannot find write column '%s' in schema %s.",
+ writeCol, file.schemaId()));
+ }
+ return fieldId;
+ }
+
+ private Map<String, Integer> fieldIdByName(long schemaId) {
+ Map<String, Integer> fieldIdByName = new HashMap<>();
+ for (DataField field :
schemaManager.schema(schemaId).logicalRowType().getFields()) {
+ fieldIdByName.put(field.name(), field.id());
+ }
+ return fieldIdByName;
+ }
+
+ /** Range and field id Set. */
+ private static class WriteRange {
+
+ private final Range range;
+ private final Set<Integer> fieldIds;
+
+ private WriteRange(Range range, Set<Integer> fieldIds) {
+ this.range = range;
+ this.fieldIds = fieldIds;
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java
new file mode 100644
index 0000000000..8c45bfbb33
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.stats.SimpleStats;
+import
org.apache.paimon.table.SchemaEvolutionTableTestBase.TestingSchemaManager;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class RowIdColumnConflictCheckerTest {
+
+ @Test
+ void testAllowsDisjointWriteColumns() {
+ RowIdColumnConflictChecker checker =
+ checker(file("current", 0L, 10L, 0L, Arrays.asList("b")));
+
+ assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L,
Arrays.asList("c"))))
+ .isFalse();
+ }
+
+ @Test
+ void testDetectsSameWriteColumns() {
+ RowIdColumnConflictChecker checker =
+ checker(file("current", 0L, 10L, 0L, Arrays.asList("b")));
+
+ assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L,
Arrays.asList("b"))))
+ .isTrue();
+ }
+
+ @Test
+ void testUsesFieldIdAcrossRename() {
+ RowIdColumnConflictChecker checker =
+ checker(file("current", 0L, 10L, 1L,
Arrays.asList("b_renamed")));
+
+ assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L,
Arrays.asList("b"))))
+ .isTrue();
+ }
+
+ @Test
+ void testTreatsNullWriteColumnsAsFullSchemaWrite() {
+ RowIdColumnConflictChecker checker = checker(file("current", 0L, 10L,
0L, null));
+
+ assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L,
Arrays.asList("b"))))
+ .isTrue();
+ }
+
+ @Test
+ void testMergesOverlappedDeltaRangesAndWriteColumns() {
+ RowIdColumnConflictChecker checker =
+ checker(
+ file("current-b", 0L, 11L, 0L, Arrays.asList("b")),
+ file("current-c", 5L, 11L, 0L, Arrays.asList("c")));
+
+ assertThat(checker.conflictsWith(file("historical-b", 12L, 1L, 0L,
Arrays.asList("b"))))
+ .isTrue();
+ assertThat(checker.conflictsWith(file("historical-c", 12L, 1L, 0L,
Arrays.asList("c"))))
+ .isTrue();
+ }
+
+ @Test
+ void testScansAllOverlappedRangesAfterBinarySearch() {
+ RowIdColumnConflictChecker checker =
+ checker(
+ file("current-b", 0L, 5L, 0L, Arrays.asList("b")),
+ file("current-c", 10L, 5L, 0L, Arrays.asList("c")));
+
+ assertThat(checker.conflictsWith(file("historical", 3L, 10L, 0L,
Arrays.asList("c"))))
+ .isTrue();
+ }
+
+ @Test
+ void testFailsOnUnknownNonSystemWriteColumn() {
+ RowIdColumnConflictChecker checker =
+ checker(file("current", 0L, 10L, 0L, Arrays.asList("b")));
+
+ assertThatThrownBy(
+ () ->
+ checker.conflictsWith(
+ file("historical", 0L, 10L, 0L,
Arrays.asList("missing"))))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Cannot find write column 'missing'");
+ }
+
+ private RowIdColumnConflictChecker checker(DataFileMeta... files) {
+ return RowIdColumnConflictChecker.fromDataFiles(
+ createSchemaManager(), Arrays.asList(files));
+ }
+
+ private DataFileMeta file(
+ String fileName,
+ @Nullable Long firstRowId,
+ long rowCount,
+ long schemaId,
+ @Nullable List<String> writeCols) {
+ return DataFileMeta.forAppend(
+ fileName,
+ 0L,
+ rowCount,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ 0L,
+ schemaId,
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null,
+ firstRowId,
+ writeCols);
+ }
+
+ private SchemaManager createSchemaManager() {
+ Map<Long, org.apache.paimon.schema.TableSchema> schemas = new
HashMap<>();
+ schemas.put(
+ 0L,
+ org.apache.paimon.schema.TableSchema.create(
+ 0L,
+ new Schema(
+ Arrays.asList(
+ new DataField(0, "id",
DataTypes.INT()),
+ new DataField(1, "b", DataTypes.INT()),
+ new DataField(2, "c",
DataTypes.INT())),
+ Collections.emptyList(),
+ Collections.singletonList("id"),
+ Collections.emptyMap(),
+ "")));
+ schemas.put(
+ 1L,
+ org.apache.paimon.schema.TableSchema.create(
+ 1L,
+ new Schema(
+ Arrays.asList(
+ new DataField(0, "id",
DataTypes.INT()),
+ new DataField(1, "b_renamed",
DataTypes.INT()),
+ new DataField(2, "c",
DataTypes.INT())),
+ Collections.emptyList(),
+ Collections.singletonList("id"),
+ Collections.emptyMap(),
+ "")));
+ return new TestingSchemaManager(
+ new Path("/tmp/row-id-column-conflict-checker-test"), schemas);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index 019bf6484b..40c8911380 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -126,6 +126,9 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
private int sinkParallelism;
+ // the snapshot id this action is based on
+ private long baseSnapshotId;
+
public DataEvolutionMergeIntoAction(
String databaseName, String tableName, Map<String, String>
catalogConfig) {
super(databaseName, tableName, catalogConfig);
@@ -142,6 +145,7 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
throw new UnsupportedOperationException(
"merge-into action doesn't support updating an empty
table.");
}
+ this.baseSnapshotId = latestSnapshotId;
table =
table.copy(
Collections.singletonMap(
@@ -324,7 +328,7 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
Transformation<RowData> sourceTransformation =
source.getTransformation();
List<Long> firstRowIds =
((FileStoreTable) table)
- .store().newScan()
+ .store().newScan().withSnapshot(baseSnapshotId)
.withManifestEntryFilter(
entry ->
entry.file().firstRowId() !=
null
@@ -384,13 +388,16 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
return sorted.transform(
"PARTIAL WRITE COLUMNS",
new CommittableTypeInfo(),
- new DataEvolutionPartialWriteOperator((FileStoreTable)
table, rowType))
+ new DataEvolutionPartialWriteOperator(
+ (FileStoreTable) table, rowType,
baseSnapshotId))
.setParallelism(sinkParallelism);
}
public DataStream<Committable> commit(
DataStream<Committable> written, Set<String> updatedColumns) {
FileStoreTable storeTable = (FileStoreTable) table;
+ // copy to avoid serialization issue
+ long baseSnapshotId = this.baseSnapshotId;
// Check if some global-indexed columns are updated
DataStream<Committable> checked =
@@ -409,7 +416,9 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
context ->
new StoreCommitter(
storeTable,
-
storeTable.newCommit(context.commitUser()),
+ storeTable
+
.newCommit(context.commitUser())
+
.rowIdCheckConflict(baseSnapshotId),
context),
new NoopCommittableStateManager());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
index 278d58306c..f57393b7ca 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
@@ -70,6 +70,7 @@ public class DataEvolutionPartialWriteOperator
LoggerFactory.getLogger(DataEvolutionPartialWriteOperator.class);
private final FileStoreTable table;
+ private final Long baseSnapshotId;
// dataType
private final RowType dataType;
@@ -91,9 +92,11 @@ public class DataEvolutionPartialWriteOperator
private transient AbstractFileStoreWrite<InternalRow> tableWrite;
private transient Writer writer;
- public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType
dataType) {
+ public DataEvolutionPartialWriteOperator(
+ FileStoreTable table, RowType dataType, Long baseSnapshotId) {
this.table =
table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999
G"));
+ this.baseSnapshotId = baseSnapshotId;
List<String> fieldNames =
dataType.getFieldNames().stream()
.filter(name ->
!SpecialFields.ROW_ID.name().equals(name))
@@ -121,6 +124,7 @@ public class DataEvolutionPartialWriteOperator
entry.file().firstRowId() != null
&&
!isBlobFile(entry.file().fileName())
&&
!isVectorStoreFile(entry.file().fileName()))
+ .withSnapshot(baseSnapshotId)
.plan()
.files();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 53ecb5b034..06b08d2401 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -743,10 +743,6 @@ public class DataEvolutionMergeIntoActionITCase extends
ActionITCaseBase {
"(1, 'name1', X'48656C6C6F')",
"(2, 'name2', X'5945')",
"(3, 'name3', X'414243')");
- testBatchRead(
- "SELECT COUNT(*) FROM `RAW_BLOB_SPLIT_T$files` "
- + "WHERE file_path NOT LIKE '%.blob'",
- Collections.singletonList(changelogRow("+I", 1L)));
testBatchRead(
"SELECT COUNT(*) > 1 FROM `RAW_BLOB_SPLIT_T$files` "
+ "WHERE file_path LIKE '%.blob'",
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index e012656cc8..ff54322b92 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -140,6 +140,48 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase {
}
}
+ test("Data Evolution: concurrent merge with disjoint update columns") {
+ withTable("sb", "sc", "t") {
+ sql(s"""
+ CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true')
+ """)
+ sql("INSERT INTO t VALUES (1, 0, 0)")
+ Seq((1, 1)).toDF("id", "b").createOrReplaceTempView("sb")
+ Seq((1, 1)).toDF("id", "c").createOrReplaceTempView("sc")
+
+ val mergeB = Future {
+ for (_ <- 1 to 10) {
+ sql(s"""
+ |MERGE INTO t
+ |USING sb
+ |ON t.id = sb.id
+ |WHEN MATCHED THEN
+ |UPDATE SET t.b = sb.b + t.b
+ |""".stripMargin).collect()
+ }
+ }
+
+ val mergeC = Future {
+ for (_ <- 1 to 10) {
+ sql(s"""
+ |MERGE INTO t
+ |USING sc
+ |ON t.id = sc.id
+ |WHEN MATCHED THEN
+ |UPDATE SET t.c = sc.c + t.c
+ |""".stripMargin).collect()
+ }
+ }
+
+ Await.result(mergeB, 60.seconds)
+ Await.result(mergeC, 60.seconds)
+
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 10, 10)))
+ }
+ }
+
test("Data Evolution: concurrent merge and small files compact") {
withTable("s", "t") {
sql(s"""