This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 77d0dcec0f [flink] Add data evolution row id reassign action (#7957)
77d0dcec0f is described below
commit 77d0dcec0ff508ed21f7df29c92e980e1997346f
Author: YeJunHao <[email protected]>
AuthorDate: Tue May 26 16:15:52 2026 +0800
[flink] Add data evolution row id reassign action (#7957)
Add a Flink action and procedure to reassign row IDs for data evolution
tables by rewriting metadata. This is useful when partition row-id
ranges overlap and should be normalized without rewriting data files.
---
docs/docs/flink/action-jars.md | 28 +
docs/docs/flink/procedures.md | 23 +-
.../DataEvolutionRowIdReassigner.java | 939 ++++++++++++++++++++
.../append/dataevolution/RowRangeMappingIndex.java | 138 +++
.../DataEvolutionRowIdReassignerTest.java | 949 +++++++++++++++++++++
.../dataevolution/RowRangeMappingIndexTest.java | 77 ++
.../paimon/flink/action/ReassignRowIdAction.java | 175 ++++
.../flink/action/ReassignRowIdActionFactory.java | 80 ++
.../flink/procedure/ReassignRowIdProcedure.java | 83 ++
.../services/org.apache.paimon.factories.Factory | 2 +
.../DataEvolutionRowIdReassignerTest.java | 230 +++++
.../api/functions/source/SourceFunction.java | 26 +
12 files changed, 2749 insertions(+), 1 deletion(-)
diff --git a/docs/docs/flink/action-jars.md b/docs/docs/flink/action-jars.md
index b2c3e5a73e..419c342008 100644
--- a/docs/docs/flink/action-jars.md
+++ b/docs/docs/flink/action-jars.md
@@ -307,6 +307,34 @@ For more information of 'rewrite_file_index', see
rewrite_file_index --help
```
+## Reassign Row ID
+
+Run the following command to submit a 'reassign_row_id' job for a data evolution table.
+This action rewrites metadata to make partition row-id ranges non-overlapping.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-@@VERSION@@.jar \
+ reassign_row_id \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table <table-name> \
+ [--table_conf <paimon-table-conf> [--table_conf <paimon-table-conf> ...]] \
+ [--partition <partition_spec> [--partition <partition_spec> ...]] \
+ [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
+
+partition_spec:
+key1=value1,key2=value2...
+```
+
+For more information of 'reassign_row_id', see
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-@@VERSION@@.jar \
+ reassign_row_id --help
+```
+
## Force Start Flink Job
Some actions, like `create_tag`, are lightweight and by default will not be submitted as a job to Flink cluster. If you
diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md
index dc4f1fedaf..c1286f2be6 100644
--- a/docs/docs/flink/procedures.md
+++ b/docs/docs/flink/procedures.md
@@ -754,7 +754,28 @@ All available procedures are listed below.
CALL sys.rewrite_file_index(`table` => 'test_db.T')<br/><br/>
-- rewrite the file index for the specified partition in the table<br/>
CALL sys.rewrite_file_index(`table` => 'test_db.T', partitions => 'pt=a')<br/><br/>
- </td>
+ </td>
+ </tr>
+ <tr>
+ <td>reassign_row_id</td>
+ <td>
+ -- Use named argument<br/>
+ CALL [catalog.]sys.reassign_row_id(<`table` => identifier> [, <partitions => partitions>])<br/><br/>
+ -- Use indexed argument<br/>
+ CALL [catalog.]sys.reassign_row_id(<identifier> [, <partitions>])<br/><br/>
+ </td>
+ <td>
+ Reassign row IDs for a data evolution table by rewriting metadata. Argument:
+ <li>table: <databaseName>.<tableName>.</li>
+ <li>partitions : specific partitions.</li>
+ </td>
+ <td>
+ -- reassign row IDs for the whole table<br/>
+ CALL sys.reassign_row_id(`table` => 'test_db.T')<br/><br/>
+ -- reassign row IDs for the specified partition in the table<br/>
+ CALL sys.reassign_row_id(`table` => 'test_db.T', partitions => 'pt=a')<br/><br/>
+ </td>
+ </tr>
<tr>
<td>create_branch</td>
<td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
new file mode 100644
index 0000000000..1faa4cf66a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
@@ -0,0 +1,939 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RangeHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.types.VectorType.isVectorStoreFile;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Reassigns row IDs for data evolution tables by rewriting metadata only. */
+public class DataEvolutionRowIdReassigner {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DataEvolutionRowIdReassigner.class);
+ private static final String COMMIT_USER_PREFIX = "reassign-row-id";
+
+ private final FileStoreTable table;
+ private final @Nullable PartitionPredicate partitionPredicate;
+
+ public DataEvolutionRowIdReassigner(FileStoreTable table) {
+ this(table, null);
+ }
+
+ public DataEvolutionRowIdReassigner(
+ FileStoreTable table, @Nullable PartitionPredicate
partitionPredicate) {
+ this.table = table;
+ this.partitionPredicate = partitionPredicate;
+ }
+
+ public Result reassign() {
+ Map<String, String> dynamicOptions = new
HashMap<>(table.coreOptions().toMap());
+ dynamicOptions.put(CoreOptions.COMMIT_USER_PREFIX.key(),
COMMIT_USER_PREFIX);
+ return reassign(CoreOptions.createCommitUser(new
Options(dynamicOptions)));
+ }
+
+ public Result reassign(String commitUser) {
+ checkArgument(
+ table.coreOptions().rowTrackingEnabled(),
+ "Table '%s' must enable 'row-tracking.enabled=true' before
reassigning row IDs.",
+ table.name());
+ checkArgument(
+ table.coreOptions().dataEvolutionEnabled(),
+ "Table '%s' must enable 'data-evolution.enabled=true' before
reassigning row IDs.",
+ table.name());
+
+ Snapshot latest = table.snapshotManager().latestSnapshot();
+ checkArgument(
+ latest != null, "Cannot reassign row IDs for empty table
'%s'.", table.name());
+ Long nextRowId = latest.nextRowId();
+ checkState(
+ nextRowId != null,
+ "Next row id cannot be null for row-tracking table '%s'.",
+ table.name());
+ if (table.schema().logicalPartitionType().getFieldCount() == 0) {
+ LOG.info(
+ "Skip reassigning row IDs for table {} because it is not
partitioned.",
+ table.name());
+ return Result.skipped(latest.id(), nextRowId, "table is not
partitioned");
+ }
+
+ ManifestFile manifestFile =
table.store().manifestFileFactory().create();
+ ManifestList manifestList =
table.store().manifestListFactory().create();
+ List<ManifestFileMeta> manifestMetas =
manifestList.readDataManifests(latest);
+ AssignmentPlan assignment = planAssignment(manifestMetas,
manifestFile, nextRowId);
+ if (!assignment.hasCurrentFiles) {
+ return Result.skipped(
+ latest.id(),
+ nextRowId,
+ partitionFilterEnabled()
+ ? "partition filter matches no current files"
+ : "table has no current files");
+ }
+ if (assignment.reassignedFileCount == 0) {
+ LOG.info(
+ "Skip reassigning row IDs for table {} because partition
row IDs are already contiguous.",
+ table.name());
+ return Result.skipped(
+ latest.id(), nextRowId, "partition row IDs are already
contiguous");
+ }
+
+ Pair<String, Long> baseManifestList =
+ writeBaseManifestList(
+ manifestMetas, assignment.rewrittenManifestMetas,
manifestList);
+ Pair<String, Long> deltaManifestList =
manifestList.write(Collections.emptyList());
+
+ RewrittenIndexManifest rewrittenIndexManifest =
rewriteIndexManifest(latest, assignment);
+
+ try (FileStoreCommitImpl commit =
+ (FileStoreCommitImpl) table.store().newCommit(commitUser,
table)) {
+ boolean success =
+ commit.replaceManifestList(
+ latest,
+ latest.totalRecordCount(),
+ baseManifestList,
+ deltaManifestList,
+ rewrittenIndexManifest.indexManifest,
+ assignment.nextRowId);
+ if (!success) {
+ throw new RuntimeException(
+ "Failed to reassign row IDs because a newer snapshot
has been committed.");
+ }
+ }
+
+ LOG.info(
+ "Reassigned row IDs for table {} from {} to {}, partitions={},
files={}, rows={}.",
+ table.name(),
+ nextRowId,
+ assignment.nextRowId,
+ assignment.rowIdMappings.size(),
+ assignment.reassignedFileCount,
+ assignment.logicalRowCount);
+ return new Result(
+ latest.id(),
+ latest.id() + 1,
+ assignment.reassignedFileCount,
+ assignment.logicalRowCount,
+ rewrittenIndexManifest.indexFileCount,
+ nextRowId,
+ assignment.nextRowId);
+ }
+
+ private AssignmentPlan planAssignment(
+ List<ManifestFileMeta> manifestMetas, ManifestFile manifestFile,
long firstRowId) {
+ List<List<ManifestFileMeta>> manifestGroups =
manifestGroupsByPartition(manifestMetas);
+ Map<String, List<ManifestFileMeta>> rewrittenManifestMetas = new
HashMap<>();
+ Map<BinaryRow, RowRangeMappingIndex> rowIdMappings = new
LinkedHashMap<>();
+ long nextRowId = firstRowId;
+ long logicalRowCount = 0;
+ long reassignedFileCount = 0;
+ boolean hasCurrentFiles = false;
+
+ for (List<ManifestFileMeta> manifestGroup : manifestGroups) {
+ if (skipManifestGroupByPartitionFilter(manifestGroup)) {
+ continue;
+ }
+
+ CurrentManifest currentManifest = currentManifest(manifestGroup,
manifestFile);
+ List<ManifestEntry> currentEntries = currentManifest.entries();
+ if (currentEntries.isEmpty()) {
+ continue;
+ }
+ hasCurrentFiles = true;
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition =
+ entriesByPartition(currentEntries);
+ Set<BinaryRow> partitionsToReassign =
partitionsToReassign(entriesByPartition);
+ if (partitionsToReassign.isEmpty()) {
+ continue;
+ }
+
+ Assignment groupAssignment =
+ assign(entriesByPartition, partitionsToReassign,
nextRowId);
+ nextRowId = groupAssignment.nextRowId;
+ logicalRowCount += groupAssignment.logicalRowCount;
+ reassignedFileCount += groupAssignment.reassignedFileCount;
+ for (Map.Entry<BinaryRow, RowRangeMappingIndex> mapping :
+ groupAssignment.rowIdMappings.entrySet()) {
+ RowRangeMappingIndex previous =
+ rowIdMappings.put(mapping.getKey(),
mapping.getValue());
+ checkState(
+ previous == null,
+ "Partition %s appears in multiple manifest groups.",
+
table.store().pathFactory().getPartitionString(mapping.getKey()));
+ }
+
+ Map<String, List<ManifestFileMeta>> groupRewrittenManifestMetas =
+ writeManifestReplacements(
+ currentManifest, groupAssignment,
partitionsToReassign, manifestFile);
+ for (Map.Entry<String, List<ManifestFileMeta>> rewritten :
+ groupRewrittenManifestMetas.entrySet()) {
+ List<ManifestFileMeta> previous =
+ rewrittenManifestMetas.put(rewritten.getKey(),
rewritten.getValue());
+ checkState(
+ previous == null,
+ "Manifest file %s appears in multiple manifest
groups.",
+ rewritten.getKey());
+ }
+ }
+
+ return new AssignmentPlan(
+ rewrittenManifestMetas,
+ rowIdMappings,
+ nextRowId,
+ logicalRowCount,
+ reassignedFileCount,
+ hasCurrentFiles);
+ }
+
+ private List<List<ManifestFileMeta>> manifestGroupsByPartition(
+ List<ManifestFileMeta> manifestMetas) {
+ List<ManifestFileMeta> nonEmptyManifestMetas = new ArrayList<>();
+ for (ManifestFileMeta manifestMeta : manifestMetas) {
+ if (manifestMeta.numAddedFiles() + manifestMeta.numDeletedFiles()
> 0) {
+ nonEmptyManifestMetas.add(manifestMeta);
+ }
+ }
+ if (nonEmptyManifestMetas.size() <= 1) {
+ return nonEmptyManifestMetas.isEmpty()
+ ? Collections.emptyList()
+ : Collections.singletonList(nonEmptyManifestMetas);
+ }
+
+ int partitionFieldCount =
table.schema().logicalPartitionType().getFieldCount();
+ for (ManifestFileMeta manifestMeta : nonEmptyManifestMetas) {
+ if (!containsPartitionStats(manifestMeta, partitionFieldCount)) {
+ return Collections.singletonList(nonEmptyManifestMetas);
+ }
+ }
+
+ RecordComparator partitionComparator = partitionComparator();
+ List<PartitionManifestRange> manifestRanges = new
ArrayList<>(nonEmptyManifestMetas.size());
+ for (int i = 0; i < nonEmptyManifestMetas.size(); i++) {
+ ManifestFileMeta manifestMeta = nonEmptyManifestMetas.get(i);
+ manifestRanges.add(
+ new PartitionManifestRange(
+ manifestMeta,
+ manifestMeta.partitionStats().minValues(),
+ manifestMeta.partitionStats().maxValues(),
+ i));
+ }
+ Collections.sort(
+ manifestRanges,
+ (left, right) -> {
+ int result =
partitionComparator.compare(left.minPartition, right.minPartition);
+ if (result != 0) {
+ return result;
+ }
+ return partitionComparator.compare(left.maxPartition,
right.maxPartition);
+ });
+
+ List<List<PartitionManifestRange>> groupedManifestRanges = new
ArrayList<>();
+ List<PartitionManifestRange> currentGroup = new ArrayList<>();
+ currentGroup.add(manifestRanges.get(0));
+ BinaryRow currentMaxPartition = manifestRanges.get(0).maxPartition;
+ for (int i = 1; i < manifestRanges.size(); i++) {
+ PartitionManifestRange current = manifestRanges.get(i);
+ if (partitionComparator.compare(current.minPartition,
currentMaxPartition) <= 0) {
+ currentGroup.add(current);
+ if (partitionComparator.compare(current.maxPartition,
currentMaxPartition) > 0) {
+ currentMaxPartition = current.maxPartition;
+ }
+ } else {
+ groupedManifestRanges.add(currentGroup);
+ currentGroup = new ArrayList<>();
+ currentGroup.add(current);
+ currentMaxPartition = current.maxPartition;
+ }
+ }
+ groupedManifestRanges.add(currentGroup);
+
+ List<List<ManifestFileMeta>> groups = new ArrayList<>();
+ for (List<PartitionManifestRange> group : groupedManifestRanges) {
+ Collections.sort(group, Comparator.comparingInt(left ->
left.originalIndex));
+ List<ManifestFileMeta> manifestGroup = new
ArrayList<>(group.size());
+ for (PartitionManifestRange range : group) {
+ manifestGroup.add(range.manifest);
+ }
+ groups.add(manifestGroup);
+ }
+ return groups;
+ }
+
+ private boolean skipManifestGroupByPartitionFilter(List<ManifestFileMeta>
manifestGroup) {
+ if (!partitionFilterEnabled()) {
+ return false;
+ }
+
+ int partitionFieldCount =
table.schema().logicalPartitionType().getFieldCount();
+ for (ManifestFileMeta manifestMeta : manifestGroup) {
+ if (!containsPartitionStats(manifestMeta, partitionFieldCount)) {
+ return false;
+ }
+
+ SimpleStats partitionStats = manifestMeta.partitionStats();
+ if (partitionPredicate.test(
+ manifestMeta.numAddedFiles() +
manifestMeta.numDeletedFiles(),
+ partitionStats.minValues(),
+ partitionStats.maxValues(),
+ partitionStats.nullCounts())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean containsPartitionStats(ManifestFileMeta manifestMeta, int
partitionFieldCount) {
+ SimpleStats partitionStats = manifestMeta.partitionStats();
+ return partitionStats != null
+ && partitionStats.minValues().getFieldCount() ==
partitionFieldCount
+ && partitionStats.maxValues().getFieldCount() ==
partitionFieldCount
+ && partitionStats.nullCounts().size() == partitionFieldCount;
+ }
+
+ private CurrentManifest currentManifest(
+ List<ManifestFileMeta> manifestMetas, ManifestFile manifestFile) {
+ Set<FileEntry.Identifier> deletedIdentifiers =
+ deletedIdentifiers(manifestFile, manifestMetas);
+
+ List<SourcedManifestEntry> currentEntries = new ArrayList<>();
+ for (ManifestFileMeta manifestMeta : manifestMetas) {
+ if (manifestMeta.numAddedFiles() <= 0) {
+ continue;
+ }
+ List<ManifestEntry> entries =
+ manifestFile.read(manifestMeta.fileName(),
manifestMeta.fileSize());
+ for (ManifestEntry entry : entries) {
+ if (entry.kind() == FileKind.ADD
+ && partitionIncluded(entry.partition())
+ && !deletedIdentifiers.contains(entry.identifier())) {
+ currentEntries.add(new SourcedManifestEntry(manifestMeta,
entry));
+ }
+ }
+ }
+ return new CurrentManifest(manifestMetas, currentEntries);
+ }
+
+ private Set<FileEntry.Identifier> deletedIdentifiers(
+ ManifestFile manifestFile, List<ManifestFileMeta> manifestMetas) {
+ Set<FileEntry.Identifier> deletedIdentifiers = new HashSet<>();
+ for (ManifestFileMeta manifestMeta : manifestMetas) {
+ if (manifestMeta.numDeletedFiles() <= 0) {
+ continue;
+ }
+ List<ManifestEntry> entries =
+ manifestFile.read(manifestMeta.fileName(),
manifestMeta.fileSize());
+ for (ManifestEntry entry : entries) {
+ if (entry.kind() == FileKind.DELETE &&
partitionIncluded(entry.partition())) {
+ deletedIdentifiers.add(entry.identifier());
+ }
+ }
+ }
+ return deletedIdentifiers;
+ }
+
+ private boolean partitionIncluded(BinaryRow partition) {
+ return !partitionFilterEnabled() || partitionPredicate.test(partition);
+ }
+
+ private boolean partitionFilterEnabled() {
+ return partitionPredicate != null;
+ }
+
+ private Pair<String, Long> writeBaseManifestList(
+ List<ManifestFileMeta> manifestMetas,
+ Map<String, List<ManifestFileMeta>> rewrittenManifestMetas,
+ ManifestList manifestList) {
+ List<ManifestFileMeta> baseManifestMetas = new ArrayList<>();
+ for (ManifestFileMeta manifestMeta : manifestMetas) {
+ List<ManifestFileMeta> replacement =
+ rewrittenManifestMetas.get(manifestMeta.fileName());
+ if (replacement == null) {
+ baseManifestMetas.add(manifestMeta);
+ } else {
+ baseManifestMetas.addAll(replacement);
+ }
+ }
+ return manifestList.write(baseManifestMetas);
+ }
+
+ private Map<String, List<ManifestFileMeta>> writeManifestReplacements(
+ CurrentManifest currentManifest,
+ Assignment assignment,
+ Set<BinaryRow> partitionsToReassign,
+ ManifestFile manifestFile) {
+ Set<String> manifestsToRewrite =
+ manifestsToRewrite(currentManifest.currentEntries,
partitionsToReassign);
+ Map<FileEntry.Identifier, ManifestEntry> reassignedEntries =
+ entriesByIdentifier(assignment.entries);
+
+ Map<String, List<ManifestFileMeta>> rewrittenManifestMetas = new
HashMap<>();
+ for (ManifestFileMeta manifestMeta : currentManifest.manifestMetas) {
+ if (!manifestsToRewrite.contains(manifestMeta.fileName())) {
+ continue;
+ }
+
+ List<ManifestEntry> rewrittenEntries = new ArrayList<>();
+ List<ManifestEntry> entries =
+ manifestFile.read(manifestMeta.fileName(),
manifestMeta.fileSize());
+ for (ManifestEntry entry : entries) {
+ if (entry.kind() == FileKind.ADD) {
+ ManifestEntry reassignedEntry =
reassignedEntries.get(entry.identifier());
+ if (reassignedEntry != null) {
+ entry = reassignedEntry;
+ }
+ }
+ rewrittenEntries.add(entry);
+ }
+ rewrittenManifestMetas.put(
+ manifestMeta.fileName(),
manifestFile.write(rewrittenEntries));
+ }
+ return rewrittenManifestMetas;
+ }
+
+ private Set<String> manifestsToRewrite(
+ List<SourcedManifestEntry> currentEntries, Set<BinaryRow>
partitionsToReassign) {
+ Set<String> manifestsToRewrite = new HashSet<>();
+ for (SourcedManifestEntry currentEntry : currentEntries) {
+ if (partitionsToReassign.contains(currentEntry.entry.partition()))
{
+ manifestsToRewrite.add(currentEntry.manifest.fileName());
+ }
+ }
+ return manifestsToRewrite;
+ }
+
+ private Map<FileEntry.Identifier, ManifestEntry> entriesByIdentifier(
+ List<ManifestEntry> entries) {
+ Map<FileEntry.Identifier, ManifestEntry> result = new HashMap<>();
+ for (ManifestEntry entry : entries) {
+ ManifestEntry previous = result.put(entry.identifier(), entry);
+ checkState(previous == null, "Duplicate current manifest entry for
file %s.", entry);
+ }
+ return result;
+ }
+
+ private Map<BinaryRow, List<ManifestEntry>>
entriesByPartition(List<ManifestEntry> entries) {
+ List<ManifestEntry> sorted = new ArrayList<>(entries);
+ Collections.sort(sorted, entryComparator());
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new
LinkedHashMap<>();
+ for (ManifestEntry entry : sorted) {
+ List<String> writeCols = entry.file().writeCols();
+ checkState(
+ writeCols == null ||
!writeCols.contains(SpecialFields.ROW_ID.name()),
+ "Cannot reassign row IDs for file '%s' because it
physically stores the row-id field.",
+ entry.file().fileName());
+ checkState(
+ entry.file().firstRowId() != null,
+ "File '%s' in table '%s' does not have first row id.",
+ entry.file().fileName(),
+ table.name());
+ entriesByPartition
+ .computeIfAbsent(entry.partition(), k -> new ArrayList<>())
+ .add(entry);
+ }
+ return entriesByPartition;
+ }
+
+ private Set<BinaryRow> partitionsToReassign(
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition) {
+ Set<BinaryRow> partitionsToReassign = new HashSet<>();
+ for (Map.Entry<BinaryRow, List<ManifestEntry>> entry :
entriesByPartition.entrySet()) {
+ if (!partitionRowIdsAreContiguous(entry.getValue())) {
+ partitionsToReassign.add(entry.getKey());
+ }
+ }
+ return partitionsToReassign;
+ }
+
+ private boolean partitionRowIdsAreContiguous(List<ManifestEntry> entries) {
+ List<Range> logicalRanges = logicalRanges(entries);
+ if (logicalRanges.size() <= 1) {
+ return true;
+ }
+
+ Collections.sort(
+ logicalRanges,
+ (left, right) -> {
+ int result = Long.compare(left.from, right.from);
+ return result == 0 ? Long.compare(left.to, right.to) :
result;
+ });
+ long previousEnd = logicalRanges.get(0).to;
+ for (int i = 1; i < logicalRanges.size(); i++) {
+ Range current = logicalRanges.get(i);
+ if (current.from != previousEnd + 1) {
+ return false;
+ }
+ previousEnd = current.to;
+ }
+ return true;
+ }
+
+ private List<Range> logicalRanges(List<ManifestEntry> entries) {
+ RangeHelper<ManifestEntry> rangeHelper =
+ new RangeHelper<>(entry -> entry.file().nonNullRowIdRange());
+ List<List<ManifestEntry>> groups =
rangeHelper.mergeOverlappingRanges(entries);
+ List<Range> logicalRanges = new ArrayList<>(groups.size());
+ for (List<ManifestEntry> group : groups) {
+ logicalRanges.add(oldLogicalRange(group));
+ }
+ return logicalRanges;
+ }
+
+ private Assignment assign(
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition,
+ Set<BinaryRow> partitionsToReassign,
+ long firstRowId) {
+ List<ManifestEntry> entries = new ArrayList<>();
+ Map<BinaryRow, RowRangeMappingIndex> rowIdMappings = new
LinkedHashMap<>();
+ long nextRowId = firstRowId;
+ long logicalRowCount = 0;
+ long reassignedFileCount = 0;
+ for (Map.Entry<BinaryRow, List<ManifestEntry>> entry :
entriesByPartition.entrySet()) {
+ if (partitionsToReassign.contains(entry.getKey())) {
+ long partitionFirstRowId = nextRowId;
+ PartitionAssignment partitionAssignment =
+ assignPartition(entry.getValue(), nextRowId);
+ entries.addAll(partitionAssignment.entries);
+ rowIdMappings.put(entry.getKey(),
partitionAssignment.rowIdMappings);
+ nextRowId = partitionAssignment.nextRowId;
+ logicalRowCount += nextRowId - partitionFirstRowId;
+ reassignedFileCount += partitionAssignment.entries.size();
+ }
+ }
+
+ return new Assignment(
+ entries, rowIdMappings, nextRowId, logicalRowCount,
reassignedFileCount);
+ }
+
+ private PartitionAssignment assignPartition(List<ManifestEntry> entries,
long firstRowId) {
+ RangeHelper<ManifestEntry> rangeHelper =
+ new RangeHelper<>(entry -> entry.file().nonNullRowIdRange());
+ List<List<ManifestEntry>> groups =
rangeHelper.mergeOverlappingRanges(entries);
+
+ List<ManifestEntry> reassigned = new ArrayList<>(entries.size());
+ List<RowRangeMappingIndex.Mapping> mappings = new ArrayList<>();
+ long nextRowId = firstRowId;
+
+ for (List<ManifestEntry> group : groups) {
+ Collections.sort(group, entryComparatorWithoutPartition());
+ Range oldLogicalRange = oldLogicalRange(group);
+ mappings.add(
+ RowRangeMappingIndex.mapping(
+ oldLogicalRange.from, oldLogicalRange.to,
nextRowId));
+
+ for (ManifestEntry entry : group) {
+ long oldFirstRowId = entry.file().nonNullFirstRowId();
+ long newFirstRowId = nextRowId + oldFirstRowId -
oldLogicalRange.from;
+ reassigned.add(entry.assignFirstRowId(newFirstRowId));
+ }
+
+ nextRowId += oldLogicalRange.count();
+ }
+
+ return new PartitionAssignment(
+ reassigned, RowRangeMappingIndex.create(mappings), nextRowId);
+ }
+
+ private Range oldLogicalRange(List<ManifestEntry> group) {
+ List<ManifestEntry> dataFiles = new ArrayList<>();
+ for (ManifestEntry entry : group) {
+ if (!isSpecialFile(entry)) {
+ dataFiles.add(entry);
+ }
+ }
+
+ Range logicalRange;
+ if (dataFiles.isEmpty()) {
+ logicalRange = spanningRange(group);
+ } else {
+ logicalRange = dataFiles.get(0).file().nonNullRowIdRange();
+ for (ManifestEntry dataFile : dataFiles) {
+ Range current = dataFile.file().nonNullRowIdRange();
+ checkState(
+ logicalRange.from == current.from && logicalRange.to
== current.to,
+ "Data files in one overlapping row-id group must have
the same row-id range, but found %s and %s.",
+ logicalRange,
+ current);
+ }
+ }
+
+ for (ManifestEntry entry : group) {
+ Range range = entry.file().nonNullRowIdRange();
+ checkState(
+ range.from >= logicalRange.from && range.to <=
logicalRange.to,
+ "File '%s' row-id range %s is outside logical row-id range
%s.",
+ entry.file().fileName(),
+ range,
+ logicalRange);
+ }
+ return logicalRange;
+ }
+
+ private Range spanningRange(List<ManifestEntry> group) {
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ for (ManifestEntry entry : group) {
+ Range range = entry.file().nonNullRowIdRange();
+ min = Math.min(min, range.from);
+ max = Math.max(max, range.to);
+ }
+ return new Range(min, max);
+ }
+
+ private RewrittenIndexManifest rewriteIndexManifest(
+ Snapshot latest, AssignmentPlan assignment) {
+ if (latest.indexManifest() == null) {
+ return new RewrittenIndexManifest(null, 0);
+ }
+
+ IndexManifestFile indexManifestFile =
table.store().indexManifestFileFactory().create();
+ List<IndexManifestEntry> indexEntries =
indexManifestFile.read(latest.indexManifest());
+ if (indexEntries.isEmpty()) {
+ return new RewrittenIndexManifest(null, 0);
+ }
+
+ List<IndexManifestEntry> rewritten = new
ArrayList<>(indexEntries.size());
+ long globalIndexFileCount = 0;
+ for (IndexManifestEntry entry : indexEntries) {
+ checkState(
+ entry.kind() == FileKind.ADD,
+ "Index manifest '%s' contains non-current entry %s.",
+ latest.indexManifest(),
+ entry);
+
+ IndexFileMeta indexFile = entry.indexFile();
+ GlobalIndexMeta globalIndex = indexFile.globalIndexMeta();
+ RowRangeMappingIndex mappingIndex =
assignment.rowIdMappings.get(entry.partition());
+ if (globalIndex == null || mappingIndex == null) {
+ rewritten.add(entry);
+ continue;
+ }
+
+ globalIndexFileCount++;
+ Range newRange = mappingIndex.map(globalIndex.rowRange());
+ GlobalIndexMeta newGlobalIndex =
+ new GlobalIndexMeta(
+ newRange.from,
+ newRange.to,
+ globalIndex.indexFieldId(),
+ globalIndex.extraFieldIds(),
+ globalIndex.indexMeta());
+ IndexFileMeta newIndexFile =
+ new IndexFileMeta(
+ indexFile.indexType(),
+ indexFile.fileName(),
+ indexFile.fileSize(),
+ indexFile.rowCount(),
+ indexFile.dvRanges(),
+ indexFile.externalPath(),
+ newGlobalIndex);
+ rewritten.add(
+ new IndexManifestEntry(
+ entry.kind(), entry.partition(), entry.bucket(),
newIndexFile));
+ }
+
+ return new RewrittenIndexManifest(
+ indexManifestFile.writeWithoutRolling(rewritten),
globalIndexFileCount);
+ }
+
+ private Comparator<ManifestEntry> entryComparator() {
+ RecordComparator partitionComparator = partitionComparator();
+ Comparator<ManifestEntry> withoutPartition =
entryComparatorWithoutPartition();
+ return (left, right) -> {
+ int partitionCompare =
partitionComparator.compare(left.partition(), right.partition());
+ if (partitionCompare != 0) {
+ return partitionCompare;
+ }
+ return withoutPartition.compare(left, right);
+ };
+ }
+
+ private RecordComparator partitionComparator() {
+ return CodeGenUtils.newRecordComparator(
+ table.schema().logicalPartitionType().getFieldTypes());
+ }
+
+ private Comparator<ManifestEntry> entryComparatorWithoutPartition() {
+ return (left, right) -> {
+ int result =
+ Long.compare(left.file().nonNullFirstRowId(),
right.file().nonNullFirstRowId());
+ if (result != 0) {
+ return result;
+ }
+ result = Integer.compare(fileOrder(left), fileOrder(right));
+ if (result != 0) {
+ return result;
+ }
+ result =
+ Long.compare(right.file().maxSequenceNumber(),
left.file().maxSequenceNumber());
+ if (result != 0) {
+ return result;
+ }
+ return left.file().fileName().compareTo(right.file().fileName());
+ };
+ }
+
+ private int fileOrder(ManifestEntry entry) {
+ if (isBlobFile(entry.file().fileName())) {
+ return 1;
+ }
+ if (isVectorStoreFile(entry.file().fileName())) {
+ return 2;
+ }
+ return 0;
+ }
+
+ private boolean isSpecialFile(ManifestEntry entry) {
+ return isBlobFile(entry.file().fileName()) ||
isVectorStoreFile(entry.file().fileName());
+ }
+
+ /** Result of row-id reassignment. */
+ public static class Result {
+ public final long previousSnapshotId;
+ public final long newSnapshotId;
+ public final long fileCount;
+ public final long rowCount;
+ public final long indexFileCount;
+ public final long firstAssignedRowId;
+ public final long nextRowId;
+ public final boolean reassigned;
+ @Nullable public final String skipReason;
+
+ public Result(
+ long previousSnapshotId,
+ long newSnapshotId,
+ long fileCount,
+ long rowCount,
+ long indexFileCount,
+ long firstAssignedRowId,
+ long nextRowId) {
+ this(
+ previousSnapshotId,
+ newSnapshotId,
+ fileCount,
+ rowCount,
+ indexFileCount,
+ firstAssignedRowId,
+ nextRowId,
+ true,
+ null);
+ }
+
+ public Result(
+ long previousSnapshotId,
+ long newSnapshotId,
+ long fileCount,
+ long rowCount,
+ long indexFileCount,
+ long firstAssignedRowId,
+ long nextRowId,
+ boolean reassigned) {
+ this(
+ previousSnapshotId,
+ newSnapshotId,
+ fileCount,
+ rowCount,
+ indexFileCount,
+ firstAssignedRowId,
+ nextRowId,
+ reassigned,
+ null);
+ }
+
+ public Result(
+ long previousSnapshotId,
+ long newSnapshotId,
+ long fileCount,
+ long rowCount,
+ long indexFileCount,
+ long firstAssignedRowId,
+ long nextRowId,
+ boolean reassigned,
+ @Nullable String skipReason) {
+ this.previousSnapshotId = previousSnapshotId;
+ this.newSnapshotId = newSnapshotId;
+ this.fileCount = fileCount;
+ this.rowCount = rowCount;
+ this.indexFileCount = indexFileCount;
+ this.firstAssignedRowId = firstAssignedRowId;
+ this.nextRowId = nextRowId;
+ this.reassigned = reassigned;
+ this.skipReason = skipReason;
+ }
+
+ private static Result skipped(long snapshotId, long nextRowId, String
reason) {
+ return new Result(snapshotId, snapshotId, 0, 0, 0, nextRowId,
nextRowId, false, reason);
+ }
+ }
+
+    /** Pairs an index manifest name (possibly {@code null}) with an index file count. */
+    private static class RewrittenIndexManifest {
+        private final long indexFileCount;
+        @Nullable private final String indexManifest;
+
+        private RewrittenIndexManifest(@Nullable String indexManifest, long indexFileCount) {
+            this.indexFileCount = indexFileCount;
+            this.indexManifest = indexManifest;
+        }
+    }
+
+    /**
+     * Immutable holder for manifest entries, per-partition row-id mappings, the next row id and
+     * two counters (logical rows and reassigned files).
+     */
+    private static class Assignment {
+        private final List<ManifestEntry> entries;
+        private final Map<BinaryRow, RowRangeMappingIndex> rowIdMappings;
+        private final long nextRowId;
+        private final long logicalRowCount;
+        private final long reassignedFileCount;
+
+        private Assignment(
+                List<ManifestEntry> entries,
+                Map<BinaryRow, RowRangeMappingIndex> rowIdMappings,
+                long nextRowId,
+                long logicalRowCount,
+                long reassignedFileCount) {
+            // Counters first, then the collections; the fields are independent of each other.
+            this.nextRowId = nextRowId;
+            this.logicalRowCount = logicalRowCount;
+            this.reassignedFileCount = reassignedFileCount;
+            this.entries = entries;
+            this.rowIdMappings = rowIdMappings;
+        }
+    }
+
+    /**
+     * Immutable holder for rewritten manifest metas keyed by string, per-partition row-id
+     * mappings, the next row id, two counters and a flag telling whether current files exist.
+     */
+    private static class AssignmentPlan {
+        private final Map<String, List<ManifestFileMeta>> rewrittenManifestMetas;
+        private final Map<BinaryRow, RowRangeMappingIndex> rowIdMappings;
+        private final long nextRowId;
+        private final long logicalRowCount;
+        private final long reassignedFileCount;
+        private final boolean hasCurrentFiles;
+
+        private AssignmentPlan(
+                Map<String, List<ManifestFileMeta>> rewrittenManifestMetas,
+                Map<BinaryRow, RowRangeMappingIndex> rowIdMappings,
+                long nextRowId,
+                long logicalRowCount,
+                long reassignedFileCount,
+                boolean hasCurrentFiles) {
+            // Scalars first, then the maps; the fields are independent of each other.
+            this.hasCurrentFiles = hasCurrentFiles;
+            this.nextRowId = nextRowId;
+            this.logicalRowCount = logicalRowCount;
+            this.reassignedFileCount = reassignedFileCount;
+            this.rewrittenManifestMetas = rewrittenManifestMetas;
+            this.rowIdMappings = rowIdMappings;
+        }
+    }
+
+    /** Holds a list of manifest metas together with entries tagged by their source manifest. */
+    private static class CurrentManifest {
+        private final List<ManifestFileMeta> manifestMetas;
+        private final List<SourcedManifestEntry> currentEntries;
+
+        private CurrentManifest(
+                List<ManifestFileMeta> manifestMetas, List<SourcedManifestEntry> currentEntries) {
+            this.currentEntries = currentEntries;
+            this.manifestMetas = manifestMetas;
+        }
+
+        /** Returns a new list with the bare entries, in the order of {@code currentEntries}. */
+        private List<ManifestEntry> entries() {
+            List<ManifestEntry> unwrapped = new ArrayList<>(currentEntries.size());
+            currentEntries.forEach(sourced -> unwrapped.add(sourced.entry));
+            return unwrapped;
+        }
+    }
+
+    /** A manifest entry paired with the manifest file meta it is associated with. */
+    private static class SourcedManifestEntry {
+        private final ManifestFileMeta manifest;
+        private final ManifestEntry entry;
+
+        private SourcedManifestEntry(ManifestFileMeta manifest, ManifestEntry entry) {
+            this.entry = entry;
+            this.manifest = manifest;
+        }
+    }
+
+    /**
+     * A manifest file meta together with a minimum partition, a maximum partition and the
+     * manifest's original index.
+     */
+    private static class PartitionManifestRange {
+        private final ManifestFileMeta manifest;
+        private final BinaryRow minPartition;
+        private final BinaryRow maxPartition;
+        private final int originalIndex;
+
+        private PartitionManifestRange(
+                ManifestFileMeta manifest,
+                BinaryRow minPartition,
+                BinaryRow maxPartition,
+                int originalIndex) {
+            this.originalIndex = originalIndex;
+            this.manifest = manifest;
+            this.minPartition = minPartition;
+            this.maxPartition = maxPartition;
+        }
+    }
+
+    /** Holds manifest entries, a row-range mapping index and the next row id. */
+    private static class PartitionAssignment {
+        private final List<ManifestEntry> entries;
+        private final RowRangeMappingIndex rowIdMappings;
+        private final long nextRowId;
+
+        private PartitionAssignment(
+                List<ManifestEntry> entries, RowRangeMappingIndex rowIdMappings, long nextRowId) {
+            this.nextRowId = nextRowId;
+            this.entries = entries;
+            this.rowIdMappings = rowIdMappings;
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
new file mode 100644
index 0000000000..d234c6f736
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.utils.Range;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * Index for row-range mappings.
+ *
+ * <p>Keeps mappings sorted by old start (old ranges never overlap) and translates an old row
+ * range into the new row range it maps to.
+ */
+final class RowRangeMappingIndex {
+
+    private final List<Mapping> mappings;
+    /** {@code oldEnds[i]} mirrors {@code mappings.get(i).oldEnd}; used for binary search. */
+    private final long[] oldEnds;
+
+    private RowRangeMappingIndex(List<Mapping> mappings) {
+        this.mappings = mappings;
+        long[] ends = new long[mappings.size()];
+        int position = 0;
+        for (Mapping mapping : mappings) {
+            ends[position++] = mapping.oldEnd;
+        }
+        this.oldEnds = ends;
+    }
+
+    /**
+     * Builds an index from a non-null, non-empty list of mappings. The input list is copied and
+     * sorted by old start; each old range must be well-formed and must not overlap its
+     * predecessor.
+     */
+    static RowRangeMappingIndex create(List<Mapping> mappings) {
+        checkArgument(mappings != null, "Row range mappings cannot be null.");
+        checkArgument(!mappings.isEmpty(), "Row range mappings cannot be empty.");
+
+        List<Mapping> ordered = new ArrayList<>(mappings);
+        ordered.sort(Comparator.comparingLong((Mapping mapping) -> mapping.oldStart));
+        for (int i = 0; i < ordered.size(); i++) {
+            Mapping current = ordered.get(i);
+            checkArgument(
+                    current.oldStart <= current.oldEnd,
+                    "Invalid old row range [%s, %s].",
+                    current.oldStart,
+                    current.oldEnd);
+            if (i > 0) {
+                checkArgument(
+                        ordered.get(i - 1).oldEnd < current.oldStart,
+                        "Old row range mappings cannot overlap.");
+            }
+        }
+        return new RowRangeMappingIndex(Collections.unmodifiableList(ordered));
+    }
+
+    /** Creates a mapping of the old range {@code [oldStart, oldEnd]} onto {@code newStart}. */
+    static Mapping mapping(long oldStart, long oldEnd, long newStart) {
+        return new Mapping(oldStart, oldEnd, newStart);
+    }
+
+    /**
+     * Maps {@code oldRange} to its new range by walking the mappings that cover it, starting at
+     * the first mapping whose old end is not before the range start. A state check fails when the
+     * mapped pieces are not contiguous or when the old range is not fully covered.
+     */
+    Range map(Range oldRange) {
+        checkArgument(oldRange != null, "Old row range cannot be null.");
+        checkArgument(oldRange.from <= oldRange.to, "Invalid old row range %s.", oldRange);
+
+        long cursor = oldRange.from;
+        boolean started = false;
+        long newFrom = 0L;
+        long newTo = Long.MIN_VALUE;
+
+        int index = lowerBound(oldEnds, cursor);
+        while (index < mappings.size() && cursor <= oldRange.to) {
+            Mapping mapping = mappings.get(index);
+            if (mapping.oldStart > cursor) {
+                // Gap in the old ranges: the cursor position is not covered by any mapping.
+                break;
+            }
+
+            long segmentTo = Math.min(mapping.oldEnd, oldRange.to);
+            long segmentNewFrom = mapping.newStart + cursor - mapping.oldStart;
+            long segmentNewTo = mapping.newStart + segmentTo - mapping.oldStart;
+
+            if (started) {
+                checkState(
+                        newTo + 1 == segmentNewFrom,
+                        "Global index row range %s maps to non-contiguous new row range.",
+                        oldRange);
+            } else {
+                newFrom = segmentNewFrom;
+                started = true;
+            }
+            newTo = segmentNewTo;
+            cursor = segmentTo + 1;
+            index++;
+        }
+
+        checkState(
+                cursor > oldRange.to && started,
+                "Global index row range %s is not fully covered by data file row-id mappings.",
+                oldRange);
+        return new Range(newFrom, newTo);
+    }
+
+    /**
+     * Returns the index of the first element of {@code sorted} that is not smaller than {@code
+     * target}, or {@code sorted.length} when every element is smaller.
+     */
+    private static int lowerBound(long[] sorted, long target) {
+        int lo = 0;
+        int hi = sorted.length;
+        while (lo < hi) {
+            int mid = (lo + hi) >>> 1;
+            if (sorted[mid] >= target) {
+                hi = mid;
+            } else {
+                lo = mid + 1;
+            }
+        }
+        return lo;
+    }
+
+    /** One mapping: old rows {@code [oldStart, oldEnd]} start at {@code newStart} afterwards. */
+    static final class Mapping {
+        private final long oldStart;
+        private final long oldEnd;
+        private final long newStart;
+
+        private Mapping(long oldStart, long oldEnd, long newStart) {
+            this.oldStart = oldStart;
+            this.oldEnd = oldEnd;
+            this.newStart = newStart;
+        }
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
new file mode 100644
index 0000000000..2cacf4722e
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
@@ -0,0 +1,949 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
+import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataEvolutionRowIdReassigner}. */
+public class DataEvolutionRowIdReassignerTest extends TableTestBase {
+
+ // Entry and manifest-file counts for the regular large out-of-order test.
+ private static final int LARGE_ENTRY_COUNT = 10_000;
+ private static final int LARGE_MANIFEST_FILE_COUNT = 20;
+ // Entry and manifest-file counts for the disabled one-million-entry stress test.
+ private static final int ONE_MILLION_ENTRY_COUNT = 1_000_000;
+ private static final int ONE_MILLION_MANIFEST_FILE_COUNT = 200;
+ // Row count given to every synthetic data file; also the factor used by logicalRowCount.
+ private static final long LARGE_ENTRY_ROW_COUNT = 2L;
+ // Fixed seed for the Random that picks a partition for each synthetic entry.
+ private static final long RANDOM_PARTITION_SEED = 20260519L;
+ // NOTE(review): presumably used by helpers outside this view to build and parse synthetic
+ // file names and partition values - confirm against those helpers.
+ private static final String LARGE_FILE_PREFIX = "large-partition-";
+ private static final String LARGE_FILE_SUFFIX = ".parquet";
+ private static final int LARGE_PARTITION_ID_WIDTH = 7;
+
+    /**
+     * Default schema: columns {@code pt}, {@code id} and {@code payload}, partitioned by {@code
+     * pt}, with row tracking, data evolution and global index enabled and one record per BTree
+     * index range.
+     */
+    @Override
+    protected Schema schemaDefault() {
+        return Schema.newBuilder()
+                .column("pt", DataTypes.STRING())
+                .column("id", DataTypes.INT())
+                .column("payload", DataTypes.STRING())
+                .partitionKeys("pt")
+                .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                .option(CoreOptions.GLOBAL_INDEX_ENABLED.key(), "true")
+                .option(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.key(), "1")
+                .build();
+    }
+
+    /**
+     * Schema without partition keys: columns {@code id} and {@code payload}, with row tracking
+     * and data evolution enabled.
+     */
+    private Schema unpartitionedSchema() {
+        return Schema.newBuilder()
+                .column("id", DataTypes.INT())
+                .column("payload", DataTypes.STRING())
+                .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                .build();
+    }
+
+    /** Interleaved writes to two partitions end up with one contiguous row-id run each. */
+    @Test
+    public void testReassignRowIdsByPartition() throws Exception {
+        FileStoreTable table = createTableWithInterleavedPartitions();
+        Snapshot before = table.snapshotManager().latestSnapshot();
+        assertThat(before.nextRowId()).isEqualTo(5L);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        DataEvolutionRowIdReassigner.Result result = reassigner.reassign("test-reassign-row-id");
+
+        assertThat(result.reassigned).isTrue();
+        assertThat(result.previousSnapshotId).isEqualTo(5L);
+        assertThat(result.newSnapshotId).isEqualTo(6L);
+        assertThat(result.firstAssignedRowId).isEqualTo(5L);
+        assertThat(result.nextRowId).isEqualTo(10L);
+        assertThat(result.fileCount).isEqualTo(5L);
+        assertThat(result.rowCount).isEqualTo(5L);
+        assertThat(result.indexFileCount).isEqualTo(0L);
+
+        Map<String, List<Long>> actual = rowIdsByPartition(table);
+        assertThat(actual).hasSize(2);
+        assertThat(actual).containsEntry("pt=a/", Arrays.asList(5L, 6L, 7L));
+        assertThat(actual).containsEntry("pt=b/", Arrays.asList(8L, 9L));
+
+        Snapshot after = table.snapshotManager().latestSnapshot();
+        assertThat(after.nextRowId()).isEqualTo(10L);
+    }
+
+    /** Only partition {@code a}, whose two-row files surround {@code b}, gets new row ids. */
+    @Test
+    public void testReassignMultiRowFilesByPartition() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        writeRows(table, "a", 0, 1);
+        writeRows(table, "b", 2, 3);
+        writeRows(table, "a", 4, 5);
+
+        Snapshot before = table.snapshotManager().latestSnapshot();
+        assertThat(before.nextRowId()).isEqualTo(6L);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        DataEvolutionRowIdReassigner.Result result =
+                reassigner.reassign("test-reassign-multi-row");
+
+        assertThat(result.firstAssignedRowId).isEqualTo(6L);
+        assertThat(result.nextRowId).isEqualTo(10L);
+        assertThat(result.fileCount).isEqualTo(2L);
+        assertThat(result.rowCount).isEqualTo(4L);
+
+        Map<String, List<Long>> actual = expandedRowIdsByPartition(table);
+        assertThat(actual).hasSize(2);
+        assertThat(actual).containsEntry("pt=a/", Arrays.asList(6L, 7L, 8L, 9L));
+        assertThat(actual).containsEntry("pt=b/", Arrays.asList(2L, 3L));
+    }
+
+ @Test
+ public void testReassignOnlyOverlappingPartitions() throws Exception {
+ FileStoreTable table = createTableWithPartiallyOverlappedPartitions();
+ createBTreeIndex(table);
+
+
assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(5L);
+
+ DataEvolutionRowIdReassigner.Result result =
+ new
DataEvolutionRowIdReassigner(table).reassign("test-reassign-partial-row-id");
+
+ assertThat(result.firstAssignedRowId).isEqualTo(5L);
+ assertThat(result.nextRowId).isEqualTo(7L);
+ assertThat(result.fileCount).isEqualTo(2L);
+ assertThat(result.rowCount).isEqualTo(2L);
+ assertThat(result.indexFileCount).isEqualTo(2L);
+
+ Map<String, List<Long>> rowIdsByPartition =
expandedRowIdsByPartition(table);
+ assertThat(rowIdsByPartition).containsEntry("pt=a/", Arrays.asList(5L,
6L));
+ assertThat(rowIdsByPartition).containsEntry("pt=b/",
Collections.singletonList(3L));
+ assertThat(rowIdsByPartition).containsEntry("pt=c/", Arrays.asList(0L,
1L));
+ assertThat(globalIndexRanges(table))
+ .containsExactly(
+ new Range(0, 1),
+ new Range(0, 1),
+ new Range(3, 3),
+ new Range(5, 5),
+ new Range(6, 6));
+
+ Predicate oldPartitionPredicate =
+ new
PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 1);
+ assertThat(readPayloads(table,
oldPartitionPredicate)).containsExactly("v1");
+ Predicate reassignedPartitionPredicate =
+ new
PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 4);
+ assertThat(readPayloads(table,
reassignedPartitionPredicate)).containsExactly("v4");
+ }
+
+ @Test
+ public void testReassignOnlyOuterPartitionWhenInnerRangeCanStay() throws
Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+ writeOneRow(table, "a", 0);
+ writeOneRow(table, "b", 1);
+ writeOneRow(table, "c", 2);
+ writeOneRow(table, "c", 3);
+ writeOneRow(table, "d", 4);
+ writeOneRow(table, "d", 5);
+ writeOneRow(table, "b", 6);
+
+ DataEvolutionRowIdReassigner.Result result =
+ new
DataEvolutionRowIdReassigner(table).reassign("test-reassign-only-b-row-id");
+
+ assertThat(result.firstAssignedRowId).isEqualTo(7L);
+ assertThat(result.nextRowId).isEqualTo(9L);
+ assertThat(result.fileCount).isEqualTo(2L);
+ assertThat(result.rowCount).isEqualTo(2L);
+
+ assertThat(rowIdsByPartition(table))
+ .containsEntry("pt=a/", Collections.singletonList(0L))
+ .containsEntry("pt=b/", Arrays.asList(7L, 8L))
+ .containsEntry("pt=c/", Arrays.asList(2L, 3L))
+ .containsEntry("pt=d/", Arrays.asList(4L, 5L));
+ }
+
+ @Test
+ public void testPartitionFilterOnlyReassignsSelectedPartitions() throws
Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+ writeOneRow(table, "a", 0);
+ writeOneRow(table, "b", 1);
+ writeOneRow(table, "c", 2);
+ writeOneRow(table, "c", 3);
+ writeOneRow(table, "d", 4);
+ writeOneRow(table, "d", 5);
+ writeOneRow(table, "b", 6);
+
+ DataEvolutionRowIdReassigner.Result cResult =
+ new DataEvolutionRowIdReassigner(table,
partitionPredicate(table, "c"))
+ .reassign("test-filter-c-row-id");
+
+ assertThat(cResult.reassigned).isFalse();
+
assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(7L);
+ assertThat(rowIdsByPartition(table))
+ .containsEntry("pt=a/", Collections.singletonList(0L))
+ .containsEntry("pt=b/", Arrays.asList(1L, 6L))
+ .containsEntry("pt=c/", Arrays.asList(2L, 3L))
+ .containsEntry("pt=d/", Arrays.asList(4L, 5L));
+
+ DataEvolutionRowIdReassigner.Result bResult =
+ new DataEvolutionRowIdReassigner(table,
partitionPredicate(table, "b"))
+ .reassign("test-filter-b-row-id");
+
+ assertThat(bResult.reassigned).isTrue();
+ assertThat(bResult.firstAssignedRowId).isEqualTo(7L);
+ assertThat(bResult.nextRowId).isEqualTo(9L);
+ assertThat(bResult.fileCount).isEqualTo(2L);
+ assertThat(bResult.rowCount).isEqualTo(2L);
+ assertThat(rowIdsByPartition(table))
+ .containsEntry("pt=a/", Collections.singletonList(0L))
+ .containsEntry("pt=b/", Arrays.asList(7L, 8L))
+ .containsEntry("pt=c/", Arrays.asList(2L, 3L))
+ .containsEntry("pt=d/", Arrays.asList(4L, 5L));
+ }
+
+    /**
+     * Manifests that do not contain partition {@code a} must survive the reassignment, while
+     * every manifest containing it must be gone afterwards.
+     */
+    @Test
+    public void testReassignReusesUnaffectedManifestFiles() throws Exception {
+        FileStoreTable table = createTableWithPartiallyOverlappedPartitions();
+
+        List<String> unaffectedManifests = new ArrayList<>();
+        List<String> affectedManifests = new ArrayList<>();
+        Map<String, Set<String>> partitionsByManifest = currentPartitionsByManifest(table);
+        for (Map.Entry<String, Set<String>> manifest : partitionsByManifest.entrySet()) {
+            if (manifest.getValue().contains("pt=a/")) {
+                affectedManifests.add(manifest.getKey());
+            } else {
+                unaffectedManifests.add(manifest.getKey());
+            }
+        }
+        assertThat(unaffectedManifests).isNotEmpty();
+        assertThat(affectedManifests).isNotEmpty();
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        reassigner.reassign("test-reassign-reuse-manifest");
+
+        List<String> afterManifestFiles = dataManifestFileNames(table);
+        assertThat(afterManifestFiles).containsAll(unaffectedManifests);
+        assertThat(afterManifestFiles).doesNotContainAnyElementsOf(affectedManifests);
+    }
+
+    /** Already-contiguous partitions: the run is skipped and no new snapshot appears. */
+    @Test
+    public void testSkipWhenPartitionRowIdsAreContiguous() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        writeRows(table, "a", 0, 1);
+        writeRows(table, "a", 2);
+        writeRows(table, "b", 3, 4);
+
+        Snapshot before = table.snapshotManager().latestSnapshot();
+        assertThat(before.id()).isEqualTo(3L);
+        // Re-read the latest snapshot for the second check, as the original does.
+        assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(5L);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        DataEvolutionRowIdReassigner.Result result = reassigner.reassign("test-skip-row-id");
+
+        assertThat(result.reassigned).isFalse();
+        assertThat(result.skipReason).isEqualTo("partition row IDs are already contiguous");
+        assertThat(result.previousSnapshotId).isEqualTo(3L);
+        assertThat(result.newSnapshotId).isEqualTo(3L);
+        assertThat(result.firstAssignedRowId).isEqualTo(5L);
+        assertThat(result.nextRowId).isEqualTo(5L);
+        assertThat(result.fileCount).isEqualTo(0L);
+        assertThat(result.rowCount).isEqualTo(0L);
+        assertThat(result.indexFileCount).isEqualTo(0L);
+
+        Snapshot after = table.snapshotManager().latestSnapshot();
+        assertThat(after.id()).isEqualTo(3L);
+        assertThat(expandedRowIdsByPartition(table))
+                .containsEntry("pt=a/", Arrays.asList(0L, 1L, 2L))
+                .containsEntry("pt=b/", Arrays.asList(3L, 4L));
+    }
+
+    /** An unpartitioned table is skipped with reason "table is not partitioned". */
+    @Test
+    public void testSkipUnpartitionedTable() throws Exception {
+        Identifier identifier = Identifier.create(database, "unpartitioned_table");
+        catalog.createTable(identifier, unpartitionedSchema(), false);
+        FileStoreTable table = getTable(identifier);
+        writeUnpartitionedRows(table, 0, 1, 2);
+
+        Snapshot before = table.snapshotManager().latestSnapshot();
+        assertThat(before.id()).isEqualTo(1L);
+        // Re-read the latest snapshot for the second check, as the original does.
+        assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(3L);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        DataEvolutionRowIdReassigner.Result result =
+                reassigner.reassign("test-skip-unpartitioned-row-id");
+
+        assertThat(result.reassigned).isFalse();
+        assertThat(result.skipReason).isEqualTo("table is not partitioned");
+        assertThat(result.previousSnapshotId).isEqualTo(1L);
+        assertThat(result.newSnapshotId).isEqualTo(1L);
+        assertThat(result.firstAssignedRowId).isEqualTo(3L);
+        assertThat(result.nextRowId).isEqualTo(3L);
+        assertThat(result.fileCount).isEqualTo(0L);
+        assertThat(result.rowCount).isEqualTo(0L);
+        assertThat(result.indexFileCount).isEqualTo(0L);
+
+        Snapshot after = table.snapshotManager().latestSnapshot();
+        assertThat(after.id()).isEqualTo(1L);
+    }
+
+ @Test
+ public void testReassignGlobalIndexRowRanges() throws Exception {
+ FileStoreTable table = createTableWithInterleavedPartitions();
+ createBTreeIndex(table);
+
+
assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(5L);
+
+ DataEvolutionRowIdReassigner.Result result =
+ new
DataEvolutionRowIdReassigner(table).reassign("test-reassign-row-id-index");
+
+ assertThat(result.firstAssignedRowId).isEqualTo(5L);
+ assertThat(result.nextRowId).isEqualTo(10L);
+ assertThat(result.indexFileCount).isEqualTo(5L);
+
+ List<Range> indexRanges = globalIndexRanges(table);
+ assertThat(indexRanges)
+ .containsExactly(
+ new Range(5, 5),
+ new Range(6, 6),
+ new Range(7, 7),
+ new Range(8, 8),
+ new Range(9, 9));
+
+ Predicate predicate =
+ new
PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 4);
+ assertThat(readPayloads(table, predicate)).containsExactly("v4");
+
+ DataEvolutionRowIdReassigner.Result secondResult =
+ new
DataEvolutionRowIdReassigner(table).reassign("test-reassign-row-id-index-2");
+
+ assertThat(secondResult.reassigned).isFalse();
+ assertThat(secondResult.previousSnapshotId).isEqualTo(7L);
+ assertThat(secondResult.newSnapshotId).isEqualTo(7L);
+ assertThat(secondResult.firstAssignedRowId).isEqualTo(10L);
+ assertThat(secondResult.nextRowId).isEqualTo(10L);
+ assertThat(secondResult.fileCount).isEqualTo(0L);
+ assertThat(secondResult.rowCount).isEqualTo(0L);
+ assertThat(secondResult.indexFileCount).isEqualTo(0L);
+ assertThat(globalIndexRanges(table))
+ .containsExactly(
+ new Range(5, 5),
+ new Range(6, 6),
+ new Range(7, 7),
+ new Range(8, 8),
+ new Range(9, 9));
+ assertThat(readPayloads(table, predicate)).containsExactly("v4");
+ }
+
+ @Test
+ public void testReassignManyOutOfOrderPartitionEntries() throws Exception {
+ verifyReassignOutOfOrderPartitionEntries(LARGE_ENTRY_COUNT,
LARGE_MANIFEST_FILE_COUNT);
+ }
+
+ @Disabled("Writes one million manifest entries; run manually for large
metadata stress tests.")
+ @Test
+ public void testReassignOneMillionOutOfOrderPartitionEntries() throws
Exception {
+ verifyReassignOutOfOrderPartitionEntries(
+ ONE_MILLION_ENTRY_COUNT, ONE_MILLION_MANIFEST_FILE_COUNT);
+ }
+
+    /** Creates the default table and writes ids 0..4 alternating between partitions a and b. */
+    private FileStoreTable createTableWithInterleavedPartitions() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        String[] partitions = {"a", "b", "a", "b", "a"};
+        for (int id = 0; id < partitions.length; id++) {
+            writeOneRow(table, partitions[id], id);
+        }
+        return table;
+    }
+
+    /**
+     * Creates the default table with one two-row write to partition c, followed by single-row
+     * writes to a, b and a again.
+     */
+    private FileStoreTable createTableWithPartiallyOverlappedPartitions() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        writeRows(table, "c", 0, 1);
+        String[] singleRowPartitions = {"a", "b", "a"};
+        for (int i = 0; i < singleRowPartitions.length; i++) {
+            writeOneRow(table, singleRowPartitions[i], i + 2);
+        }
+        return table;
+    }
+
+    /**
+     * Shared driver for the large out-of-order tests: seeds the table, installs a synthetic
+     * snapshot, runs the reassigner and checks both the returned result and the resulting table.
+     *
+     * @param entryCount number of synthetic manifest entries; must be even
+     * @param manifestFileCount number of manifest files the entries are spread over
+     */
+    private void verifyReassignOutOfOrderPartitionEntries(int entryCount, int manifestFileCount)
+            throws Exception {
+        assertThat(entryCount % 2).isEqualTo(0);
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        writeOneRow(table, "seed", -1);
+
+        long oldNextRowId = writeOutOfOrderManifestSnapshot(table, entryCount, manifestFileCount);
+        Snapshot before = table.snapshotManager().latestSnapshot();
+        List<String> beforeManifestFiles = dataManifestFileNames(table);
+        assertThat(beforeManifestFiles).hasSize(manifestFileCount);
+        assertThat(before.nextRowId()).isEqualTo(oldNextRowId);
+        assertThat(before.totalRecordCount()).isEqualTo(logicalRowCount(entryCount));
+        ExpectedLargeManifestAssignment expected =
+                expectedLargeManifestAssignment(table, before, entryCount);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        DataEvolutionRowIdReassigner.Result result =
+                reassigner.reassign("test-reassign-large-out-of-order-row-id");
+
+        assertThat(result.reassigned).isTrue();
+        assertThat(result.previousSnapshotId).isEqualTo(before.id());
+        assertThat(result.newSnapshotId).isEqualTo(before.id() + 1);
+        assertThat(result.firstAssignedRowId).isEqualTo(oldNextRowId);
+        assertThat(result.nextRowId).isEqualTo(oldNextRowId + expected.reassignedRowCount);
+        assertThat(result.fileCount).isEqualTo(expected.reassignedEntryCount);
+        assertThat(result.rowCount).isEqualTo(expected.reassignedRowCount);
+        assertThat(result.indexFileCount).isEqualTo(0L);
+
+        assertReassignedOutOfOrderPartitionEntries(
+                table, entryCount, oldNextRowId, expected, beforeManifestFiles);
+    }
+
+ /**
+  * Writes {@code manifestFileCount} manifest files holding {@code entryCount} synthetic entries
+  * in total, each entry assigned to a pseudo-randomly chosen partition, and installs them through
+  * {@code replaceManifestList} on top of the latest snapshot.
+  *
+  * @return the next row id passed to the commit, {@code logicalRowCount(entryCount) + 1}
+  */
+ private long writeOutOfOrderManifestSnapshot(
+ FileStoreTable table, int entryCount, int manifestFileCount)
throws Exception {
+ Snapshot latest = table.snapshotManager().latestSnapshot();
+ ManifestFile manifestFile =
table.store().manifestFileFactory().create();
+ ManifestList manifestList =
table.store().manifestListFactory().create();
+ InternalRowSerializer partitionSerializer =
+ new
InternalRowSerializer(table.schema().logicalPartitionType());
+
+ int partitionCount = largePartitionCount(entryCount);
+ assertThat(entryCount).isGreaterThanOrEqualTo(manifestFileCount);
+ List<ManifestFileMeta> manifestMetas = new ArrayList<>();
+ // Fixed seed: the sequence of partition ids is the same on every run.
+ Random random = new Random(RANDOM_PARTITION_SEED);
+ int entryStart = 0;
+ for (int manifestIndex = 0; manifestIndex < manifestFileCount;
manifestIndex++) {
+ // Even split; the first (entryCount % manifestFileCount) manifests take one extra entry.
+ int currentEntryCount =
+ entryCount / manifestFileCount
+ + (manifestIndex < entryCount % manifestFileCount
? 1 : 0);
+ List<ManifestEntry> entries = new ArrayList<>(currentEntryCount);
+ for (int entryOffset = 0; entryOffset < currentEntryCount;
entryOffset++) {
+ int entryIndex = entryStart + entryOffset;
+ int partitionId = random.nextInt(partitionCount);
+ // The entry's first row id is logicalRowCount(entryIndex), so it grows with entryIndex.
+ entries.add(
+ largeManifestEntry(
+ partitionSerializer,
+ partitionId,
+ entryIndex,
+ logicalRowCount(entryIndex)));
+ }
+ manifestMetas.addAll(manifestFile.write(entries));
+ entryStart += currentEntryCount;
+ }
+ assertThat(entryStart).isEqualTo(entryCount);
+ assertThat(manifestMetas).hasSize(manifestFileCount);
+
+ // All synthetic manifests go into the base list; the delta list is written empty.
+ Pair<String, Long> baseManifestList =
manifestList.write(manifestMetas);
+ Pair<String, Long> deltaManifestList =
manifestList.write(Collections.emptyList());
+ long oldNextRowId = logicalRowCount(entryCount) + 1L;
+ try (FileStoreCommitImpl commit =
+ (FileStoreCommitImpl)
+
table.store().newCommit("test-large-out-of-order-source", table)) {
+ // The commit must report success, otherwise the test fails here.
+ assertThat(
+ commit.replaceManifestList(
+ latest,
+ logicalRowCount(entryCount),
+ baseManifestList,
+ deltaManifestList,
+ latest.indexManifest(),
+ oldNextRowId))
+ .isTrue();
+ }
+ return oldNextRowId;
+ }
+
+    /**
+     * Builds an {@code ADD} manifest entry for a synthetic append file in the given partition.
+     * The file's row count is {@code LARGE_ENTRY_ROW_COUNT}, and both its min and max sequence
+     * numbers are {@code sequenceNumber(partitionId)}.
+     */
+    private ManifestEntry largeManifestEntry(
+            InternalRowSerializer partitionSerializer,
+            int partitionId,
+            int entryOrdinal,
+            long firstRowId) {
+        long sequence = sequenceNumber(partitionId);
+        String fileName = largeFileName(partitionId, entryOrdinal);
+        DataFileMeta file =
+                DataFileMeta.forAppend(
+                        fileName,
+                        1L,
+                        LARGE_ENTRY_ROW_COUNT,
+                        SimpleStats.EMPTY_STATS,
+                        sequence,
+                        sequence,
+                        0L,
+                        Collections.emptyList(),
+                        null,
+                        FileSource.APPEND,
+                        null,
+                        null,
+                        firstRowId,
+                        null);
+
+        GenericRow partitionRow = GenericRow.of(BinaryString.fromString(largePartition(partitionId)));
+        // Copy the serialized row, as the original does, before handing it to the entry.
+        BinaryRow partition = partitionSerializer.toBinaryRow(partitionRow).copy();
+        return ManifestEntry.create(FileKind.ADD, partition, 0, 1, file);
+    }
+
+ /**
+  * Checks the table state after a reassignment of the synthetic large snapshot: which manifest
+  * files were reused or replaced, that no two file row ranges overlap, that each partition either
+  * kept its old row-id bounds or holds one dense run at or above {@code firstAssignedRowId}, and
+  * that the counters and the snapshot's next row id match the expectation.
+  */
+ private void assertReassignedOutOfOrderPartitionEntries(
+ FileStoreTable table,
+ int entryCount,
+ long firstAssignedRowId,
+ ExpectedLargeManifestAssignment expectedAssignment,
+ List<String> beforeManifestFiles) {
+ // Manifest-level check: same number of files, rewritten ones gone, the rest still present.
+ List<String> afterManifestFiles = dataManifestFileNames(table);
+ assertThat(afterManifestFiles).hasSize(beforeManifestFiles.size());
+ assertThat(afterManifestFiles)
+
.doesNotContainAnyElementsOf(expectedAssignment.rewrittenManifestFiles);
+ List<String> reusedManifestFiles = new
ArrayList<>(beforeManifestFiles);
+
reusedManifestFiles.removeAll(expectedAssignment.rewrittenManifestFiles);
+ assertThat(afterManifestFiles).containsAll(reusedManifestFiles);
+
+ List<ManifestEntry> entries =
+ table.store()
+ .newScan()
+ .withSnapshot(table.snapshotManager().latestSnapshot())
+ .plan()
+ .files();
+ assertThat(entries).hasSize(entryCount);
+
+ // Per-partition bookkeeping, indexed by the partition id parsed from the file name.
+ int partitionCount = largePartitionCount(entryCount);
+ int[] entriesByPartition = new int[partitionCount];
+ long[] minRowIdByPartition = new long[partitionCount];
+ long[] maxRowIdByPartition = new long[partitionCount];
+ Arrays.fill(minRowIdByPartition, Long.MAX_VALUE);
+ Arrays.fill(maxRowIdByPartition, Long.MIN_VALUE);
+ List<Range> ranges = new ArrayList<>(entryCount);
+ long reassignedEntryCount = 0;
+ long reassignedRowCount = 0;
+ for (ManifestEntry entry : entries) {
+ Range range = entry.file().nonNullRowIdRange();
+ ranges.add(range);
+ assertThat(range.count()).isEqualTo(LARGE_ENTRY_ROW_COUNT);
+ // A range starting at or above firstAssignedRowId is counted as reassigned.
+ if (range.from >= firstAssignedRowId) {
+ reassignedEntryCount++;
+ reassignedRowCount += range.count();
+ }
+
+ int partitionId =
partitionIdFromLargeFileName(entry.file().fileName());
+ entriesByPartition[partitionId]++;
+ minRowIdByPartition[partitionId] =
+ Math.min(minRowIdByPartition[partitionId], range.from);
+ maxRowIdByPartition[partitionId] =
Math.max(maxRowIdByPartition[partitionId], range.to);
+ // Sequence numbers must still equal the value derived from the partition id.
+ long sequenceNumber = sequenceNumber(partitionId);
+
assertThat(entry.file().minSequenceNumber()).isEqualTo(sequenceNumber);
+
assertThat(entry.file().maxSequenceNumber()).isEqualTo(sequenceNumber);
+ }
+
+ // Sorted by (from, to), every range must start after the previous one ends: no overlap.
+ Collections.sort(
+ ranges,
+ (left, right) -> {
+ int result = Long.compare(left.from, right.from);
+ return result == 0 ? Long.compare(left.to, right.to) :
result;
+ });
+ long previousEnd = Long.MIN_VALUE;
+ for (Range range : ranges) {
+ assertThat(range.from).isGreaterThan(previousEnd);
+ previousEnd = range.to;
+ }
+
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++)
{
+ assertThat(entriesByPartition[partitionId])
+
.isEqualTo(expectedAssignment.entriesByPartition[partitionId]);
+ if (entriesByPartition[partitionId] == 0) {
+ continue;
+ }
+
+ if (expectedAssignment.partitionsToReassign[partitionId]) {
+ // Reassigned partition: max - min + 1 equals its row count, i.e. one dense run.
+ assertThat(minRowIdByPartition[partitionId])
+ .isGreaterThanOrEqualTo(firstAssignedRowId);
+ assertThat(maxRowIdByPartition[partitionId])
+ .isEqualTo(
+ minRowIdByPartition[partitionId]
+ +
logicalRowCount(entriesByPartition[partitionId])
+ - 1);
+ } else {
+ // Untouched partition: bounds must equal the expected (pre-reassignment) bounds.
+ assertThat(minRowIdByPartition[partitionId])
+
.isEqualTo(expectedAssignment.minRowIdByPartition[partitionId]);
+ assertThat(maxRowIdByPartition[partitionId])
+
.isEqualTo(expectedAssignment.maxRowIdByPartition[partitionId]);
+ }
+ }
+
assertThat(reassignedEntryCount).isEqualTo(expectedAssignment.reassignedEntryCount);
+
assertThat(reassignedRowCount).isEqualTo(expectedAssignment.reassignedRowCount);
+ assertThat(table.snapshotManager().latestSnapshot().nextRowId())
+ .isEqualTo(firstAssignedRowId + reassignedRowCount);
+ }
+
+    /** Returns the number of rows held by {@code entryCount} entries of the large fixture. */
+    private long logicalRowCount(long entryCount) {
+        long rows = entryCount * LARGE_ENTRY_ROW_COUNT;
+        return rows;
+    }
+
+    /** Returns the partition count used for a fixture of {@code entryCount} entries (half). */
+    private int largePartitionCount(int entryCount) {
+        int partitions = entryCount / 2;
+        return partitions;
+    }
+
+    /**
+     * Derives, from the data manifests of {@code snapshot}, what a row-id reassignment is expected
+     * to touch in the large manifest fixture.
+     *
+     * <p>A partition is expected to be reassigned when the span between its smallest and largest
+     * row ID differs from the number of rows it holds. A manifest is expected to be rewritten when
+     * it contains at least one ADD entry of such a partition. The fixture is validated on the way:
+     * every row ID in {@code [0, totalRows)} must be covered exactly once by ADD entries.
+     */
+    private ExpectedLargeManifestAssignment expectedLargeManifestAssignment(
+            FileStoreTable table, Snapshot snapshot, int entryCount) {
+        int partitions = largePartitionCount(entryCount);
+        int[] entryCounts = new int[partitions];
+        long[] lowestRowIds = new long[partitions];
+        long[] highestRowIds = new long[partitions];
+        long totalRows = logicalRowCount(entryCount);
+        // The bitmap below is indexed by row ID, so the fixture must fit into an int-sized array.
+        assertThat(totalRows).isLessThanOrEqualTo((long) Integer.MAX_VALUE);
+        boolean[] seenRowIds = new boolean[(int) totalRows];
+        Arrays.fill(lowestRowIds, Long.MAX_VALUE);
+        Arrays.fill(highestRowIds, Long.MIN_VALUE);
+
+        ManifestFile manifestReader = table.store().manifestFileFactory().create();
+        ManifestList manifestListReader = table.store().manifestListFactory().create();
+        List<ManifestFileMeta> manifests = manifestListReader.readDataManifests(snapshot);
+
+        // Pass 1: collect per-partition statistics and check each row ID occurs exactly once.
+        long coveredRows = 0;
+        for (ManifestFileMeta meta : manifests) {
+            for (ManifestEntry entry : manifestReader.read(meta.fileName(), meta.fileSize())) {
+                if (entry.kind() == FileKind.ADD) {
+                    Range rowIds = entry.file().nonNullRowIdRange();
+                    assertThat(rowIds.count()).isEqualTo(LARGE_ENTRY_ROW_COUNT);
+                    assertThat(rowIds.from).isBetween(0L, totalRows - 1);
+                    assertThat(rowIds.to).isBetween(0L, totalRows - 1);
+                    for (long id = rowIds.from; id <= rowIds.to; id++) {
+                        int slot = (int) id;
+                        assertThat(seenRowIds[slot]).isFalse();
+                        seenRowIds[slot] = true;
+                        coveredRows++;
+                    }
+                    int pid = partitionIdFromLargeFileName(entry.file().fileName());
+                    entryCounts[pid]++;
+                    lowestRowIds[pid] = Math.min(lowestRowIds[pid], rowIds.from);
+                    highestRowIds[pid] = Math.max(highestRowIds[pid], rowIds.to);
+                }
+            }
+        }
+        assertThat(coveredRows).isEqualTo(totalRows);
+
+        // Pass 2: a non-empty partition whose id span differs from its row count gets reassigned.
+        boolean[] needsReassign = new boolean[partitions];
+        long entriesToReassign = 0;
+        for (int pid = 0; pid < partitions; pid++) {
+            int count = entryCounts[pid];
+            if (count != 0
+                    && highestRowIds[pid] - lowestRowIds[pid] + 1 != logicalRowCount(count)) {
+                needsReassign[pid] = true;
+                entriesToReassign += count;
+            }
+        }
+
+        // Pass 3: a manifest is rewritten as soon as one of its ADD entries is reassigned.
+        Set<String> manifestsToRewrite = new HashSet<>();
+        for (ManifestFileMeta meta : manifests) {
+            boolean touched = false;
+            for (ManifestEntry entry : manifestReader.read(meta.fileName(), meta.fileSize())) {
+                if (entry.kind() == FileKind.ADD
+                        && needsReassign[
+                                partitionIdFromLargeFileName(entry.file().fileName())]) {
+                    touched = true;
+                    break;
+                }
+            }
+            if (touched) {
+                manifestsToRewrite.add(meta.fileName());
+            }
+        }
+        assertThat(entriesToReassign).isGreaterThan(0L);
+        return new ExpectedLargeManifestAssignment(
+                entryCounts,
+                lowestRowIds,
+                highestRowIds,
+                needsReassign,
+                entriesToReassign,
+                logicalRowCount(entriesToReassign),
+                manifestsToRewrite);
+    }
+
+    /**
+     * Builds the partition value for {@code partitionId}: the letter {@code p} followed by the id,
+     * left-padded with zeros up to {@code LARGE_PARTITION_ID_WIDTH} characters.
+     */
+    private String largePartition(int partitionId) {
+        String digits = Integer.toString(partitionId);
+        StringBuilder padded = new StringBuilder("p");
+        int missing = LARGE_PARTITION_ID_WIDTH - digits.length();
+        while (missing > 0) {
+            padded.append('0');
+            missing--;
+        }
+        padded.append(digits);
+        return padded.toString();
+    }
+
+    /** Builds a fixture file name of the form {@code <prefix><partitionId>-<ordinal><suffix>}. */
+    private String largeFileName(int partitionId, int entryOrdinal) {
+        StringBuilder name = new StringBuilder(LARGE_FILE_PREFIX);
+        name.append(partitionId).append("-").append(entryOrdinal).append(LARGE_FILE_SUFFIX);
+        return name.toString();
+    }
+
+    /**
+     * Recovers the partition id from a fixture file name by stripping the prefix and suffix and
+     * parsing the digits in front of the first {@code '-'}.
+     */
+    private int partitionIdFromLargeFileName(String fileName) {
+        int start = LARGE_FILE_PREFIX.length();
+        int end = fileName.length() - LARGE_FILE_SUFFIX.length();
+        String core = fileName.substring(start, end);
+        int dash = core.indexOf('-');
+        return Integer.parseInt(core.substring(0, dash));
+    }
+
+ private long sequenceNumber(int partitionId) {
+ return 10_000_000L + partitionId;
+ }
+
+    /** Writes and commits a single row with the given {@code id} into {@code partition}. */
+    private void writeOneRow(FileStoreTable table, String partition, int id) throws Exception {
+        int[] singleId = {id};
+        writeRows(table, partition, singleId);
+    }
+
+    /**
+     * Writes one row per id into {@code partition} and commits them all in a single batch commit.
+     * Each row is {@code (partition, id, "v" + id)}.
+     */
+    private void writeRows(FileStoreTable table, String partition, int... ids) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite writer = writeBuilder.newWrite();
+                BatchTableCommit committer = writeBuilder.newCommit()) {
+            for (int i = 0; i < ids.length; i++) {
+                int id = ids[i];
+                GenericRow row =
+                        GenericRow.of(
+                                BinaryString.fromString(partition),
+                                id,
+                                BinaryString.fromString("v" + id));
+                writer.write(row);
+            }
+            committer.commit(writer.prepareCommit());
+        }
+    }
+
+    /**
+     * Writes one row per id into an unpartitioned table and commits them in a single batch commit.
+     * Each row is {@code (id, "v" + id)}.
+     */
+    private void writeUnpartitionedRows(FileStoreTable table, int... ids) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite writer = writeBuilder.newWrite();
+                BatchTableCommit committer = writeBuilder.newCommit()) {
+            for (int i = 0; i < ids.length; i++) {
+                int id = ids[i];
+                GenericRow row = GenericRow.of(id, BinaryString.fromString("v" + id));
+                writer.write(row);
+            }
+            committer.commit(writer.prepareCommit());
+        }
+    }
+
+    /** Builds a predicate selecting exactly the partition whose {@code pt} value is given. */
+    private PartitionPredicate partitionPredicate(FileStoreTable table, String partition) {
+        Map<String, String> spec = Collections.singletonMap("pt", partition);
+        List<Map<String, String>> specs = Collections.singletonList(spec);
+        return PartitionPredicate.fromMaps(
+                table.schema().logicalPartitionType(),
+                specs,
+                table.coreOptions().partitionDefaultName());
+    }
+
+    /**
+     * Groups the first row ID of every file in the latest snapshot by partition string. Each
+     * partition's list is sorted ascending; partitions appear in scan order.
+     */
+    private Map<String, List<Long>> rowIdsByPartition(FileStoreTable table) {
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        List<ManifestEntry> files = table.store().newScan().withSnapshot(latest).plan().files();
+
+        Map<String, List<Long>> grouped = new LinkedHashMap<>();
+        for (ManifestEntry file : files) {
+            String key = table.store().pathFactory().getPartitionString(file.partition());
+            List<Long> firstRowIds = grouped.computeIfAbsent(key, k -> new ArrayList<>());
+            firstRowIds.add(file.file().nonNullFirstRowId());
+        }
+        grouped.values().forEach(Collections::sort);
+        return grouped;
+    }
+
+    /**
+     * Groups every individual row ID covered by the files of the latest snapshot by partition
+     * string. Each partition's list is sorted ascending; partitions appear in scan order.
+     */
+    private Map<String, List<Long>> expandedRowIdsByPartition(FileStoreTable table) {
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        List<ManifestEntry> files = table.store().newScan().withSnapshot(latest).plan().files();
+
+        Map<String, List<Long>> grouped = new LinkedHashMap<>();
+        for (ManifestEntry file : files) {
+            String key = table.store().pathFactory().getPartitionString(file.partition());
+            List<Long> ids = grouped.computeIfAbsent(key, k -> new ArrayList<>());
+            Range covered = file.file().nonNullRowIdRange();
+            // Range bounds are inclusive on both ends.
+            long next = covered.from;
+            while (next <= covered.to) {
+                ids.add(next);
+                next++;
+            }
+        }
+        grouped.values().forEach(Collections::sort);
+        return grouped;
+    }
+
+    /**
+     * Maps each data manifest of the latest snapshot to the partitions of its ADD entries that are
+     * not cancelled by a DELETE entry with the same identifier in any data manifest of that
+     * snapshot. Manifests without such entries are absent from the result.
+     */
+    private Map<String, Set<String>> currentPartitionsByManifest(FileStoreTable table) {
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        ManifestFile manifestReader = table.store().manifestFileFactory().create();
+        ManifestList manifestListReader = table.store().manifestListFactory().create();
+        List<ManifestFileMeta> manifests = manifestListReader.readDataManifests(snapshot);
+
+        // First collect every deleted identifier; manifests without deletions are not read.
+        Set<FileEntry.Identifier> removed = new HashSet<>();
+        for (ManifestFileMeta meta : manifests) {
+            if (meta.numDeletedFiles() > 0) {
+                for (ManifestEntry entry :
+                        manifestReader.read(meta.fileName(), meta.fileSize())) {
+                    if (entry.kind() == FileKind.DELETE) {
+                        removed.add(entry.identifier());
+                    }
+                }
+            }
+        }
+
+        // Then record the partition of every ADD entry that was not deleted.
+        Map<String, Set<String>> partitionsByManifest = new LinkedHashMap<>();
+        for (ManifestFileMeta meta : manifests) {
+            if (meta.numAddedFiles() > 0) {
+                for (ManifestEntry entry :
+                        manifestReader.read(meta.fileName(), meta.fileSize())) {
+                    boolean live =
+                            entry.kind() == FileKind.ADD && !removed.contains(entry.identifier());
+                    if (live) {
+                        String partition =
+                                table.store()
+                                        .pathFactory()
+                                        .getPartitionString(entry.partition());
+                        partitionsByManifest
+                                .computeIfAbsent(meta.fileName(), k -> new LinkedHashSet<>())
+                                .add(partition);
+                    }
+                }
+            }
+        }
+        return partitionsByManifest;
+    }
+
+    /** Lists the file names of the data manifests of the latest snapshot, in manifest order. */
+    private List<String> dataManifestFileNames(FileStoreTable table) {
+        ManifestList manifestListReader = table.store().manifestListFactory().create();
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        List<String> names = new ArrayList<>();
+        manifestListReader.readDataManifests(snapshot).forEach(meta -> names.add(meta.fileName()));
+        return names;
+    }
+
+    /**
+     * Builds a BTree global index on field {@code id}, one build per contiguous row range, and
+     * commits all resulting index files in a single batch commit.
+     */
+    private void createBTreeIndex(FileStoreTable table) throws Exception {
+        BTreeGlobalIndexBuilder indexBuilder =
+                new BTreeGlobalIndexBuilder(table).withIndexField("id");
+        List<DataSplit> scanned =
+                indexBuilder
+                        .scan()
+                        .map(Pair::getRight)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Expected scan result when building index."));
+
+        List<CommitMessage> messages = new ArrayList<>();
+        List<DataSplit> contiguous = BTreeGlobalIndexBuilder.splitByContiguousRowRange(scanned);
+        for (DataSplit split : contiguous) {
+            messages.addAll(indexBuilder.build(split, ioManager));
+        }
+
+        try (BatchTableCommit committer = table.newBatchWriteBuilder().newCommit()) {
+            committer.commit(messages);
+        }
+    }
+
+    /**
+     * Collects the row ranges of all global index files in the latest snapshot's index manifest,
+     * ordered by range start and then by range end. Entries without global index meta are skipped.
+     */
+    private List<Range> globalIndexRanges(FileStoreTable table) {
+        String indexManifest = table.snapshotManager().latestSnapshot().indexManifest();
+        List<IndexManifestEntry> indexEntries =
+                table.store().indexManifestFileFactory().create().read(indexManifest);
+
+        List<Range> rowRanges = new ArrayList<>();
+        for (IndexManifestEntry indexEntry : indexEntries) {
+            GlobalIndexMeta meta = indexEntry.indexFile().globalIndexMeta();
+            if (meta == null) {
+                continue;
+            }
+            rowRanges.add(meta.rowRange());
+        }
+        rowRanges.sort(
+                (a, b) ->
+                        a.from == b.from
+                                ? Long.compare(a.to, b.to)
+                                : Long.compare(a.from, b.from));
+        return rowRanges;
+    }
+
+    /**
+     * Reads the table with {@code predicate} pushed down as a filter and returns the string found
+     * at field index 2 of every row produced, in read order.
+     */
+    private List<String> readPayloads(FileStoreTable table, Predicate predicate) throws Exception {
+        ReadBuilder reader = table.newReadBuilder().withFilter(predicate);
+        List<String> payloads = new ArrayList<>();
+        reader.newRead()
+                .createReader(reader.newScan().plan())
+                .forEachRemaining(row -> payloads.add(row.getString(2).toString()));
+        return payloads;
+    }
+
+    /**
+     * Expected outcome of reassigning the large manifest fixture. The arrays are indexed by
+     * partition id and stored by reference, without copying.
+     */
+    private static class ExpectedLargeManifestAssignment {
+
+        /** Number of ADD entries per partition before the reassignment. */
+        private final int[] entriesByPartition;
+
+        /** Smallest row ID per partition before the reassignment. */
+        private final long[] minRowIdByPartition;
+
+        /** Largest row ID per partition before the reassignment. */
+        private final long[] maxRowIdByPartition;
+
+        /** Whether the partition is expected to receive new row IDs. */
+        private final boolean[] partitionsToReassign;
+
+        /** Total number of entries in partitions expected to be reassigned. */
+        private final long reassignedEntryCount;
+
+        /** Total number of rows in partitions expected to be reassigned. */
+        private final long reassignedRowCount;
+
+        /** Names of manifests containing at least one entry expected to be reassigned. */
+        private final Set<String> rewrittenManifestFiles;
+
+        private ExpectedLargeManifestAssignment(
+                int[] entriesByPartition,
+                long[] minRowIdByPartition,
+                long[] maxRowIdByPartition,
+                boolean[] partitionsToReassign,
+                long reassignedEntryCount,
+                long reassignedRowCount,
+                Set<String> rewrittenManifestFiles) {
+            this.rewrittenManifestFiles = rewrittenManifestFiles;
+            this.reassignedRowCount = reassignedRowCount;
+            this.reassignedEntryCount = reassignedEntryCount;
+            this.partitionsToReassign = partitionsToReassign;
+            this.maxRowIdByPartition = maxRowIdByPartition;
+            this.minRowIdByPartition = minRowIdByPartition;
+            this.entriesByPartition = entriesByPartition;
+        }
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
new file mode 100644
index 0000000000..7381e9078b
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link RowRangeMappingIndex}. */
+public class RowRangeMappingIndexTest {
+
+    /** A range inside a single mapping is shifted by that mapping's offset. */
+    @Test
+    public void testMapSingleRange() {
+        RowRangeMappingIndex index =
+                RowRangeMappingIndex.create(
+                        Arrays.asList(RowRangeMappingIndex.mapping(10, 19, 100)));
+
+        Range mapped = index.map(new Range(12, 15));
+
+        assertThat(mapped).isEqualTo(new Range(102, 105));
+    }
+
+    /** A range spanning adjacent mappings maps to one range when the targets are adjacent too. */
+    @Test
+    public void testMapAcrossContiguousRanges() {
+        RowRangeMappingIndex index =
+                RowRangeMappingIndex.create(
+                        Arrays.asList(
+                                RowRangeMappingIndex.mapping(10, 14, 100),
+                                RowRangeMappingIndex.mapping(15, 19, 105),
+                                RowRangeMappingIndex.mapping(20, 24, 110)));
+
+        Range mapped = index.map(new Range(12, 22));
+
+        assertThat(mapped).isEqualTo(new Range(102, 112));
+    }
+
+    /** Old row IDs 15..19 have no mapping here, so mapping 12..22 must fail. */
+    @Test
+    public void testMapFailsWhenOldRangeIsNotCovered() {
+        RowRangeMappingIndex index =
+                RowRangeMappingIndex.create(
+                        Arrays.asList(
+                                RowRangeMappingIndex.mapping(10, 14, 100),
+                                RowRangeMappingIndex.mapping(20, 24, 105)));
+        Range requested = new Range(12, 22);
+
+        assertThatThrownBy(() -> index.map(requested))
+                .hasMessageContaining("is not fully covered");
+    }
+
+    /** The old ranges are adjacent but their targets (100.. and 200..) are not. */
+    @Test
+    public void testMapFailsWhenNewRangeIsNotContiguous() {
+        RowRangeMappingIndex index =
+                RowRangeMappingIndex.create(
+                        Arrays.asList(
+                                RowRangeMappingIndex.mapping(10, 14, 100),
+                                RowRangeMappingIndex.mapping(15, 19, 200)));
+        Range requested = new Range(12, 17);
+
+        assertThatThrownBy(() -> index.map(requested))
+                .hasMessageContaining("maps to non-contiguous new row range");
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdAction.java
new file mode 100644
index 0000000000..f094e22ddc
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdAction.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionRowIdReassigner;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Action to reassign row IDs for data evolution tables. */
+public class ReassignRowIdAction extends ActionBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReassignRowIdAction.class);
+
+ private final Identifier identifier;
+ private final Map<String, String> tableConf;
+ private final List<Map<String, String>> partitionSpecs;
+
+ public ReassignRowIdAction(
+ Map<String, String> catalogConfig,
+ String databaseName,
+ String tableName,
+ Map<String, String> tableConfig,
+ List<Map<String, String>> partitionSpecs) {
+ super(catalogConfig);
+ this.identifier = Identifier.create(databaseName, tableName);
+ this.tableConf = new HashMap<>(tableConfig);
+ this.partitionSpecs = new ArrayList<>(partitionSpecs);
+ }
+
+ @Override
+ public void build() throws Exception {
+ Table table = catalog.getTable(identifier).copy(tableConf);
+ checkArgument(
+ table instanceof FileStoreTable,
+ "Action '%s' only supports file store table, but table '%s' is
%s.",
+ ReassignRowIdActionFactory.IDENTIFIER,
+ identifier.getFullName(),
+ table.getClass().getName());
+
+ DataStreamSource<Void> source =
+ env.addSource(
+ new ReassignRowIdSource(
+ identifier.getFullName(), (FileStoreTable)
table, partitionSpecs),
+ "Reassign Row ID : " + identifier.getFullName(),
+ BasicTypeInfo.VOID_TYPE_INFO);
+ ((LegacySourceTransformation<?>) source.getTransformation())
+ .setBoundedness(Boundedness.BOUNDED);
+ source.setParallelism(1).forward().sinkTo(new
DiscardingSink<>()).setParallelism(1);
+ }
+
+ @Override
+ public void run() throws Exception {
+ build();
+ env.execute("Reassign Row ID : " + identifier.getFullName());
+ }
+
+ public static @Nullable PartitionPredicate toPartitionPredicate(
+ FileStoreTable table, List<Map<String, String>> partitionSpecs) {
+ if (partitionSpecs.isEmpty()) {
+ return null;
+ }
+
+ RowType partitionType = table.schema().logicalPartitionType();
+ List<String> partitionKeys = table.partitionKeys();
+ checkArgument(
+ partitionType.getFieldCount() > 0,
+ "Partition filter can only be used for partitioned table
'%s'.",
+ table.name());
+
+ String defaultName = table.coreOptions().partitionDefaultName();
+ for (Map<String, String> partitionSpec : partitionSpecs) {
+ checkArgument(
+ partitionSpec.keySet().containsAll(partitionKeys)
+ &&
partitionKeys.containsAll(partitionSpec.keySet()),
+ "Partition filter for table '%s' must match partition keys
%s, but was %s.",
+ table.name(),
+ partitionKeys,
+ partitionSpec);
+ }
+
+ return PartitionPredicate.fromMaps(partitionType, partitionSpecs,
defaultName);
+ }
+
+ public static String formatResult(
+ String tableName, DataEvolutionRowIdReassigner.Result result) {
+ if (!result.reassigned) {
+ String reason =
+ result.skipReason == null
+ ? "row IDs are already partition-contiguous"
+ : result.skipReason;
+ return String.format(
+ "Skipped. Row IDs for table '%s' were not reassigned
because %s:"
+ + " snapshot %d unchanged, nextRowId=%d.",
+ tableName, reason, result.previousSnapshotId,
result.nextRowId);
+ }
+ return String.format(
+ "Success. Reassigned row IDs for table '%s': snapshot %d ->
%d,"
+ + " nextRowId %d -> %d, files=%d, rows=%d,
indexFiles=%d.",
+ tableName,
+ result.previousSnapshotId,
+ result.newSnapshotId,
+ result.firstAssignedRowId,
+ result.nextRowId,
+ result.fileCount,
+ result.rowCount,
+ result.indexFileCount);
+ }
+
+ private static class ReassignRowIdSource implements SourceFunction<Void> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String tableName;
+ private final FileStoreTable table;
+ private final List<Map<String, String>> partitionSpecs;
+
+ private ReassignRowIdSource(
+ String tableName, FileStoreTable table, List<Map<String,
String>> partitionSpecs) {
+ this.tableName = tableName;
+ this.table = table;
+ this.partitionSpecs = partitionSpecs;
+ }
+
+ @Override
+ public void run(SourceContext<Void> sourceContext) {
+ DataEvolutionRowIdReassigner.Result result =
+ new DataEvolutionRowIdReassigner(
+ table, toPartitionPredicate(table,
partitionSpecs))
+ .reassign();
+ String message = formatResult(tableName, result);
+ LOG.info(message);
+ System.out.println(message);
+ }
+
+ @Override
+ public void cancel() {}
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdActionFactory.java
new file mode 100644
index 0000000000..1f6967d14d
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdActionFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.ReassignRowIdProcedure;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link ReassignRowIdAction}. */
+public class ReassignRowIdActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = ReassignRowIdProcedure.IDENTIFIER;
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ return Optional.of(
+ new ReassignRowIdAction(
+ catalogConfigMap(params),
+ params.getRequired(DATABASE),
+ params.getRequired(TABLE),
+ optionalConfigMap(params, TABLE_CONF),
+ optionalPartitions(params)));
+ }
+
+ private List<Map<String, String>>
optionalPartitions(MultipleParameterToolAdapter params) {
+ return params.has(PARTITION) ? getPartitions(params) :
Collections.emptyList();
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \""
+ + IDENTIFIER
+ + "\" reassigns row IDs for a data evolution table
when partition "
+ + "row-id ranges overlap.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " "
+ + IDENTIFIER
+ + " --warehouse <warehouse_path> --database
<database_name> "
+ + "--table <table_name> [--table_conf <key>=<value>] "
+ + "[--catalog_conf <key>=<value>] "
+ + "[--partition <key>=<value>[,<key>=<value>] ...]");
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(
+ " "
+ + IDENTIFIER
+ + " --warehouse hdfs:///path/to/warehouse "
+ + "--database test_db --table test_table "
+ + "--partition dt=2026-05-19");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReassignRowIdProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReassignRowIdProcedure.java
new file mode 100644
index 0000000000..cf33aa0f81
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReassignRowIdProcedure.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionRowIdReassigner;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.ReassignRowIdAction;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Procedure to reassign row IDs for data evolution tables. */
+public class ReassignRowIdProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "reassign_row_id";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Reassigns row IDs without a partition filter.
+     *
+     * @param tableId identifier of the table to process
+     * @return a single-element array holding the formatted result message
+     */
+    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
+    public String[] call(ProcedureContext procedureContext, String tableId)
+            throws Catalog.TableNotExistException {
+        return reassign(tableId, Collections.emptyList());
+    }
+
+    /**
+     * Reassigns row IDs, optionally restricted to the given partitions.
+     *
+     * @param tableId identifier of the table to process
+     * @param partitions partition specs separated by {@code ';'}; a {@code null} or blank value
+     *     means no partition filter
+     * @return a single-element array holding the formatted result message
+     */
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "partitions", type = @DataTypeHint("STRING"))
+            })
+    public String[] call(ProcedureContext procedureContext, String tableId, String partitions)
+            throws Catalog.TableNotExistException {
+        // Splitting a null argument would throw a NullPointerException, and a blank one would
+        // yield a single empty partition spec that fails partition-key validation. Both are
+        // treated as "no partition filter" instead.
+        if (partitions == null || partitions.trim().isEmpty()) {
+            return reassign(tableId, Collections.emptyList());
+        }
+        return reassign(tableId, ParameterUtils.getPartitions(partitions.split(";")));
+    }
+
+    /** Runs the reassignment on the table and formats its result. */
+    private String[] reassign(String tableId, List<Map<String, String>> partitionSpecs)
+            throws Catalog.TableNotExistException {
+        Table table = table(tableId);
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Procedure '%s' only supports file store table, but table '%s' is %s.",
+                IDENTIFIER,
+                tableId,
+                table.getClass().getName());
+
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        DataEvolutionRowIdReassigner.Result result =
+                new DataEvolutionRowIdReassigner(
+                                fileStoreTable,
+                                ReassignRowIdAction.toPartitionPredicate(
+                                        fileStoreTable, partitionSpecs))
+                        .reassign();
+        return new String[] {ReassignRowIdAction.formatResult(tableId, result)};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index eb83dcdcb3..db2777a3a0 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -50,6 +50,7 @@ org.apache.paimon.flink.action.ClearConsumerActionFactory
org.apache.paimon.flink.action.RescaleActionFactory
org.apache.paimon.flink.action.CloneActionFactory
org.apache.paimon.flink.action.DataEvolutionMergeIntoActionFactory
+org.apache.paimon.flink.action.ReassignRowIdActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -102,6 +103,7 @@
org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
+org.apache.paimon.flink.procedure.ReassignRowIdProcedure
org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure
org.apache.paimon.flink.procedure.VectorSearchProcedure
org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionRowIdReassignerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionRowIdReassignerTest.java
new file mode 100644
index 0000000000..7bbe5eb497
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionRowIdReassignerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.ReassignRowIdAction;
+import org.apache.paimon.flink.action.ReassignRowIdActionFactory;
+import org.apache.paimon.flink.procedure.ReassignRowIdProcedure;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ReassignRowIdAction} and {@link ReassignRowIdProcedure}. */
+public class DataEvolutionRowIdReassignerTest extends TableTestBase {
+
+ @Override
+ protected Schema schemaDefault() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("pt", DataTypes.STRING());
+ schemaBuilder.column("id", DataTypes.INT());
+ schemaBuilder.column("payload", DataTypes.STRING());
+ schemaBuilder.partitionKeys("pt");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.GLOBAL_INDEX_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BUCKET.key(), "-1");
+ schemaBuilder.option(CoreOptions.SCAN_MANIFEST_PARALLELISM.key(), "1");
+ return schemaBuilder.build();
+ }
+
+ @Test
+ public void testActionReassignRowIdsByPartition() throws Exception {
+ FileStoreTable table = createTableWithInterleavedPartitions();
+
+ Action action =
+ ActionFactory.createAction(
+ new String[] {
+ ReassignRowIdActionFactory.IDENTIFIER,
+ "--warehouse",
+ warehouse.toString(),
+ "--database",
+ database,
+ "--table",
+ DEFAULT_TABLE_NAME
+ })
+ .get();
+ assertThat(action).isInstanceOf(ReassignRowIdAction.class);
+
+ action.run();
+
+ Map<String, List<Long>> rowIdsByPartition = rowIdsByPartition(table);
+ assertThat(rowIdsByPartition).hasSize(2);
+ assertThat(rowIdsByPartition).containsEntry("pt=a/", Arrays.asList(5L, 6L, 7L));
+ assertThat(rowIdsByPartition).containsEntry("pt=b/", Arrays.asList(8L, 9L));
+ assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(10L);
+ }
+
+ @Test
+ public void testActionBuildsBoundedSourceWithSink() throws Exception {
+ createTableDefault();
+ ReassignRowIdAction action =
+ (ReassignRowIdAction)
+ ActionFactory.createAction(
+ new String[] {
+ ReassignRowIdActionFactory.IDENTIFIER,
+ "--warehouse",
+ warehouse.toString(),
+ "--database",
+ database,
+ "--table",
+ DEFAULT_TABLE_NAME
+ })
+ .get();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ action.withStreamExecutionEnvironment(env).build();
+
+ List<Transformation<?>> transformations = env.getTransformations();
+ assertThat(transformations).isNotEmpty();
+ String sourceName = "Reassign Row ID : " + database + "." + DEFAULT_TABLE_NAME;
+ Transformation<?> source =
+ transformations.stream()
+ .map(transformation -> findTransformationByName(transformation, sourceName))
+ .filter(Objects::nonNull)
+ .findFirst()
+ .orElse(null);
+ assertThat(source).isNotNull();
+ assertThat(source).isInstanceOf(LegacySourceTransformation.class);
+ assertThat(((LegacySourceTransformation<?>) source).getBoundedness())
+ .isEqualTo(Boundedness.BOUNDED);
+ assertThat(source.getParallelism()).isEqualTo(1);
+ assertThat(source.getInputs()).isEmpty();
+
+ Transformation<?> sink =
+ transformations.stream()
+ .filter(transformation -> transformation != source)
+ .filter(
+ transformation ->
+ findTransformationByName(transformation, sourceName)
+ == source)
+ .findFirst()
+ .orElse(null);
+ assertThat(sink).isNotNull();
+ assertThat(sink.getParallelism()).isEqualTo(1);
+ assertThat(findTransformationByName(sink, sourceName)).isSameAs(source);
+ assertThat(env.getStreamGraph().getJobGraph().getVertices()).hasSize(2);
+ }
+
+ @Test
+ public void testProcedureReassignsSelectedPartition() throws Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+ writeOneRow(table, "a", 0);
+ writeOneRow(table, "b", 1);
+ writeOneRow(table, "c", 2);
+ writeOneRow(table, "c", 3);
+ writeOneRow(table, "d", 4);
+ writeOneRow(table, "d", 5);
+ writeOneRow(table, "b", 6);
+
+ ReassignRowIdProcedure procedure = new ReassignRowIdProcedure();
+ procedure.withCatalog(catalog);
+ String[] result = procedure.call(null, database + "." + DEFAULT_TABLE_NAME, "pt=b");
+
+ assertThat(result).hasSize(1);
+ assertThat(result[0]).contains("Success. Reassigned row IDs");
+ assertThat(rowIdsByPartition(table))
+ .containsEntry("pt=a/", Collections.singletonList(0L))
+ .containsEntry("pt=b/", Arrays.asList(7L, 8L))
+ .containsEntry("pt=c/", Arrays.asList(2L, 3L))
+ .containsEntry("pt=d/", Arrays.asList(4L, 5L));
+ }
+
+ private Transformation<?> findTransformationByName(
+ Transformation<?> transformation, String expectedName) {
+ if (expectedName.equals(transformation.getName())) {
+ return transformation;
+ }
+ for (Transformation<?> input : transformation.getInputs()) {
+ Transformation<?> found = findTransformationByName(input, expectedName);
+ if (found != null) {
+ return found;
+ }
+ }
+ return null;
+ }
+
+ private FileStoreTable createTableWithInterleavedPartitions() throws Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+ writeOneRow(table, "a", 0);
+ writeOneRow(table, "b", 1);
+ writeOneRow(table, "a", 2);
+ writeOneRow(table, "b", 3);
+ writeOneRow(table, "a", 4);
+ return table;
+ }
+
+ private void writeOneRow(FileStoreTable table, String partition, int id) throws Exception {
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = builder.newWrite();
+ BatchTableCommit commit = builder.newCommit()) {
+ write.write(
+ GenericRow.of(
+ BinaryString.fromString(partition),
+ id,
+ BinaryString.fromString("v" + id)));
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ private Map<String, List<Long>> rowIdsByPartition(FileStoreTable table) {
+ List<ManifestEntry> entries =
+ table.store()
+ .newScan()
+ .withSnapshot(table.snapshotManager().latestSnapshot())
+ .plan()
+ .files();
+ Map<String, List<Long>> result = new LinkedHashMap<>();
+ for (ManifestEntry entry : entries) {
+ String partition = table.store().pathFactory().getPartitionString(entry.partition());
+ result.computeIfAbsent(partition, k -> new ArrayList<>())
+ .add(entry.file().firstRowId());
+ }
+ for (List<Long> rowIds : result.values()) {
+ Collections.sort(rowIds);
+ }
+ return result;
+ }
+}
diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
new file mode 100644
index 0000000000..4be481369e
--- /dev/null
+++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * The {@link org.apache.flink.streaming.api.functions.source.legacy.SourceFunction} migrated from
+ * Flink 1.x to resolve compatibility issues with Flink 2.x.
+ */
+public interface SourceFunction<T>
+ extends org.apache.flink.streaming.api.functions.source.legacy.SourceFunction<T> {}