This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 77d0dcec0f [flink] Add data evolution row id reassign action (#7957)
77d0dcec0f is described below

commit 77d0dcec0ff508ed21f7df29c92e980e1997346f
Author: YeJunHao <[email protected]>
AuthorDate: Tue May 26 16:15:52 2026 +0800

    [flink] Add data evolution row id reassign action (#7957)
    
    Add a Flink action and procedure to reassign row IDs for data evolution
    tables by rewriting metadata. This is useful when partition row-id
    ranges overlap and should be normalized without rewriting data files.
---
 docs/docs/flink/action-jars.md                     |  28 +
 docs/docs/flink/procedures.md                      |  23 +-
 .../DataEvolutionRowIdReassigner.java              | 939 ++++++++++++++++++++
 .../append/dataevolution/RowRangeMappingIndex.java | 138 +++
 .../DataEvolutionRowIdReassignerTest.java          | 949 +++++++++++++++++++++
 .../dataevolution/RowRangeMappingIndexTest.java    |  77 ++
 .../paimon/flink/action/ReassignRowIdAction.java   | 175 ++++
 .../flink/action/ReassignRowIdActionFactory.java   |  80 ++
 .../flink/procedure/ReassignRowIdProcedure.java    |  83 ++
 .../services/org.apache.paimon.factories.Factory   |   2 +
 .../DataEvolutionRowIdReassignerTest.java          | 230 +++++
 .../api/functions/source/SourceFunction.java       |  26 +
 12 files changed, 2749 insertions(+), 1 deletion(-)

diff --git a/docs/docs/flink/action-jars.md b/docs/docs/flink/action-jars.md
index b2c3e5a73e..419c342008 100644
--- a/docs/docs/flink/action-jars.md
+++ b/docs/docs/flink/action-jars.md
@@ -307,6 +307,34 @@ For more information of 'rewrite_file_index', see
     rewrite_file_index --help
 ```
 
+## Reassign Row ID
+
+Run the following command to submit a 'reassign_row_id' job for a data 
evolution table.
+This action rewrites only metadata (data files are left untouched) so that partition row-id ranges no longer overlap.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-@@VERSION@@.jar \
+    reassign_row_id \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <table-name> \
+    [--table_conf <paimon-table-conf> [--table_conf <paimon-table-conf> ...]] \
+    [--partition <partition_spec> [--partition <partition_spec> ...]] \
+    [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]]
+
+partition_spec:
+key1=value1,key2=value2...
+```
+
+For more information about 'reassign_row_id', see
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-@@VERSION@@.jar \
+    reassign_row_id --help
+```
+
 ## Force Start Flink Job
 
 Some actions, like `create_tag`, are lightweight and by default will not be 
submitted as a job to Flink cluster. If you
diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md
index dc4f1fedaf..c1286f2be6 100644
--- a/docs/docs/flink/procedures.md
+++ b/docs/docs/flink/procedures.md
@@ -754,7 +754,28 @@ All available procedures are listed below.
          CALL sys.rewrite_file_index(`table` => 'test_db.T')<br/><br/>
          -- rewrite the file index for the specified partition in the 
table<br/>
          CALL sys.rewrite_file_index(`table` => 'test_db.T', partitions => 
'pt=a')<br/><br/>
-     </td>
+      </td>
+   </tr>
+   <tr>
+      <td>reassign_row_id</td>
+      <td>
+         -- Use named argument<br/>
+         CALL [catalog.]sys.reassign_row_id(&lt`table` => identifier&gt [, 
&ltpartitions => partitions&gt])<br/><br/>
+         -- Use indexed argument<br/>
+         CALL [catalog.]sys.reassign_row_id(&ltidentifier&gt [, 
&ltpartitions&gt])<br/><br/>
+      </td>
+      <td>
+         Reassign row IDs for a data evolution table by rewriting metadata. 
Arguments:
+            <li>table: &ltdatabaseName&gt.&lttableName&gt.</li>
+            <li>partitions: the partitions to reassign, e.g. 'pt=a'; when omitted, the whole table is processed.</li>
+      </td>
+      <td>
+         -- reassign row IDs for the whole table<br/>
+         CALL sys.reassign_row_id(`table` => 'test_db.T')<br/><br/>
+         -- reassign row IDs for the specified partition in the table<br/>
+         CALL sys.reassign_row_id(`table` => 'test_db.T', partitions => 
'pt=a')<br/><br/>
+      </td>
+   </tr>
    <tr>
       <td>create_branch</td>
       <td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
new file mode 100644
index 0000000000..1faa4cf66a
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java
@@ -0,0 +1,939 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RangeHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.types.VectorType.isVectorStoreFile;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Reassigns row IDs for data evolution tables by rewriting metadata only. */
+public class DataEvolutionRowIdReassigner {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataEvolutionRowIdReassigner.class);
+    private static final String COMMIT_USER_PREFIX = "reassign-row-id";
+
+    private final FileStoreTable table;
+    private final @Nullable PartitionPredicate partitionPredicate;
+
+    public DataEvolutionRowIdReassigner(FileStoreTable table) {
+        this(table, null);
+    }
+
+    public DataEvolutionRowIdReassigner(
+            FileStoreTable table, @Nullable PartitionPredicate 
partitionPredicate) {
+        this.table = table;
+        this.partitionPredicate = partitionPredicate;
+    }
+
+    public Result reassign() {
+        Map<String, String> dynamicOptions = new 
HashMap<>(table.coreOptions().toMap());
+        dynamicOptions.put(CoreOptions.COMMIT_USER_PREFIX.key(), 
COMMIT_USER_PREFIX);
+        return reassign(CoreOptions.createCommitUser(new 
Options(dynamicOptions)));
+    }
+
+    public Result reassign(String commitUser) {
+        checkArgument(
+                table.coreOptions().rowTrackingEnabled(),
+                "Table '%s' must enable 'row-tracking.enabled=true' before 
reassigning row IDs.",
+                table.name());
+        checkArgument(
+                table.coreOptions().dataEvolutionEnabled(),
+                "Table '%s' must enable 'data-evolution.enabled=true' before 
reassigning row IDs.",
+                table.name());
+
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        checkArgument(
+                latest != null, "Cannot reassign row IDs for empty table 
'%s'.", table.name());
+        Long nextRowId = latest.nextRowId();
+        checkState(
+                nextRowId != null,
+                "Next row id cannot be null for row-tracking table '%s'.",
+                table.name());
+        if (table.schema().logicalPartitionType().getFieldCount() == 0) {
+            LOG.info(
+                    "Skip reassigning row IDs for table {} because it is not 
partitioned.",
+                    table.name());
+            return Result.skipped(latest.id(), nextRowId, "table is not 
partitioned");
+        }
+
+        ManifestFile manifestFile = 
table.store().manifestFileFactory().create();
+        ManifestList manifestList = 
table.store().manifestListFactory().create();
+        List<ManifestFileMeta> manifestMetas = 
manifestList.readDataManifests(latest);
+        AssignmentPlan assignment = planAssignment(manifestMetas, 
manifestFile, nextRowId);
+        if (!assignment.hasCurrentFiles) {
+            return Result.skipped(
+                    latest.id(),
+                    nextRowId,
+                    partitionFilterEnabled()
+                            ? "partition filter matches no current files"
+                            : "table has no current files");
+        }
+        if (assignment.reassignedFileCount == 0) {
+            LOG.info(
+                    "Skip reassigning row IDs for table {} because partition 
row IDs are already contiguous.",
+                    table.name());
+            return Result.skipped(
+                    latest.id(), nextRowId, "partition row IDs are already 
contiguous");
+        }
+
+        Pair<String, Long> baseManifestList =
+                writeBaseManifestList(
+                        manifestMetas, assignment.rewrittenManifestMetas, 
manifestList);
+        Pair<String, Long> deltaManifestList = 
manifestList.write(Collections.emptyList());
+
+        RewrittenIndexManifest rewrittenIndexManifest = 
rewriteIndexManifest(latest, assignment);
+
+        try (FileStoreCommitImpl commit =
+                (FileStoreCommitImpl) table.store().newCommit(commitUser, 
table)) {
+            boolean success =
+                    commit.replaceManifestList(
+                            latest,
+                            latest.totalRecordCount(),
+                            baseManifestList,
+                            deltaManifestList,
+                            rewrittenIndexManifest.indexManifest,
+                            assignment.nextRowId);
+            if (!success) {
+                throw new RuntimeException(
+                        "Failed to reassign row IDs because a newer snapshot 
has been committed.");
+            }
+        }
+
+        LOG.info(
+                "Reassigned row IDs for table {} from {} to {}, partitions={}, 
files={}, rows={}.",
+                table.name(),
+                nextRowId,
+                assignment.nextRowId,
+                assignment.rowIdMappings.size(),
+                assignment.reassignedFileCount,
+                assignment.logicalRowCount);
+        return new Result(
+                latest.id(),
+                latest.id() + 1,
+                assignment.reassignedFileCount,
+                assignment.logicalRowCount,
+                rewrittenIndexManifest.indexFileCount,
+                nextRowId,
+                assignment.nextRowId);
+    }
+
+    private AssignmentPlan planAssignment(
+            List<ManifestFileMeta> manifestMetas, ManifestFile manifestFile, 
long firstRowId) {
+        List<List<ManifestFileMeta>> manifestGroups = 
manifestGroupsByPartition(manifestMetas);
+        Map<String, List<ManifestFileMeta>> rewrittenManifestMetas = new 
HashMap<>();
+        Map<BinaryRow, RowRangeMappingIndex> rowIdMappings = new 
LinkedHashMap<>();
+        long nextRowId = firstRowId;
+        long logicalRowCount = 0;
+        long reassignedFileCount = 0;
+        boolean hasCurrentFiles = false;
+
+        for (List<ManifestFileMeta> manifestGroup : manifestGroups) {
+            if (skipManifestGroupByPartitionFilter(manifestGroup)) {
+                continue;
+            }
+
+            CurrentManifest currentManifest = currentManifest(manifestGroup, 
manifestFile);
+            List<ManifestEntry> currentEntries = currentManifest.entries();
+            if (currentEntries.isEmpty()) {
+                continue;
+            }
+            hasCurrentFiles = true;
+
+            Map<BinaryRow, List<ManifestEntry>> entriesByPartition =
+                    entriesByPartition(currentEntries);
+            Set<BinaryRow> partitionsToReassign = 
partitionsToReassign(entriesByPartition);
+            if (partitionsToReassign.isEmpty()) {
+                continue;
+            }
+
+            Assignment groupAssignment =
+                    assign(entriesByPartition, partitionsToReassign, 
nextRowId);
+            nextRowId = groupAssignment.nextRowId;
+            logicalRowCount += groupAssignment.logicalRowCount;
+            reassignedFileCount += groupAssignment.reassignedFileCount;
+            for (Map.Entry<BinaryRow, RowRangeMappingIndex> mapping :
+                    groupAssignment.rowIdMappings.entrySet()) {
+                RowRangeMappingIndex previous =
+                        rowIdMappings.put(mapping.getKey(), 
mapping.getValue());
+                checkState(
+                        previous == null,
+                        "Partition %s appears in multiple manifest groups.",
+                        
table.store().pathFactory().getPartitionString(mapping.getKey()));
+            }
+
+            Map<String, List<ManifestFileMeta>> groupRewrittenManifestMetas =
+                    writeManifestReplacements(
+                            currentManifest, groupAssignment, 
partitionsToReassign, manifestFile);
+            for (Map.Entry<String, List<ManifestFileMeta>> rewritten :
+                    groupRewrittenManifestMetas.entrySet()) {
+                List<ManifestFileMeta> previous =
+                        rewrittenManifestMetas.put(rewritten.getKey(), 
rewritten.getValue());
+                checkState(
+                        previous == null,
+                        "Manifest file %s appears in multiple manifest 
groups.",
+                        rewritten.getKey());
+            }
+        }
+
+        return new AssignmentPlan(
+                rewrittenManifestMetas,
+                rowIdMappings,
+                nextRowId,
+                logicalRowCount,
+                reassignedFileCount,
+                hasCurrentFiles);
+    }
+
+    private List<List<ManifestFileMeta>> manifestGroupsByPartition(
+            List<ManifestFileMeta> manifestMetas) {
+        List<ManifestFileMeta> nonEmptyManifestMetas = new ArrayList<>();
+        for (ManifestFileMeta manifestMeta : manifestMetas) {
+            if (manifestMeta.numAddedFiles() + manifestMeta.numDeletedFiles() 
> 0) {
+                nonEmptyManifestMetas.add(manifestMeta);
+            }
+        }
+        if (nonEmptyManifestMetas.size() <= 1) {
+            return nonEmptyManifestMetas.isEmpty()
+                    ? Collections.emptyList()
+                    : Collections.singletonList(nonEmptyManifestMetas);
+        }
+
+        int partitionFieldCount = 
table.schema().logicalPartitionType().getFieldCount();
+        for (ManifestFileMeta manifestMeta : nonEmptyManifestMetas) {
+            if (!containsPartitionStats(manifestMeta, partitionFieldCount)) {
+                return Collections.singletonList(nonEmptyManifestMetas);
+            }
+        }
+
+        RecordComparator partitionComparator = partitionComparator();
+        List<PartitionManifestRange> manifestRanges = new 
ArrayList<>(nonEmptyManifestMetas.size());
+        for (int i = 0; i < nonEmptyManifestMetas.size(); i++) {
+            ManifestFileMeta manifestMeta = nonEmptyManifestMetas.get(i);
+            manifestRanges.add(
+                    new PartitionManifestRange(
+                            manifestMeta,
+                            manifestMeta.partitionStats().minValues(),
+                            manifestMeta.partitionStats().maxValues(),
+                            i));
+        }
+        Collections.sort(
+                manifestRanges,
+                (left, right) -> {
+                    int result = 
partitionComparator.compare(left.minPartition, right.minPartition);
+                    if (result != 0) {
+                        return result;
+                    }
+                    return partitionComparator.compare(left.maxPartition, 
right.maxPartition);
+                });
+
+        List<List<PartitionManifestRange>> groupedManifestRanges = new 
ArrayList<>();
+        List<PartitionManifestRange> currentGroup = new ArrayList<>();
+        currentGroup.add(manifestRanges.get(0));
+        BinaryRow currentMaxPartition = manifestRanges.get(0).maxPartition;
+        for (int i = 1; i < manifestRanges.size(); i++) {
+            PartitionManifestRange current = manifestRanges.get(i);
+            if (partitionComparator.compare(current.minPartition, 
currentMaxPartition) <= 0) {
+                currentGroup.add(current);
+                if (partitionComparator.compare(current.maxPartition, 
currentMaxPartition) > 0) {
+                    currentMaxPartition = current.maxPartition;
+                }
+            } else {
+                groupedManifestRanges.add(currentGroup);
+                currentGroup = new ArrayList<>();
+                currentGroup.add(current);
+                currentMaxPartition = current.maxPartition;
+            }
+        }
+        groupedManifestRanges.add(currentGroup);
+
+        List<List<ManifestFileMeta>> groups = new ArrayList<>();
+        for (List<PartitionManifestRange> group : groupedManifestRanges) {
+            Collections.sort(group, Comparator.comparingInt(left -> 
left.originalIndex));
+            List<ManifestFileMeta> manifestGroup = new 
ArrayList<>(group.size());
+            for (PartitionManifestRange range : group) {
+                manifestGroup.add(range.manifest);
+            }
+            groups.add(manifestGroup);
+        }
+        return groups;
+    }
+
+    private boolean skipManifestGroupByPartitionFilter(List<ManifestFileMeta> 
manifestGroup) {
+        if (!partitionFilterEnabled()) {
+            return false;
+        }
+
+        int partitionFieldCount = 
table.schema().logicalPartitionType().getFieldCount();
+        for (ManifestFileMeta manifestMeta : manifestGroup) {
+            if (!containsPartitionStats(manifestMeta, partitionFieldCount)) {
+                return false;
+            }
+
+            SimpleStats partitionStats = manifestMeta.partitionStats();
+            if (partitionPredicate.test(
+                    manifestMeta.numAddedFiles() + 
manifestMeta.numDeletedFiles(),
+                    partitionStats.minValues(),
+                    partitionStats.maxValues(),
+                    partitionStats.nullCounts())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean containsPartitionStats(ManifestFileMeta manifestMeta, int 
partitionFieldCount) {
+        SimpleStats partitionStats = manifestMeta.partitionStats();
+        return partitionStats != null
+                && partitionStats.minValues().getFieldCount() == 
partitionFieldCount
+                && partitionStats.maxValues().getFieldCount() == 
partitionFieldCount
+                && partitionStats.nullCounts().size() == partitionFieldCount;
+    }
+
+    private CurrentManifest currentManifest(
+            List<ManifestFileMeta> manifestMetas, ManifestFile manifestFile) {
+        Set<FileEntry.Identifier> deletedIdentifiers =
+                deletedIdentifiers(manifestFile, manifestMetas);
+
+        List<SourcedManifestEntry> currentEntries = new ArrayList<>();
+        for (ManifestFileMeta manifestMeta : manifestMetas) {
+            if (manifestMeta.numAddedFiles() <= 0) {
+                continue;
+            }
+            List<ManifestEntry> entries =
+                    manifestFile.read(manifestMeta.fileName(), 
manifestMeta.fileSize());
+            for (ManifestEntry entry : entries) {
+                if (entry.kind() == FileKind.ADD
+                        && partitionIncluded(entry.partition())
+                        && !deletedIdentifiers.contains(entry.identifier())) {
+                    currentEntries.add(new SourcedManifestEntry(manifestMeta, 
entry));
+                }
+            }
+        }
+        return new CurrentManifest(manifestMetas, currentEntries);
+    }
+
+    private Set<FileEntry.Identifier> deletedIdentifiers(
+            ManifestFile manifestFile, List<ManifestFileMeta> manifestMetas) {
+        Set<FileEntry.Identifier> deletedIdentifiers = new HashSet<>();
+        for (ManifestFileMeta manifestMeta : manifestMetas) {
+            if (manifestMeta.numDeletedFiles() <= 0) {
+                continue;
+            }
+            List<ManifestEntry> entries =
+                    manifestFile.read(manifestMeta.fileName(), 
manifestMeta.fileSize());
+            for (ManifestEntry entry : entries) {
+                if (entry.kind() == FileKind.DELETE && 
partitionIncluded(entry.partition())) {
+                    deletedIdentifiers.add(entry.identifier());
+                }
+            }
+        }
+        return deletedIdentifiers;
+    }
+
+    private boolean partitionIncluded(BinaryRow partition) {
+        return !partitionFilterEnabled() || partitionPredicate.test(partition);
+    }
+
+    private boolean partitionFilterEnabled() {
+        return partitionPredicate != null;
+    }
+
+    private Pair<String, Long> writeBaseManifestList(
+            List<ManifestFileMeta> manifestMetas,
+            Map<String, List<ManifestFileMeta>> rewrittenManifestMetas,
+            ManifestList manifestList) {
+        List<ManifestFileMeta> baseManifestMetas = new ArrayList<>();
+        for (ManifestFileMeta manifestMeta : manifestMetas) {
+            List<ManifestFileMeta> replacement =
+                    rewrittenManifestMetas.get(manifestMeta.fileName());
+            if (replacement == null) {
+                baseManifestMetas.add(manifestMeta);
+            } else {
+                baseManifestMetas.addAll(replacement);
+            }
+        }
+        return manifestList.write(baseManifestMetas);
+    }
+
+    private Map<String, List<ManifestFileMeta>> writeManifestReplacements(
+            CurrentManifest currentManifest,
+            Assignment assignment,
+            Set<BinaryRow> partitionsToReassign,
+            ManifestFile manifestFile) {
+        Set<String> manifestsToRewrite =
+                manifestsToRewrite(currentManifest.currentEntries, 
partitionsToReassign);
+        Map<FileEntry.Identifier, ManifestEntry> reassignedEntries =
+                entriesByIdentifier(assignment.entries);
+
+        Map<String, List<ManifestFileMeta>> rewrittenManifestMetas = new 
HashMap<>();
+        for (ManifestFileMeta manifestMeta : currentManifest.manifestMetas) {
+            if (!manifestsToRewrite.contains(manifestMeta.fileName())) {
+                continue;
+            }
+
+            List<ManifestEntry> rewrittenEntries = new ArrayList<>();
+            List<ManifestEntry> entries =
+                    manifestFile.read(manifestMeta.fileName(), 
manifestMeta.fileSize());
+            for (ManifestEntry entry : entries) {
+                if (entry.kind() == FileKind.ADD) {
+                    ManifestEntry reassignedEntry = 
reassignedEntries.get(entry.identifier());
+                    if (reassignedEntry != null) {
+                        entry = reassignedEntry;
+                    }
+                }
+                rewrittenEntries.add(entry);
+            }
+            rewrittenManifestMetas.put(
+                    manifestMeta.fileName(), 
manifestFile.write(rewrittenEntries));
+        }
+        return rewrittenManifestMetas;
+    }
+
+    private Set<String> manifestsToRewrite(
+            List<SourcedManifestEntry> currentEntries, Set<BinaryRow> 
partitionsToReassign) {
+        Set<String> manifestsToRewrite = new HashSet<>();
+        for (SourcedManifestEntry currentEntry : currentEntries) {
+            if (partitionsToReassign.contains(currentEntry.entry.partition())) 
{
+                manifestsToRewrite.add(currentEntry.manifest.fileName());
+            }
+        }
+        return manifestsToRewrite;
+    }
+
+    private Map<FileEntry.Identifier, ManifestEntry> entriesByIdentifier(
+            List<ManifestEntry> entries) {
+        Map<FileEntry.Identifier, ManifestEntry> result = new HashMap<>();
+        for (ManifestEntry entry : entries) {
+            ManifestEntry previous = result.put(entry.identifier(), entry);
+            checkState(previous == null, "Duplicate current manifest entry for 
file %s.", entry);
+        }
+        return result;
+    }
+
+    private Map<BinaryRow, List<ManifestEntry>> 
entriesByPartition(List<ManifestEntry> entries) {
+        List<ManifestEntry> sorted = new ArrayList<>(entries);
+        Collections.sort(sorted, entryComparator());
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new 
LinkedHashMap<>();
+        for (ManifestEntry entry : sorted) {
+            List<String> writeCols = entry.file().writeCols();
+            checkState(
+                    writeCols == null || 
!writeCols.contains(SpecialFields.ROW_ID.name()),
+                    "Cannot reassign row IDs for file '%s' because it 
physically stores the row-id field.",
+                    entry.file().fileName());
+            checkState(
+                    entry.file().firstRowId() != null,
+                    "File '%s' in table '%s' does not have first row id.",
+                    entry.file().fileName(),
+                    table.name());
+            entriesByPartition
+                    .computeIfAbsent(entry.partition(), k -> new ArrayList<>())
+                    .add(entry);
+        }
+        return entriesByPartition;
+    }
+
+    private Set<BinaryRow> partitionsToReassign(
+            Map<BinaryRow, List<ManifestEntry>> entriesByPartition) {
+        Set<BinaryRow> partitionsToReassign = new HashSet<>();
+        for (Map.Entry<BinaryRow, List<ManifestEntry>> entry : 
entriesByPartition.entrySet()) {
+            if (!partitionRowIdsAreContiguous(entry.getValue())) {
+                partitionsToReassign.add(entry.getKey());
+            }
+        }
+        return partitionsToReassign;
+    }
+
+    private boolean partitionRowIdsAreContiguous(List<ManifestEntry> entries) {
+        List<Range> logicalRanges = logicalRanges(entries);
+        if (logicalRanges.size() <= 1) {
+            return true;
+        }
+
+        Collections.sort(
+                logicalRanges,
+                (left, right) -> {
+                    int result = Long.compare(left.from, right.from);
+                    return result == 0 ? Long.compare(left.to, right.to) : 
result;
+                });
+        long previousEnd = logicalRanges.get(0).to;
+        for (int i = 1; i < logicalRanges.size(); i++) {
+            Range current = logicalRanges.get(i);
+            if (current.from != previousEnd + 1) {
+                return false;
+            }
+            previousEnd = current.to;
+        }
+        return true;
+    }
+
+    private List<Range> logicalRanges(List<ManifestEntry> entries) {
+        RangeHelper<ManifestEntry> rangeHelper =
+                new RangeHelper<>(entry -> entry.file().nonNullRowIdRange());
+        List<List<ManifestEntry>> groups = 
rangeHelper.mergeOverlappingRanges(entries);
+        List<Range> logicalRanges = new ArrayList<>(groups.size());
+        for (List<ManifestEntry> group : groups) {
+            logicalRanges.add(oldLogicalRange(group));
+        }
+        return logicalRanges;
+    }
+
+    private Assignment assign(
+            Map<BinaryRow, List<ManifestEntry>> entriesByPartition,
+            Set<BinaryRow> partitionsToReassign,
+            long firstRowId) {
+        List<ManifestEntry> entries = new ArrayList<>();
+        Map<BinaryRow, RowRangeMappingIndex> rowIdMappings = new 
LinkedHashMap<>();
+        long nextRowId = firstRowId;
+        long logicalRowCount = 0;
+        long reassignedFileCount = 0;
+        for (Map.Entry<BinaryRow, List<ManifestEntry>> entry : 
entriesByPartition.entrySet()) {
+            if (partitionsToReassign.contains(entry.getKey())) {
+                long partitionFirstRowId = nextRowId;
+                PartitionAssignment partitionAssignment =
+                        assignPartition(entry.getValue(), nextRowId);
+                entries.addAll(partitionAssignment.entries);
+                rowIdMappings.put(entry.getKey(), 
partitionAssignment.rowIdMappings);
+                nextRowId = partitionAssignment.nextRowId;
+                logicalRowCount += nextRowId - partitionFirstRowId;
+                reassignedFileCount += partitionAssignment.entries.size();
+            }
+        }
+
+        return new Assignment(
+                entries, rowIdMappings, nextRowId, logicalRowCount, 
reassignedFileCount);
+    }
+
+    private PartitionAssignment assignPartition(List<ManifestEntry> entries, 
long firstRowId) {
+        RangeHelper<ManifestEntry> rangeHelper =
+                new RangeHelper<>(entry -> entry.file().nonNullRowIdRange());
+        List<List<ManifestEntry>> groups = 
rangeHelper.mergeOverlappingRanges(entries);
+
+        List<ManifestEntry> reassigned = new ArrayList<>(entries.size());
+        List<RowRangeMappingIndex.Mapping> mappings = new ArrayList<>();
+        long nextRowId = firstRowId;
+
+        for (List<ManifestEntry> group : groups) {
+            Collections.sort(group, entryComparatorWithoutPartition());
+            Range oldLogicalRange = oldLogicalRange(group);
+            mappings.add(
+                    RowRangeMappingIndex.mapping(
+                            oldLogicalRange.from, oldLogicalRange.to, 
nextRowId));
+
+            for (ManifestEntry entry : group) {
+                long oldFirstRowId = entry.file().nonNullFirstRowId();
+                long newFirstRowId = nextRowId + oldFirstRowId - 
oldLogicalRange.from;
+                reassigned.add(entry.assignFirstRowId(newFirstRowId));
+            }
+
+            nextRowId += oldLogicalRange.count();
+        }
+
+        return new PartitionAssignment(
+                reassigned, RowRangeMappingIndex.create(mappings), nextRowId);
+    }
+
+    /**
+     * Determines the logical row-id range of one group of overlapping files.
+     *
+     * <p>Files that are neither blob nor vector-store files define the range and must all share
+     * exactly the same range. If the group contains only special files, the span of the whole
+     * group is used. Every file of the group must lie inside the resulting range; both rules are
+     * enforced with {@code checkState}.
+     *
+     * @param group entries whose row-id ranges overlap
+     * @return the range that the whole group is mapped from
+     */
+    private Range oldLogicalRange(List<ManifestEntry> group) {
+        List<ManifestEntry> regularFiles = new ArrayList<>();
+        for (ManifestEntry candidate : group) {
+            if (isSpecialFile(candidate)) {
+                continue;
+            }
+            regularFiles.add(candidate);
+        }
+
+        Range expected =
+                regularFiles.isEmpty()
+                        ? spanningRange(group)
+                        : regularFiles.get(0).file().nonNullRowIdRange();
+
+        for (ManifestEntry regular : regularFiles) {
+            Range actual = regular.file().nonNullRowIdRange();
+            boolean sameRange = expected.from == actual.from && expected.to == actual.to;
+            checkState(
+                    sameRange,
+                    "Data files in one overlapping row-id group must have the same row-id range, but found %s and %s.",
+                    expected,
+                    actual);
+        }
+
+        for (ManifestEntry member : group) {
+            Range memberRange = member.file().nonNullRowIdRange();
+            boolean inside = memberRange.from >= expected.from && memberRange.to <= expected.to;
+            checkState(
+                    inside,
+                    "File '%s' row-id range %s is outside logical row-id range %s.",
+                    member.file().fileName(),
+                    memberRange,
+                    expected);
+        }
+        return expected;
+    }
+
+    /** Returns the range from the smallest {@code from} to the largest {@code to} in the group. */
+    private Range spanningRange(List<ManifestEntry> group) {
+        long lowest = Long.MAX_VALUE;
+        long highest = Long.MIN_VALUE;
+        for (ManifestEntry member : group) {
+            Range memberRange = member.file().nonNullRowIdRange();
+            if (memberRange.from < lowest) {
+                lowest = memberRange.from;
+            }
+            if (memberRange.to > highest) {
+                highest = memberRange.to;
+            }
+        }
+        return new Range(lowest, highest);
+    }
+
+    /**
+     * Rewrites the index manifest of {@code latest} so that global index files reference the
+     * reassigned row-id ranges.
+     *
+     * <p>Entries without global index metadata, and entries whose partition has no mapping in
+     * {@code assignment}, are carried over unchanged. All entries must be of kind {@code ADD}.
+     *
+     * @param latest snapshot whose index manifest is read
+     * @param assignment plan providing the per-partition row-id mappings
+     * @return the written manifest and the number of remapped global index files; a {@code null}
+     *     manifest with count 0 when the snapshot has no index manifest or it is empty
+     */
+    private RewrittenIndexManifest rewriteIndexManifest(
+            Snapshot latest, AssignmentPlan assignment) {
+        if (latest.indexManifest() == null) {
+            return new RewrittenIndexManifest(null, 0);
+        }
+
+        IndexManifestFile manifestFile = table.store().indexManifestFileFactory().create();
+        List<IndexManifestEntry> current = manifestFile.read(latest.indexManifest());
+        if (current.isEmpty()) {
+            return new RewrittenIndexManifest(null, 0);
+        }
+
+        long remappedCount = 0;
+        List<IndexManifestEntry> result = new ArrayList<>(current.size());
+        for (IndexManifestEntry source : current) {
+            checkState(
+                    source.kind() == FileKind.ADD,
+                    "Index manifest '%s' contains non-current entry %s.",
+                    latest.indexManifest(),
+                    source);
+
+            IndexFileMeta fileMeta = source.indexFile();
+            GlobalIndexMeta oldMeta = fileMeta.globalIndexMeta();
+            RowRangeMappingIndex partitionMapping =
+                    assignment.rowIdMappings.get(source.partition());
+
+            if (oldMeta != null && partitionMapping != null) {
+                remappedCount++;
+                Range target = partitionMapping.map(oldMeta.rowRange());
+                // Only the row range changes; all other metadata is copied as is.
+                GlobalIndexMeta remappedMeta =
+                        new GlobalIndexMeta(
+                                target.from,
+                                target.to,
+                                oldMeta.indexFieldId(),
+                                oldMeta.extraFieldIds(),
+                                oldMeta.indexMeta());
+                IndexFileMeta remappedFile =
+                        new IndexFileMeta(
+                                fileMeta.indexType(),
+                                fileMeta.fileName(),
+                                fileMeta.fileSize(),
+                                fileMeta.rowCount(),
+                                fileMeta.dvRanges(),
+                                fileMeta.externalPath(),
+                                remappedMeta);
+                result.add(
+                        new IndexManifestEntry(
+                                source.kind(), source.partition(), source.bucket(), remappedFile));
+            } else {
+                result.add(source);
+            }
+        }
+
+        String written = manifestFile.writeWithoutRolling(result);
+        return new RewrittenIndexManifest(written, remappedCount);
+    }
+
+    /**
+     * Builds a comparator that orders entries by partition first and falls back to {@link
+     * #entryComparatorWithoutPartition()} for entries of the same partition.
+     */
+    private Comparator<ManifestEntry> entryComparator() {
+        RecordComparator byPartition = partitionComparator();
+        Comparator<ManifestEntry> withinPartition = entryComparatorWithoutPartition();
+        return (first, second) -> {
+            int byPartitionResult = byPartition.compare(first.partition(), second.partition());
+            return byPartitionResult == 0
+                    ? withinPartition.compare(first, second)
+                    : byPartitionResult;
+        };
+    }
+
+    /**
+     * Creates a record comparator from the field types of the table schema's logical partition
+     * type. A new comparator is requested from {@code CodeGenUtils} on every call.
+     */
+    private RecordComparator partitionComparator() {
+        return CodeGenUtils.newRecordComparator(
+                table.schema().logicalPartitionType().getFieldTypes());
+    }
+
+    /**
+     * Builds the ordering used inside one partition.
+     *
+     * <p>Sort keys, in order: first row id ascending, {@link #fileOrder(ManifestEntry)}
+     * ascending, max sequence number descending, file name ascending.
+     */
+    private Comparator<ManifestEntry> entryComparatorWithoutPartition() {
+        Comparator<ManifestEntry> byFirstRowId =
+                Comparator.comparingLong(entry -> entry.file().nonNullFirstRowId());
+        // Arguments are swapped on purpose: larger sequence numbers sort first.
+        Comparator<ManifestEntry> newestSequenceFirst =
+                (left, right) ->
+                        Long.compare(
+                                right.file().maxSequenceNumber(),
+                                left.file().maxSequenceNumber());
+        Comparator<ManifestEntry> byFileName =
+                (left, right) -> left.file().fileName().compareTo(right.file().fileName());
+
+        return byFirstRowId
+                .thenComparingInt(this::fileOrder)
+                .thenComparing(newestSequenceFirst)
+                .thenComparing(byFileName);
+    }
+
+    /** Sort rank by file name: 1 for blob files, 2 for vector-store files, 0 otherwise. */
+    private int fileOrder(ManifestEntry entry) {
+        String name = entry.file().fileName();
+        if (isBlobFile(name)) {
+            return 1;
+        }
+        return isVectorStoreFile(name) ? 2 : 0;
+    }
+
+    /** Returns whether the entry's file name denotes a blob file or a vector-store file. */
+    private boolean isSpecialFile(ManifestEntry entry) {
+        String name = entry.file().fileName();
+        if (isBlobFile(name)) {
+            return true;
+        }
+        return isVectorStoreFile(name);
+    }
+
+    /**
+     * Result of row-id reassignment.
+     *
+     * <p>A result either describes a performed reassignment ({@link #reassigned} is {@code true})
+     * or a skipped run created through {@code skipped(...)}, in which case both snapshot ids are
+     * equal, all counters are 0, both row-id fields are equal and {@link #skipReason} is set.
+     */
+    public static class Result {
+        /** Snapshot id the run was based on. */
+        public final long previousSnapshotId;
+        /** Snapshot id after the run; equals {@link #previousSnapshotId} for a skipped run. */
+        public final long newSnapshotId;
+        /** File counter of the run; 0 for a skipped run. */
+        public final long fileCount;
+        /** Row counter of the run; 0 for a skipped run. */
+        public final long rowCount;
+        /** Index file counter of the run; 0 for a skipped run. */
+        public final long indexFileCount;
+        /** First row id handed out; equals {@link #nextRowId} for a skipped run. */
+        public final long firstAssignedRowId;
+        /** Next row id reported by the run. */
+        public final long nextRowId;
+        /** {@code false} only when the run was skipped or explicitly flagged as such. */
+        public final boolean reassigned;
+        /** Reason for skipping, or {@code null} when none was given. */
+        @Nullable public final String skipReason;
+
+        /** Creates a result with {@code reassigned = true} and no skip reason. */
+        public Result(
+                long previousSnapshotId,
+                long newSnapshotId,
+                long fileCount,
+                long rowCount,
+                long indexFileCount,
+                long firstAssignedRowId,
+                long nextRowId) {
+            this(
+                    previousSnapshotId,
+                    newSnapshotId,
+                    fileCount,
+                    rowCount,
+                    indexFileCount,
+                    firstAssignedRowId,
+                    nextRowId,
+                    true,
+                    null);
+        }
+
+        /** Creates a result with an explicit {@code reassigned} flag and no skip reason. */
+        public Result(
+                long previousSnapshotId,
+                long newSnapshotId,
+                long fileCount,
+                long rowCount,
+                long indexFileCount,
+                long firstAssignedRowId,
+                long nextRowId,
+                boolean reassigned) {
+            this(
+                    previousSnapshotId,
+                    newSnapshotId,
+                    fileCount,
+                    rowCount,
+                    indexFileCount,
+                    firstAssignedRowId,
+                    nextRowId,
+                    reassigned,
+                    null);
+        }
+
+        /** Canonical constructor; the other constructors delegate to it. */
+        public Result(
+                long previousSnapshotId,
+                long newSnapshotId,
+                long fileCount,
+                long rowCount,
+                long indexFileCount,
+                long firstAssignedRowId,
+                long nextRowId,
+                boolean reassigned,
+                @Nullable String skipReason) {
+            this.previousSnapshotId = previousSnapshotId;
+            this.newSnapshotId = newSnapshotId;
+            this.fileCount = fileCount;
+            this.rowCount = rowCount;
+            this.indexFileCount = indexFileCount;
+            this.firstAssignedRowId = firstAssignedRowId;
+            this.nextRowId = nextRowId;
+            this.reassigned = reassigned;
+            this.skipReason = skipReason;
+        }
+
+        /**
+         * Creates the result of a run that changed nothing: the snapshot id is used for both
+         * snapshot fields, {@code nextRowId} for both row-id fields, and all counters are 0.
+         */
+        private static Result skipped(long snapshotId, long nextRowId, String 
reason) {
+            return new Result(snapshotId, snapshotId, 0, 0, 0, nextRowId, 
nextRowId, false, reason);
+        }
+    }
+
+    /**
+     * Return value of {@code rewriteIndexManifest}: the index manifest written by the rewrite and
+     * the number of global index files whose row range was remapped.
+     */
+    private static class RewrittenIndexManifest {
+        // null when the snapshot had no index manifest or the manifest had no entries
+        @Nullable private final String indexManifest;
+        private final long indexFileCount;
+
+        private RewrittenIndexManifest(@Nullable String indexManifest, long 
indexFileCount) {
+            this.indexManifest = indexManifest;
+            this.indexFileCount = indexFileCount;
+        }
+    }
+
+    /**
+     * Immutable holder for a computed row-id assignment: the reassigned manifest entries, the
+     * row-range mapping index of each partition, the next unused row id and two counters.
+     */
+    private static class Assignment {
+        private final List<ManifestEntry> entries;
+        // keyed by partition
+        private final Map<BinaryRow, RowRangeMappingIndex> rowIdMappings;
+        private final long nextRowId;
+        // NOTE(review): presumably the number of row ids consumed by the assignment - confirm
+        private final long logicalRowCount;
+        private final long reassignedFileCount;
+
+        private Assignment(
+                List<ManifestEntry> entries,
+                Map<BinaryRow, RowRangeMappingIndex> rowIdMappings,
+                long nextRowId,
+                long logicalRowCount,
+                long reassignedFileCount) {
+            this.entries = entries;
+            this.rowIdMappings = rowIdMappings;
+            this.nextRowId = nextRowId;
+            this.logicalRowCount = logicalRowCount;
+            this.reassignedFileCount = reassignedFileCount;
+        }
+    }
+
+    /**
+     * Immutable holder for an assignment expressed at manifest level: rewritten manifest metas,
+     * the row-range mapping index of each partition, the next unused row id and counters.
+     */
+    private static class AssignmentPlan {
+        // NOTE(review): key is presumably the name of the original manifest file - confirm
+        private final Map<String, List<ManifestFileMeta>> 
rewrittenManifestMetas;
+        // keyed by partition; read by rewriteIndexManifest to remap global index ranges
+        private final Map<BinaryRow, RowRangeMappingIndex> rowIdMappings;
+        private final long nextRowId;
+        private final long logicalRowCount;
+        private final long reassignedFileCount;
+        // NOTE(review): presumably false when the snapshot contains no current files - confirm
+        private final boolean hasCurrentFiles;
+
+        private AssignmentPlan(
+                Map<String, List<ManifestFileMeta>> rewrittenManifestMetas,
+                Map<BinaryRow, RowRangeMappingIndex> rowIdMappings,
+                long nextRowId,
+                long logicalRowCount,
+                long reassignedFileCount,
+                boolean hasCurrentFiles) {
+            this.rewrittenManifestMetas = rewrittenManifestMetas;
+            this.rowIdMappings = rowIdMappings;
+            this.nextRowId = nextRowId;
+            this.logicalRowCount = logicalRowCount;
+            this.reassignedFileCount = reassignedFileCount;
+            this.hasCurrentFiles = hasCurrentFiles;
+        }
+    }
+
+    /** Holds a list of manifest file metas and a list of entries tagged with their manifest. */
+    private static class CurrentManifest {
+        private final List<ManifestFileMeta> manifestMetas;
+        private final List<SourcedManifestEntry> currentEntries;
+
+        private CurrentManifest(
+                List<ManifestFileMeta> manifestMetas, List<SourcedManifestEntry> currentEntries) {
+            this.manifestMetas = manifestMetas;
+            this.currentEntries = currentEntries;
+        }
+
+        /** Returns a new list with the plain manifest entries, in the order they are stored. */
+        private List<ManifestEntry> entries() {
+            List<ManifestEntry> plain = new ArrayList<>(currentEntries.size());
+            currentEntries.forEach(sourced -> plain.add(sourced.entry));
+            return plain;
+        }
+    }
+
+    /** Pairs a manifest entry with a manifest file meta. */
+    private static class SourcedManifestEntry {
+        // NOTE(review): presumably the manifest file the entry was read from - confirm
+        private final ManifestFileMeta manifest;
+        private final ManifestEntry entry;
+
+        private SourcedManifestEntry(ManifestFileMeta manifest, ManifestEntry 
entry) {
+            this.manifest = manifest;
+            this.entry = entry;
+        }
+    }
+
+    /**
+     * Immutable holder that combines a manifest file meta with a minimum partition, a maximum
+     * partition and an index.
+     */
+    private static class PartitionManifestRange {
+        private final ManifestFileMeta manifest;
+        private final BinaryRow minPartition;
+        private final BinaryRow maxPartition;
+        // NOTE(review): presumably the manifest's position in its original list - confirm
+        private final int originalIndex;
+
+        private PartitionManifestRange(
+                ManifestFileMeta manifest,
+                BinaryRow minPartition,
+                BinaryRow maxPartition,
+                int originalIndex) {
+            this.manifest = manifest;
+            this.minPartition = minPartition;
+            this.maxPartition = maxPartition;
+            this.originalIndex = originalIndex;
+        }
+    }
+
+    /**
+     * Return value of {@code assignPartition}: the reassigned entries, the index that maps old
+     * row-id ranges to new ones, and the next row id that has not been handed out.
+     */
+    private static class PartitionAssignment {
+        private final List<ManifestEntry> entries;
+        private final RowRangeMappingIndex rowIdMappings;
+        private final long nextRowId;
+
+        private PartitionAssignment(
+                List<ManifestEntry> entries, RowRangeMappingIndex 
rowIdMappings, long nextRowId) {
+            this.entries = entries;
+            this.rowIdMappings = rowIdMappings;
+            this.nextRowId = nextRowId;
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
new file mode 100644
index 0000000000..d234c6f736
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.utils.Range;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * Index that translates old row-id ranges into new row-id ranges.
+ *
+ * <p>The index holds mappings sorted by old start whose old ranges do not overlap. A mapping
+ * moves the closed old range {@code [oldStart, oldEnd]} so that it begins at {@code newStart}.
+ */
+final class RowRangeMappingIndex {
+
+    private final List<Mapping> mappings;
+    // oldEnds[i] is the old end of mappings.get(i); used for binary search
+    private final long[] oldEnds;
+
+    private RowRangeMappingIndex(List<Mapping> mappings) {
+        this.mappings = mappings;
+        long[] ends = new long[mappings.size()];
+        int slot = 0;
+        for (Mapping mapping : mappings) {
+            ends[slot++] = mapping.oldEnd;
+        }
+        this.oldEnds = ends;
+    }
+
+    /**
+     * Creates an index from the given mappings; the input list is copied and not modified.
+     *
+     * <p>Fails the {@code checkArgument} precondition if the list is null or empty, if a mapping
+     * has {@code oldStart > oldEnd}, or if two old ranges overlap.
+     */
+    static RowRangeMappingIndex create(List<Mapping> mappings) {
+        checkArgument(mappings != null, "Row range mappings cannot be null.");
+        checkArgument(!mappings.isEmpty(), "Row range mappings cannot be empty.");
+
+        List<Mapping> ordered = new ArrayList<>(mappings);
+        ordered.sort(Comparator.comparingLong(candidate -> candidate.oldStart));
+        for (int i = 0; i < ordered.size(); i++) {
+            Mapping current = ordered.get(i);
+            checkArgument(
+                    current.oldStart <= current.oldEnd,
+                    "Invalid old row range [%s, %s].",
+                    current.oldStart,
+                    current.oldEnd);
+            if (i > 0) {
+                checkArgument(
+                        ordered.get(i - 1).oldEnd < current.oldStart,
+                        "Old row range mappings cannot overlap.");
+            }
+        }
+        return new RowRangeMappingIndex(Collections.unmodifiableList(ordered));
+    }
+
+    /** Creates a mapping that moves {@code [oldStart, oldEnd]} to start at {@code newStart}. */
+    static Mapping mapping(long oldStart, long oldEnd, long newStart) {
+        return new Mapping(oldStart, oldEnd, newStart);
+    }
+
+    /**
+     * Maps an old row range to its new row range.
+     *
+     * <p>The old range may span several mappings, but they must cover it without gaps and their
+     * images must be contiguous; otherwise the {@code checkState} precondition fails.
+     *
+     * @param oldRange closed range of old row ids, must not be null
+     * @return the closed range of new row ids
+     */
+    Range map(Range oldRange) {
+        checkArgument(oldRange != null, "Old row range cannot be null.");
+        checkArgument(oldRange.from <= oldRange.to, "Invalid old row range %s.", oldRange);
+
+        boolean started = false;
+        long mappedFrom = 0L;
+        long mappedTo = Long.MIN_VALUE;
+        long position = oldRange.from;
+
+        // Start at the first mapping whose old end is not before the range start.
+        int index = lowerBound(oldEnds, position);
+        while (index < mappings.size()) {
+            Mapping current = mappings.get(index++);
+            if (current.oldStart > position) {
+                // Gap in the old id space: the range is not fully covered.
+                break;
+            }
+
+            long shift = current.newStart - current.oldStart;
+            long sliceEnd = Math.min(current.oldEnd, oldRange.to);
+            long sliceNewFrom = position + shift;
+            long sliceNewTo = sliceEnd + shift;
+
+            if (started) {
+                checkState(
+                        mappedTo + 1 == sliceNewFrom,
+                        "Global index row range %s maps to non-contiguous new row range.",
+                        oldRange);
+            } else {
+                mappedFrom = sliceNewFrom;
+                started = true;
+            }
+            mappedTo = sliceNewTo;
+            position = sliceEnd + 1;
+            if (position > oldRange.to) {
+                break;
+            }
+        }
+
+        checkState(
+                position > oldRange.to && started,
+                "Global index row range %s is not fully covered by data file row-id mappings.",
+                oldRange);
+        return new Range(mappedFrom, mappedTo);
+    }
+
+    /** Returns the first index whose value is {@code >= target}, or the array length if none. */
+    private static int lowerBound(long[] sorted, long target) {
+        int low = 0;
+        int high = sorted.length;
+        while (low < high) {
+            int middle = (low + high) >>> 1;
+            if (sorted[middle] >= target) {
+                high = middle;
+            } else {
+                low = middle + 1;
+            }
+        }
+        return low;
+    }
+
+    /** One mapping from a closed old range to the start of its new range. */
+    static final class Mapping {
+        private final long oldStart;
+        private final long oldEnd;
+        private final long newStart;
+
+        private Mapping(long oldStart, long oldEnd, long newStart) {
+            this.oldStart = oldStart;
+            this.oldEnd = oldEnd;
+            this.newStart = newStart;
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
new file mode 100644
index 0000000000..2cacf4722e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java
@@ -0,0 +1,949 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
+import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataEvolutionRowIdReassigner}. */
+public class DataEvolutionRowIdReassignerTest extends TableTestBase {
+
+    private static final int LARGE_ENTRY_COUNT = 10_000;
+    private static final int LARGE_MANIFEST_FILE_COUNT = 20;
+    private static final int ONE_MILLION_ENTRY_COUNT = 1_000_000;
+    private static final int ONE_MILLION_MANIFEST_FILE_COUNT = 200;
+    private static final long LARGE_ENTRY_ROW_COUNT = 2L;
+    private static final long RANDOM_PARTITION_SEED = 20260519L;
+    private static final String LARGE_FILE_PREFIX = "large-partition-";
+    private static final String LARGE_FILE_SUFFIX = ".parquet";
+    private static final int LARGE_PARTITION_ID_WIDTH = 7;
+
+    /**
+     * Default test schema: columns {@code pt}, {@code id} and {@code payload}, partitioned by
+     * {@code pt}, with row tracking, data evolution and global index enabled.
+     */
+    @Override
+    protected Schema schemaDefault() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("pt", DataTypes.STRING());
+        schemaBuilder.column("id", DataTypes.INT());
+        schemaBuilder.column("payload", DataTypes.STRING());
+        schemaBuilder.partitionKeys("pt");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.GLOBAL_INDEX_ENABLED.key(), "true");
+        // NOTE(review): presumably forces one B-tree index range per record - confirm
+        
schemaBuilder.option(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.key(), 
"1");
+        return schemaBuilder.build();
+    }
+
+    /**
+     * Schema without partition keys: columns {@code id} and {@code payload}, with row tracking
+     * and data evolution enabled.
+     */
+    private Schema unpartitionedSchema() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("id", DataTypes.INT());
+        schemaBuilder.column("payload", DataTypes.STRING());
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        return schemaBuilder.build();
+    }
+
+    /**
+     * Reassigning a table with interleaved partitions creates a new snapshot in which partition
+     * {@code a} owns row ids 5-7 and partition {@code b} owns row ids 8-9.
+     */
+    @Test
+    public void testReassignRowIdsByPartition() throws Exception {
+        FileStoreTable interleaved = createTableWithInterleavedPartitions();
+        assertThat(interleaved.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(5L);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(interleaved);
+        DataEvolutionRowIdReassigner.Result outcome = reassigner.reassign("test-reassign-row-id");
+
+        assertThat(outcome.reassigned).isTrue();
+        assertThat(outcome.previousSnapshotId).isEqualTo(5L);
+        assertThat(outcome.newSnapshotId).isEqualTo(6L);
+        assertThat(outcome.firstAssignedRowId).isEqualTo(5L);
+        assertThat(outcome.nextRowId).isEqualTo(10L);
+        assertThat(outcome.fileCount).isEqualTo(5L);
+        assertThat(outcome.rowCount).isEqualTo(5L);
+        assertThat(outcome.indexFileCount).isEqualTo(0L);
+
+        assertThat(rowIdsByPartition(interleaved))
+                .hasSize(2)
+                .containsEntry("pt=a/", Arrays.asList(5L, 6L, 7L))
+                .containsEntry("pt=b/", Arrays.asList(8L, 9L));
+        assertThat(interleaved.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(10L);
+    }
+
+    /**
+     * With two-row files written as a, b, a, only partition {@code a} is moved (to row ids 6-9)
+     * while partition {@code b} keeps row ids 2-3.
+     */
+    @Test
+    public void testReassignMultiRowFilesByPartition() throws Exception {
+        createTableDefault();
+        FileStoreTable target = getTableDefault();
+        writeRows(target, "a", 0, 1);
+        writeRows(target, "b", 2, 3);
+        writeRows(target, "a", 4, 5);
+        assertThat(target.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(6L);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(target);
+        DataEvolutionRowIdReassigner.Result outcome =
+                reassigner.reassign("test-reassign-multi-row");
+
+        assertThat(outcome.firstAssignedRowId).isEqualTo(6L);
+        assertThat(outcome.nextRowId).isEqualTo(10L);
+        assertThat(outcome.fileCount).isEqualTo(2L);
+        assertThat(outcome.rowCount).isEqualTo(4L);
+        assertThat(expandedRowIdsByPartition(target))
+                .hasSize(2)
+                .containsEntry("pt=a/", Arrays.asList(6L, 7L, 8L, 9L))
+                .containsEntry("pt=b/", Arrays.asList(2L, 3L));
+    }
+
+    /**
+     * Only the partition whose row ids overlap with others is moved: partition {@code a} ends up
+     * on row ids 5-6 while {@code b} and {@code c} keep theirs. Global index ranges are remapped
+     * accordingly and indexed lookups still return the right rows.
+     */
+    @Test
+    public void testReassignOnlyOverlappingPartitions() throws Exception {
+        FileStoreTable table = createTableWithPartiallyOverlappedPartitions();
+        createBTreeIndex(table);
+
+        
assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(5L);
+
+        DataEvolutionRowIdReassigner.Result result =
+                new 
DataEvolutionRowIdReassigner(table).reassign("test-reassign-partial-row-id");
+
+        assertThat(result.firstAssignedRowId).isEqualTo(5L);
+        assertThat(result.nextRowId).isEqualTo(7L);
+        assertThat(result.fileCount).isEqualTo(2L);
+        assertThat(result.rowCount).isEqualTo(2L);
+        assertThat(result.indexFileCount).isEqualTo(2L);
+
+        Map<String, List<Long>> rowIdsByPartition = 
expandedRowIdsByPartition(table);
+        assertThat(rowIdsByPartition).containsEntry("pt=a/", Arrays.asList(5L, 
6L));
+        assertThat(rowIdsByPartition).containsEntry("pt=b/", 
Collections.singletonList(3L));
+        assertThat(rowIdsByPartition).containsEntry("pt=c/", Arrays.asList(0L, 
1L));
+        // Two index ranges moved to 5 and 6; the remaining ranges are unchanged.
+        assertThat(globalIndexRanges(table))
+                .containsExactly(
+                        new Range(0, 1),
+                        new Range(0, 1),
+                        new Range(3, 3),
+                        new Range(5, 5),
+                        new Range(6, 6));
+
+        // Filtering by id must still find the payload after the rewrite.
+        Predicate oldPartitionPredicate =
+                new 
PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 1);
+        assertThat(readPayloads(table, 
oldPartitionPredicate)).containsExactly("v1");
+        Predicate reassignedPartitionPredicate =
+                new 
PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 4);
+        assertThat(readPayloads(table, 
reassignedPartitionPredicate)).containsExactly("v4");
+    }
+
+    /**
+     * Partition {@code b} is written first and last, so its rows enclose those of {@code c} and
+     * {@code d}. Only {@code b} is moved (to row ids 7-8); the other partitions keep their ids.
+     */
+    @Test
+    public void testReassignOnlyOuterPartitionWhenInnerRangeCanStay() throws 
Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        // Write order matters: it determines which row ids each partition receives.
+        writeOneRow(table, "a", 0);
+        writeOneRow(table, "b", 1);
+        writeOneRow(table, "c", 2);
+        writeOneRow(table, "c", 3);
+        writeOneRow(table, "d", 4);
+        writeOneRow(table, "d", 5);
+        writeOneRow(table, "b", 6);
+
+        DataEvolutionRowIdReassigner.Result result =
+                new 
DataEvolutionRowIdReassigner(table).reassign("test-reassign-only-b-row-id");
+
+        assertThat(result.firstAssignedRowId).isEqualTo(7L);
+        assertThat(result.nextRowId).isEqualTo(9L);
+        assertThat(result.fileCount).isEqualTo(2L);
+        assertThat(result.rowCount).isEqualTo(2L);
+
+        assertThat(rowIdsByPartition(table))
+                .containsEntry("pt=a/", Collections.singletonList(0L))
+                .containsEntry("pt=b/", Arrays.asList(7L, 8L))
+                .containsEntry("pt=c/", Arrays.asList(2L, 3L))
+                .containsEntry("pt=d/", Arrays.asList(4L, 5L));
+    }
+
+    /**
+     * A partition predicate restricts the reassignment: filtering on {@code c} is a no-op that
+     * leaves all row ids untouched, filtering on {@code b} moves only {@code b} to row ids 7-8.
+     */
+    @Test
+    public void testPartitionFilterOnlyReassignsSelectedPartitions() throws 
Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        writeOneRow(table, "a", 0);
+        writeOneRow(table, "b", 1);
+        writeOneRow(table, "c", 2);
+        writeOneRow(table, "c", 3);
+        writeOneRow(table, "d", 4);
+        writeOneRow(table, "d", 5);
+        writeOneRow(table, "b", 6);
+
+        // First run: restricted to partition c, nothing is reassigned.
+        DataEvolutionRowIdReassigner.Result cResult =
+                new DataEvolutionRowIdReassigner(table, 
partitionPredicate(table, "c"))
+                        .reassign("test-filter-c-row-id");
+
+        assertThat(cResult.reassigned).isFalse();
+        
assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(7L);
+        assertThat(rowIdsByPartition(table))
+                .containsEntry("pt=a/", Collections.singletonList(0L))
+                .containsEntry("pt=b/", Arrays.asList(1L, 6L))
+                .containsEntry("pt=c/", Arrays.asList(2L, 3L))
+                .containsEntry("pt=d/", Arrays.asList(4L, 5L));
+
+        // Second run: restricted to partition b, which is moved behind the existing row ids.
+        DataEvolutionRowIdReassigner.Result bResult =
+                new DataEvolutionRowIdReassigner(table, 
partitionPredicate(table, "b"))
+                        .reassign("test-filter-b-row-id");
+
+        assertThat(bResult.reassigned).isTrue();
+        assertThat(bResult.firstAssignedRowId).isEqualTo(7L);
+        assertThat(bResult.nextRowId).isEqualTo(9L);
+        assertThat(bResult.fileCount).isEqualTo(2L);
+        assertThat(bResult.rowCount).isEqualTo(2L);
+        assertThat(rowIdsByPartition(table))
+                .containsEntry("pt=a/", Collections.singletonList(0L))
+                .containsEntry("pt=b/", Arrays.asList(7L, 8L))
+                .containsEntry("pt=c/", Arrays.asList(2L, 3L))
+                .containsEntry("pt=d/", Arrays.asList(4L, 5L));
+    }
+
+    /**
+     * Manifest files that contain no entry of partition {@code a} must still be referenced after
+     * the reassignment, while manifest files containing {@code a} must be replaced.
+     */
+    @Test
+    public void testReassignReusesUnaffectedManifestFiles() throws Exception {
+        FileStoreTable overlapped = createTableWithPartiallyOverlappedPartitions();
+
+        List<String> untouched = new ArrayList<>();
+        List<String> touched = new ArrayList<>();
+        Map<String, Set<String>> partitionsPerManifest = currentPartitionsByManifest(overlapped);
+        for (Map.Entry<String, Set<String>> manifest : partitionsPerManifest.entrySet()) {
+            if (manifest.getValue().contains("pt=a/")) {
+                touched.add(manifest.getKey());
+            } else {
+                untouched.add(manifest.getKey());
+            }
+        }
+        assertThat(untouched).isNotEmpty();
+        assertThat(touched).isNotEmpty();
+
+        new DataEvolutionRowIdReassigner(overlapped).reassign("test-reassign-reuse-manifest");
+
+        List<String> manifestsAfter = dataManifestFileNames(overlapped);
+        assertThat(manifestsAfter).containsAll(untouched);
+        assertThat(manifestsAfter).doesNotContainAnyElementsOf(touched);
+    }
+
+    /**
+     * When each partition already covers one contiguous row-id range the reassignment must be
+     * skipped: the snapshot id stays at 3, all counters are zero and the row IDs are unchanged.
+     */
+    @Test
+    public void testSkipWhenPartitionRowIdsAreContiguous() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        writeRows(table, "a", 0, 1);
+        writeRows(table, "a", 2);
+        writeRows(table, "b", 3, 4);
+
+        Snapshot before = table.snapshotManager().latestSnapshot();
+        assertThat(before.id()).isEqualTo(3L);
+        assertThat(before.nextRowId()).isEqualTo(5L);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        DataEvolutionRowIdReassigner.Result result = reassigner.reassign("test-skip-row-id");
+
+        assertThat(result.reassigned).isFalse();
+        assertThat(result.skipReason).isEqualTo("partition row IDs are already contiguous");
+        assertThat(result.previousSnapshotId).isEqualTo(3L);
+        assertThat(result.newSnapshotId).isEqualTo(3L);
+        assertThat(result.firstAssignedRowId).isEqualTo(5L);
+        assertThat(result.nextRowId).isEqualTo(5L);
+        assertThat(result.fileCount).isEqualTo(0L);
+        assertThat(result.rowCount).isEqualTo(0L);
+        assertThat(result.indexFileCount).isEqualTo(0L);
+
+        // No new snapshot may have been committed by the skipped run.
+        Snapshot after = table.snapshotManager().latestSnapshot();
+        assertThat(after.id()).isEqualTo(3L);
+        Map<String, List<Long>> rowIdsAfter = expandedRowIdsByPartition(table);
+        assertThat(rowIdsAfter)
+                .containsEntry("pt=a/", Arrays.asList(0L, 1L, 2L))
+                .containsEntry("pt=b/", Arrays.asList(3L, 4L));
+    }
+
+    /**
+     * A table without partition keys must be skipped with the reason {@code "table is not
+     * partitioned"}; the latest snapshot stays at id 1 and every counter in the result is zero.
+     */
+    @Test
+    public void testSkipUnpartitionedTable() throws Exception {
+        Identifier tableId = Identifier.create(database, "unpartitioned_table");
+        catalog.createTable(tableId, unpartitionedSchema(), false);
+        FileStoreTable table = getTable(tableId);
+        writeUnpartitionedRows(table, 0, 1, 2);
+
+        Snapshot before = table.snapshotManager().latestSnapshot();
+        assertThat(before.id()).isEqualTo(1L);
+        assertThat(before.nextRowId()).isEqualTo(3L);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        DataEvolutionRowIdReassigner.Result result =
+                reassigner.reassign("test-skip-unpartitioned-row-id");
+
+        assertThat(result.reassigned).isFalse();
+        assertThat(result.skipReason).isEqualTo("table is not partitioned");
+        assertThat(result.previousSnapshotId).isEqualTo(1L);
+        assertThat(result.newSnapshotId).isEqualTo(1L);
+        assertThat(result.firstAssignedRowId).isEqualTo(3L);
+        assertThat(result.nextRowId).isEqualTo(3L);
+        assertThat(result.fileCount).isEqualTo(0L);
+        assertThat(result.rowCount).isEqualTo(0L);
+        assertThat(result.indexFileCount).isEqualTo(0L);
+
+        // The skipped run must not have produced a new snapshot.
+        Snapshot after = table.snapshotManager().latestSnapshot();
+        assertThat(after.id()).isEqualTo(1L);
+    }
+
+    @Test
+    public void testReassignGlobalIndexRowRanges() throws Exception {
+        FileStoreTable table = createTableWithInterleavedPartitions();
+        createBTreeIndex(table);
+
+        
assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(5L);
+
+        DataEvolutionRowIdReassigner.Result result =
+                new 
DataEvolutionRowIdReassigner(table).reassign("test-reassign-row-id-index");
+
+        assertThat(result.firstAssignedRowId).isEqualTo(5L);
+        assertThat(result.nextRowId).isEqualTo(10L);
+        assertThat(result.indexFileCount).isEqualTo(5L);
+
+        List<Range> indexRanges = globalIndexRanges(table);
+        assertThat(indexRanges)
+                .containsExactly(
+                        new Range(5, 5),
+                        new Range(6, 6),
+                        new Range(7, 7),
+                        new Range(8, 8),
+                        new Range(9, 9));
+
+        Predicate predicate =
+                new 
PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 4);
+        assertThat(readPayloads(table, predicate)).containsExactly("v4");
+
+        DataEvolutionRowIdReassigner.Result secondResult =
+                new 
DataEvolutionRowIdReassigner(table).reassign("test-reassign-row-id-index-2");
+
+        assertThat(secondResult.reassigned).isFalse();
+        assertThat(secondResult.previousSnapshotId).isEqualTo(7L);
+        assertThat(secondResult.newSnapshotId).isEqualTo(7L);
+        assertThat(secondResult.firstAssignedRowId).isEqualTo(10L);
+        assertThat(secondResult.nextRowId).isEqualTo(10L);
+        assertThat(secondResult.fileCount).isEqualTo(0L);
+        assertThat(secondResult.rowCount).isEqualTo(0L);
+        assertThat(secondResult.indexFileCount).isEqualTo(0L);
+        assertThat(globalIndexRanges(table))
+                .containsExactly(
+                        new Range(5, 5),
+                        new Range(6, 6),
+                        new Range(7, 7),
+                        new Range(8, 8),
+                        new Range(9, 9));
+        assertThat(readPayloads(table, predicate)).containsExactly("v4");
+    }
+
+    @Test
+    public void testReassignManyOutOfOrderPartitionEntries() throws Exception {
+        verifyReassignOutOfOrderPartitionEntries(LARGE_ENTRY_COUNT, 
LARGE_MANIFEST_FILE_COUNT);
+    }
+
+    @Disabled("Writes one million manifest entries; run manually for large 
metadata stress tests.")
+    @Test
+    public void testReassignOneMillionOutOfOrderPartitionEntries() throws 
Exception {
+        verifyReassignOutOfOrderPartitionEntries(
+                ONE_MILLION_ENTRY_COUNT, ONE_MILLION_MANIFEST_FILE_COUNT);
+    }
+
+    /**
+     * Creates the default table and writes ids 0..4 in five separate commits, alternating between
+     * partitions {@code a} (even ids) and {@code b} (odd ids).
+     */
+    private FileStoreTable createTableWithInterleavedPartitions() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        for (int id = 0; id <= 4; id++) {
+            String partition = id % 2 == 0 ? "a" : "b";
+            writeOneRow(table, partition, id);
+        }
+        return table;
+    }
+
+    /**
+     * Creates the default table with four commits: ids 0 and 1 in partition {@code c}, then ids 2,
+     * 3 and 4 written one per commit into partitions {@code a}, {@code b} and {@code a}.
+     */
+    private FileStoreTable createTableWithPartiallyOverlappedPartitions() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        writeRows(table, "c", 0, 1);
+        // Single-row commits; partition "a" is written both before and after partition "b".
+        writeRows(table, "a", 2);
+        writeRows(table, "b", 3);
+        writeRows(table, "a", 4);
+        return table;
+    }
+
+    /**
+     * Shared scenario for the large-manifest tests: installs a synthetic snapshot with {@code
+     * entryCount} entries spread over {@code manifestFileCount} manifests, runs the reassigner and
+     * compares the result with the expectation computed from the source snapshot.
+     *
+     * @param entryCount number of manifest entries to generate; must be even
+     * @param manifestFileCount number of manifest files the entries are written into
+     */
+    private void verifyReassignOutOfOrderPartitionEntries(int entryCount, int manifestFileCount)
+            throws Exception {
+        assertThat(entryCount % 2).isEqualTo(0);
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        writeOneRow(table, "seed", -1);
+
+        long sourceNextRowId =
+                writeOutOfOrderManifestSnapshot(table, entryCount, manifestFileCount);
+        Snapshot sourceSnapshot = table.snapshotManager().latestSnapshot();
+        List<String> sourceManifests = dataManifestFileNames(table);
+        assertThat(sourceManifests).hasSize(manifestFileCount);
+        assertThat(sourceSnapshot.nextRowId()).isEqualTo(sourceNextRowId);
+        assertThat(sourceSnapshot.totalRecordCount()).isEqualTo(logicalRowCount(entryCount));
+
+        // Compute the expectation before the reassigner touches the table.
+        ExpectedLargeManifestAssignment expected =
+                expectedLargeManifestAssignment(table, sourceSnapshot, entryCount);
+
+        DataEvolutionRowIdReassigner reassigner = new DataEvolutionRowIdReassigner(table);
+        DataEvolutionRowIdReassigner.Result result =
+                reassigner.reassign("test-reassign-large-out-of-order-row-id");
+
+        assertThat(result.reassigned).isTrue();
+        assertThat(result.previousSnapshotId).isEqualTo(sourceSnapshot.id());
+        assertThat(result.newSnapshotId).isEqualTo(sourceSnapshot.id() + 1);
+        assertThat(result.firstAssignedRowId).isEqualTo(sourceNextRowId);
+        assertThat(result.nextRowId).isEqualTo(sourceNextRowId + expected.reassignedRowCount);
+        assertThat(result.fileCount).isEqualTo(expected.reassignedEntryCount);
+        assertThat(result.rowCount).isEqualTo(expected.reassignedRowCount);
+        assertThat(result.indexFileCount).isEqualTo(0L);
+
+        assertReassignedOutOfOrderPartitionEntries(
+                table, entryCount, sourceNextRowId, expected, sourceManifests);
+    }
+
+    /**
+     * Replaces the latest snapshot's manifest list with synthetic manifests. Entry {@code i} gets
+     * first row ID {@code logicalRowCount(i)} and a partition drawn from a seeded {@link Random},
+     * so the row IDs of a partition are scattered across the whole range.
+     *
+     * @param table table whose manifest list is replaced
+     * @param entryCount total number of manifest entries to write
+     * @param manifestFileCount number of manifest files to distribute the entries over
+     * @return the row-id value handed to {@code replaceManifestList} as its last argument, i.e.
+     *     {@code logicalRowCount(entryCount) + 1}
+     */
+    private long writeOutOfOrderManifestSnapshot(
+            FileStoreTable table, int entryCount, int manifestFileCount) throws Exception {
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        ManifestFile manifestWriter = table.store().manifestFileFactory().create();
+        ManifestList manifestListWriter = table.store().manifestListFactory().create();
+        InternalRowSerializer partitionSerializer =
+                new InternalRowSerializer(table.schema().logicalPartitionType());
+
+        int partitionCount = largePartitionCount(entryCount);
+        assertThat(entryCount).isGreaterThanOrEqualTo(manifestFileCount);
+
+        // A fixed seed keeps the partition chosen for every entry reproducible.
+        Random partitionPicker = new Random(RANDOM_PARTITION_SEED);
+        List<ManifestFileMeta> writtenManifests = new ArrayList<>();
+        int nextEntryIndex = 0;
+        for (int manifestIndex = 0; manifestIndex < manifestFileCount; manifestIndex++) {
+            // Even split; the first (entryCount % manifestFileCount) manifests take one extra.
+            int entriesInManifest = entryCount / manifestFileCount;
+            if (manifestIndex < entryCount % manifestFileCount) {
+                entriesInManifest++;
+            }
+
+            List<ManifestEntry> batch = new ArrayList<>(entriesInManifest);
+            int batchEnd = nextEntryIndex + entriesInManifest;
+            while (nextEntryIndex < batchEnd) {
+                int partitionId = partitionPicker.nextInt(partitionCount);
+                long firstRowId = logicalRowCount(nextEntryIndex);
+                batch.add(
+                        largeManifestEntry(
+                                partitionSerializer, partitionId, nextEntryIndex, firstRowId));
+                nextEntryIndex++;
+            }
+            writtenManifests.addAll(manifestWriter.write(batch));
+        }
+        assertThat(nextEntryIndex).isEqualTo(entryCount);
+        assertThat(writtenManifests).hasSize(manifestFileCount);
+
+        // All synthetic manifests go into the base list; the delta list is written empty.
+        Pair<String, Long> baseManifestList = manifestListWriter.write(writtenManifests);
+        Pair<String, Long> deltaManifestList = manifestListWriter.write(Collections.emptyList());
+        long totalRows = logicalRowCount(entryCount);
+        long oldNextRowId = totalRows + 1L;
+        try (FileStoreCommitImpl commit =
+                (FileStoreCommitImpl)
+                        table.store().newCommit("test-large-out-of-order-source", table)) {
+            assertThat(
+                            commit.replaceManifestList(
+                                    latest,
+                                    totalRows,
+                                    baseManifestList,
+                                    deltaManifestList,
+                                    latest.indexManifest(),
+                                    oldNextRowId))
+                    .isTrue();
+        }
+        return oldNextRowId;
+    }
+
+    /**
+     * Builds one synthetic {@code ADD} manifest entry for the large-manifest fixture.
+     *
+     * @param partitionSerializer serializer used to turn the partition value into a binary row
+     * @param partitionId numeric partition id; determines partition name, file name and sequence
+     * @param entryOrdinal ordinal of the entry, used in the generated file name
+     * @param firstRowId first row ID stored in the generated file metadata
+     * @return the manifest entry wrapping the generated {@link DataFileMeta}
+     */
+    private ManifestEntry largeManifestEntry(
+            InternalRowSerializer partitionSerializer,
+            int partitionId,
+            int entryOrdinal,
+            long firstRowId) {
+        GenericRow partitionRow =
+                GenericRow.of(BinaryString.fromString(largePartition(partitionId)));
+        // Copy so the entry does not share the binary row returned by the serializer.
+        BinaryRow partition = partitionSerializer.toBinaryRow(partitionRow).copy();
+
+        String fileName = largeFileName(partitionId, entryOrdinal);
+        long sequence = sequenceNumber(partitionId);
+        DataFileMeta fileMeta =
+                DataFileMeta.forAppend(
+                        fileName,
+                        1L,
+                        LARGE_ENTRY_ROW_COUNT,
+                        SimpleStats.EMPTY_STATS,
+                        sequence,
+                        sequence,
+                        0L,
+                        Collections.emptyList(),
+                        null,
+                        FileSource.APPEND,
+                        null,
+                        null,
+                        firstRowId,
+                        null);
+        return ManifestEntry.create(FileKind.ADD, partition, 0, 1, fileMeta);
+    }
+
+    /**
+     * Verifies the table state after reassigning the large out-of-order fixture: manifest reuse,
+     * globally disjoint row-id ranges, per-partition contiguity for reassigned partitions,
+     * untouched ranges for the others, and the snapshot's next row ID.
+     *
+     * @param table table that was reassigned
+     * @param entryCount number of entries the fixture contains
+     * @param firstAssignedRowId first row ID handed out by the reassignment
+     * @param expectedAssignment expectation computed from the snapshot before the reassignment
+     * @param beforeManifestFiles data manifest names before the reassignment
+     */
+    private void assertReassignedOutOfOrderPartitionEntries(
+            FileStoreTable table,
+            int entryCount,
+            long firstAssignedRowId,
+            ExpectedLargeManifestAssignment expectedAssignment,
+            List<String> beforeManifestFiles) {
+        // Manifest reuse: same manifest count, rewritten ones are gone, the rest are kept.
+        Set<String> rewritten = expectedAssignment.rewrittenManifestFiles;
+        List<String> manifestsNow = dataManifestFileNames(table);
+        assertThat(manifestsNow).hasSize(beforeManifestFiles.size());
+        assertThat(manifestsNow).doesNotContainAnyElementsOf(rewritten);
+        List<String> expectedReused = new ArrayList<>();
+        for (String manifest : beforeManifestFiles) {
+            if (!rewritten.contains(manifest)) {
+                expectedReused.add(manifest);
+            }
+        }
+        assertThat(manifestsNow).containsAll(expectedReused);
+
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        List<ManifestEntry> entries = table.store().newScan().withSnapshot(latest).plan().files();
+        assertThat(entries).hasSize(entryCount);
+
+        int partitionCount = largePartitionCount(entryCount);
+        int[] actualEntries = new int[partitionCount];
+        long[] actualMinRowId = new long[partitionCount];
+        long[] actualMaxRowId = new long[partitionCount];
+        Arrays.fill(actualMinRowId, Long.MAX_VALUE);
+        Arrays.fill(actualMaxRowId, Long.MIN_VALUE);
+        List<Range> allRanges = new ArrayList<>(entryCount);
+        long movedEntries = 0;
+        long movedRows = 0;
+        for (ManifestEntry entry : entries) {
+            DataFileMeta file = entry.file();
+            Range rowIds = file.nonNullRowIdRange();
+            allRanges.add(rowIds);
+            assertThat(rowIds.count()).isEqualTo(LARGE_ENTRY_ROW_COUNT);
+            // Anything at or above the first assigned row ID is counted as reassigned.
+            if (rowIds.from >= firstAssignedRowId) {
+                movedEntries += 1;
+                movedRows += rowIds.count();
+            }
+
+            int partitionId = partitionIdFromLargeFileName(file.fileName());
+            actualEntries[partitionId] += 1;
+            if (rowIds.from < actualMinRowId[partitionId]) {
+                actualMinRowId[partitionId] = rowIds.from;
+            }
+            if (rowIds.to > actualMaxRowId[partitionId]) {
+                actualMaxRowId[partitionId] = rowIds.to;
+            }
+            long expectedSequence = sequenceNumber(partitionId);
+            assertThat(file.minSequenceNumber()).isEqualTo(expectedSequence);
+            assertThat(file.maxSequenceNumber()).isEqualTo(expectedSequence);
+        }
+
+        // Sorted by (from, to), every range must start after the previous one ended.
+        allRanges.sort(
+                (a, b) ->
+                        a.from != b.from
+                                ? Long.compare(a.from, b.from)
+                                : Long.compare(a.to, b.to));
+        long lastEnd = Long.MIN_VALUE;
+        for (int i = 0; i < allRanges.size(); i++) {
+            Range current = allRanges.get(i);
+            assertThat(current.from).isGreaterThan(lastEnd);
+            lastEnd = current.to;
+        }
+
+        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+            int entriesInPartition = actualEntries[partitionId];
+            assertThat(entriesInPartition)
+                    .isEqualTo(expectedAssignment.entriesByPartition[partitionId]);
+            if (entriesInPartition == 0) {
+                continue;
+            }
+
+            long minRowId = actualMinRowId[partitionId];
+            long maxRowId = actualMaxRowId[partitionId];
+            if (!expectedAssignment.partitionsToReassign[partitionId]) {
+                // Untouched partitions keep exactly the range they had before.
+                assertThat(minRowId).isEqualTo(expectedAssignment.minRowIdByPartition[partitionId]);
+                assertThat(maxRowId).isEqualTo(expectedAssignment.maxRowIdByPartition[partitionId]);
+            } else {
+                // Reassigned partitions live in the new range and are contiguous.
+                assertThat(minRowId).isGreaterThanOrEqualTo(firstAssignedRowId);
+                assertThat(maxRowId)
+                        .isEqualTo(minRowId + logicalRowCount(entriesInPartition) - 1);
+            }
+        }
+        assertThat(movedEntries).isEqualTo(expectedAssignment.reassignedEntryCount);
+        assertThat(movedRows).isEqualTo(expectedAssignment.reassignedRowCount);
+        assertThat(table.snapshotManager().latestSnapshot().nextRowId())
+                .isEqualTo(firstAssignedRowId + movedRows);
+    }
+
+    /** Returns the number of rows covered by {@code entryCount} fixture entries. */
+    private long logicalRowCount(long entryCount) {
+        long rowsPerEntry = LARGE_ENTRY_ROW_COUNT;
+        return entryCount * rowsPerEntry;
+    }
+
+    /** Returns the number of partitions used by the fixture: half the entry count. */
+    private int largePartitionCount(int entryCount) {
+        int halfOfEntries = entryCount / 2;
+        return halfOfEntries;
+    }
+
+    /**
+     * Derives the expected outcome of a reassignment from the manifests of {@code snapshot}.
+     *
+     * <p>A partition is expected to be reassigned when the span between its smallest and largest
+     * row ID differs from the number of rows its entries hold. A manifest is expected to be
+     * rewritten when it holds at least one {@code ADD} entry of such a partition.
+     *
+     * @param table table whose manifests are read
+     * @param snapshot snapshot taken before the reassignment
+     * @param entryCount number of entries the fixture contains
+     * @return per-partition statistics plus the expected reassigned counts and manifests
+     */
+    private ExpectedLargeManifestAssignment expectedLargeManifestAssignment(
+            FileStoreTable table, Snapshot snapshot, int entryCount) {
+        int partitionCount = largePartitionCount(entryCount);
+        int[] entriesByPartition = new int[partitionCount];
+        long[] minRowIdByPartition = new long[partitionCount];
+        long[] maxRowIdByPartition = new long[partitionCount];
+        long totalRows = logicalRowCount(entryCount);
+        // The bitmap below is indexed by int, so the fixture must fit into an int range.
+        assertThat(totalRows).isLessThanOrEqualTo((long) Integer.MAX_VALUE);
+        boolean[] seenRowIds = new boolean[(int) totalRows];
+        Arrays.fill(minRowIdByPartition, Long.MAX_VALUE);
+        Arrays.fill(maxRowIdByPartition, Long.MIN_VALUE);
+
+        ManifestFile manifestReader = table.store().manifestFileFactory().create();
+        ManifestList manifestListReader = table.store().manifestListFactory().create();
+        List<ManifestFileMeta> manifests = manifestListReader.readDataManifests(snapshot);
+
+        // Pass 1: collect per-partition statistics and check that every row ID occurs once.
+        long rowsSeen = 0;
+        for (ManifestFileMeta manifest : manifests) {
+            for (ManifestEntry entry :
+                    manifestReader.read(manifest.fileName(), manifest.fileSize())) {
+                if (entry.kind() == FileKind.ADD) {
+                    Range rowIds = entry.file().nonNullRowIdRange();
+                    assertThat(rowIds.count()).isEqualTo(LARGE_ENTRY_ROW_COUNT);
+                    assertThat(rowIds.from).isBetween(0L, totalRows - 1);
+                    assertThat(rowIds.to).isBetween(0L, totalRows - 1);
+                    long rowId = rowIds.from;
+                    while (rowId <= rowIds.to) {
+                        assertThat(seenRowIds[(int) rowId]).isFalse();
+                        seenRowIds[(int) rowId] = true;
+                        rowsSeen++;
+                        rowId++;
+                    }
+                    int partitionId = partitionIdFromLargeFileName(entry.file().fileName());
+                    entriesByPartition[partitionId] += 1;
+                    if (rowIds.from < minRowIdByPartition[partitionId]) {
+                        minRowIdByPartition[partitionId] = rowIds.from;
+                    }
+                    if (rowIds.to > maxRowIdByPartition[partitionId]) {
+                        maxRowIdByPartition[partitionId] = rowIds.to;
+                    }
+                }
+            }
+        }
+        assertThat(rowsSeen).isEqualTo(totalRows);
+
+        // A partition needs reassignment when its row-id span is not exactly its row count.
+        boolean[] partitionsToReassign = new boolean[partitionCount];
+        long reassignedEntryCount = 0;
+        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+            int entriesInPartition = entriesByPartition[partitionId];
+            if (entriesInPartition != 0) {
+                long span = maxRowIdByPartition[partitionId] - minRowIdByPartition[partitionId] + 1;
+                if (span != logicalRowCount(entriesInPartition)) {
+                    partitionsToReassign[partitionId] = true;
+                    reassignedEntryCount += entriesInPartition;
+                }
+            }
+        }
+
+        // Pass 2: a manifest counts as rewritten once one ADD entry of such a partition is found.
+        Set<String> rewrittenManifestFiles = new HashSet<>();
+        for (ManifestFileMeta manifest : manifests) {
+            boolean touchesReassignedPartition = false;
+            for (ManifestEntry entry :
+                    manifestReader.read(manifest.fileName(), manifest.fileSize())) {
+                if (entry.kind() == FileKind.ADD
+                        && partitionsToReassign[
+                                partitionIdFromLargeFileName(entry.file().fileName())]) {
+                    touchesReassignedPartition = true;
+                    break;
+                }
+            }
+            if (touchesReassignedPartition) {
+                rewrittenManifestFiles.add(manifest.fileName());
+            }
+        }
+        assertThat(reassignedEntryCount).isGreaterThan(0L);
+        long reassignedRowCount = logicalRowCount(reassignedEntryCount);
+        return new ExpectedLargeManifestAssignment(
+                entriesByPartition,
+                minRowIdByPartition,
+                maxRowIdByPartition,
+                partitionsToReassign,
+                reassignedEntryCount,
+                reassignedRowCount,
+                rewrittenManifestFiles);
+    }
+
+    /**
+     * Returns the partition value for {@code partitionId}: the letter {@code p} followed by the
+     * decimal id, left-padded with zeros up to {@code LARGE_PARTITION_ID_WIDTH} characters.
+     */
+    private String largePartition(int partitionId) {
+        String digits = Integer.toString(partitionId);
+        StringBuilder name = new StringBuilder("p");
+        for (int missing = LARGE_PARTITION_ID_WIDTH - digits.length(); missing > 0; missing--) {
+            name.append('0');
+        }
+        name.append(digits);
+        return name.toString();
+    }
+
+    /** Builds a fixture file name of the form {@code <prefix><partitionId>-<ordinal><suffix>}. */
+    private String largeFileName(int partitionId, int entryOrdinal) {
+        StringBuilder name = new StringBuilder();
+        name.append(LARGE_FILE_PREFIX);
+        name.append(partitionId);
+        name.append("-");
+        name.append(entryOrdinal);
+        name.append(LARGE_FILE_SUFFIX);
+        return name.toString();
+    }
+
+    /**
+     * Extracts the partition id from a fixture file name by stripping the prefix and suffix and
+     * parsing the digits in front of the first {@code '-'}.
+     */
+    private int partitionIdFromLargeFileName(String fileName) {
+        int coreStart = LARGE_FILE_PREFIX.length();
+        int coreEnd = fileName.length() - LARGE_FILE_SUFFIX.length();
+        String core = fileName.substring(coreStart, coreEnd);
+        int dash = core.indexOf('-');
+        String partitionDigits = core.substring(0, dash);
+        return Integer.parseInt(partitionDigits);
+    }
+
+    /** Returns the sequence number used for all fixture files of the given partition. */
+    private long sequenceNumber(int partitionId) {
+        long base = 10_000_000L;
+        return base + partitionId;
+    }
+
+    /** Writes and commits a single row with the given id into {@code partition}. */
+    private void writeOneRow(FileStoreTable table, String partition, int id) throws Exception {
+        int[] singleId = {id};
+        writeRows(table, partition, singleId);
+    }
+
+    /**
+     * Writes one row per id into {@code partition} and commits them together as a single batch
+     * commit. Each row is {@code (partition, id, "v" + id)}.
+     */
+    private void writeRows(FileStoreTable table, String partition, int... ids) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite writer = writeBuilder.newWrite();
+                BatchTableCommit committer = writeBuilder.newCommit()) {
+            for (int i = 0; i < ids.length; i++) {
+                int id = ids[i];
+                GenericRow row =
+                        GenericRow.of(
+                                BinaryString.fromString(partition),
+                                id,
+                                BinaryString.fromString("v" + id));
+                writer.write(row);
+            }
+            committer.commit(writer.prepareCommit());
+        }
+    }
+
+    /**
+     * Writes one {@code (id, "v" + id)} row per id and commits them together as a single batch
+     * commit.
+     */
+    private void writeUnpartitionedRows(FileStoreTable table, int... ids) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite writer = writeBuilder.newWrite();
+                BatchTableCommit committer = writeBuilder.newCommit()) {
+            for (int i = 0; i < ids.length; i++) {
+                int id = ids[i];
+                GenericRow row = GenericRow.of(id, BinaryString.fromString("v" + id));
+                writer.write(row);
+            }
+            committer.commit(writer.prepareCommit());
+        }
+    }
+
+    /** Builds a partition predicate that matches only the partition {@code pt = partition}. */
+    private PartitionPredicate partitionPredicate(FileStoreTable table, String partition) {
+        Map<String, String> partitionSpec = Collections.singletonMap("pt", partition);
+        String defaultPartitionName = table.coreOptions().partitionDefaultName();
+        return PartitionPredicate.fromMaps(
+                table.schema().logicalPartitionType(),
+                Collections.singletonList(partitionSpec),
+                defaultPartitionName);
+    }
+
+    /**
+     * Scans the latest snapshot and returns, per partition string, the sorted first row IDs of
+     * its data files. Partitions appear in the order the scan first encounters them.
+     */
+    private Map<String, List<Long>> rowIdsByPartition(FileStoreTable table) {
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        Map<String, List<Long>> firstRowIds = new LinkedHashMap<>();
+        for (ManifestEntry entry : table.store().newScan().withSnapshot(latest).plan().files()) {
+            String partition = table.store().pathFactory().getPartitionString(entry.partition());
+            List<Long> ids = firstRowIds.get(partition);
+            if (ids == null) {
+                ids = new ArrayList<>();
+                firstRowIds.put(partition, ids);
+            }
+            ids.add(entry.file().nonNullFirstRowId());
+        }
+        firstRowIds.values().forEach(Collections::sort);
+        return firstRowIds;
+    }
+
+    /**
+     * Scans the latest snapshot and returns, per partition string, every individual row ID
+     * covered by the row-id ranges of its data files, sorted ascending.
+     */
+    private Map<String, List<Long>> expandedRowIdsByPartition(FileStoreTable table) {
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        Map<String, List<Long>> allRowIds = new LinkedHashMap<>();
+        for (ManifestEntry entry : table.store().newScan().withSnapshot(latest).plan().files()) {
+            String partition = table.store().pathFactory().getPartitionString(entry.partition());
+            List<Long> ids = allRowIds.get(partition);
+            if (ids == null) {
+                ids = new ArrayList<>();
+                allRowIds.put(partition, ids);
+            }
+            // Expand the inclusive range [from, to] into single row IDs.
+            Range covered = entry.file().nonNullRowIdRange();
+            long rowId = covered.from;
+            while (rowId <= covered.to) {
+                ids.add(rowId);
+                rowId++;
+            }
+        }
+        allRowIds.values().forEach(Collections::sort);
+        return allRowIds;
+    }
+
+    /**
+     * Maps each data manifest of the latest snapshot to the partition strings of its live {@code
+     * ADD} entries. An {@code ADD} entry is ignored when a {@code DELETE} entry with the same
+     * identifier exists in any manifest; manifests without a live entry are left out of the map.
+     */
+    private Map<String, Set<String>> currentPartitionsByManifest(FileStoreTable table) {
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        ManifestFile manifestReader = table.store().manifestFileFactory().create();
+        ManifestList manifestListReader = table.store().manifestListFactory().create();
+        List<ManifestFileMeta> manifests = manifestListReader.readDataManifests(latest);
+
+        // Pass 1: only manifests that report deleted files can contribute DELETE identifiers.
+        Set<FileEntry.Identifier> deleted = new HashSet<>();
+        for (ManifestFileMeta manifest : manifests) {
+            if (manifest.numDeletedFiles() > 0) {
+                for (ManifestEntry entry :
+                        manifestReader.read(manifest.fileName(), manifest.fileSize())) {
+                    if (entry.kind() == FileKind.DELETE) {
+                        deleted.add(entry.identifier());
+                    }
+                }
+            }
+        }
+
+        // Pass 2: record the partitions of ADD entries that were not deleted.
+        Map<String, Set<String>> partitionsByManifest = new LinkedHashMap<>();
+        for (ManifestFileMeta manifest : manifests) {
+            if (manifest.numAddedFiles() > 0) {
+                for (ManifestEntry entry :
+                        manifestReader.read(manifest.fileName(), manifest.fileSize())) {
+                    boolean live =
+                            entry.kind() == FileKind.ADD && !deleted.contains(entry.identifier());
+                    if (live) {
+                        String partition =
+                                table.store().pathFactory().getPartitionString(entry.partition());
+                        Set<String> partitions = partitionsByManifest.get(manifest.fileName());
+                        if (partitions == null) {
+                            partitions = new LinkedHashSet<>();
+                            partitionsByManifest.put(manifest.fileName(), partitions);
+                        }
+                        partitions.add(partition);
+                    }
+                }
+            }
+        }
+        return partitionsByManifest;
+    }
+
+    /** Returns the file names of the data manifests of the latest snapshot, in list order. */
+    private List<String> dataManifestFileNames(FileStoreTable table) {
+        ManifestList manifestListReader = table.store().manifestListFactory().create();
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+        List<String> names = new ArrayList<>();
+        manifestListReader
+                .readDataManifests(latest)
+                .forEach(manifest -> names.add(manifest.fileName()));
+        return names;
+    }
+
+    /**
+     * Builds a B-tree global index on field {@code id} and commits it: the builder's scan result
+     * is split by contiguous row range, one build is run per resulting split, and all produced
+     * commit messages are committed in a single batch commit.
+     *
+     * @throws IllegalStateException if the builder's scan returns no result
+     */
+    private void createBTreeIndex(FileStoreTable table) throws Exception {
+        BTreeGlobalIndexBuilder indexBuilder =
+                new BTreeGlobalIndexBuilder(table).withIndexField("id");
+        List<DataSplit> scannedSplits =
+                indexBuilder
+                        .scan()
+                        .map(Pair::getRight)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Expected scan result when building index."));
+
+        List<DataSplit> contiguousSplits =
+                BTreeGlobalIndexBuilder.splitByContiguousRowRange(scannedSplits);
+        List<CommitMessage> indexMessages = new ArrayList<>();
+        for (DataSplit split : contiguousSplits) {
+            indexMessages.addAll(indexBuilder.build(split, ioManager));
+        }
+
+        try (BatchTableCommit committer = table.newBatchWriteBuilder().newCommit()) {
+            committer.commit(indexMessages);
+        }
+    }
+
+    /**
+     * Reads the index manifest of the latest snapshot and returns the row ranges of all entries
+     * that carry global index metadata, sorted by {@code from} and then by {@code to}.
+     */
+    private List<Range> globalIndexRanges(FileStoreTable table) {
+        String indexManifest = table.snapshotManager().latestSnapshot().indexManifest();
+        List<IndexManifestEntry> indexEntries =
+                table.store().indexManifestFileFactory().create().read(indexManifest);
+
+        List<Range> rowRanges = new ArrayList<>();
+        for (IndexManifestEntry indexEntry : indexEntries) {
+            GlobalIndexMeta meta = indexEntry.indexFile().globalIndexMeta();
+            if (meta == null) {
+                // Not a global index file.
+                continue;
+            }
+            rowRanges.add(meta.rowRange());
+        }
+        rowRanges.sort(
+                (a, b) ->
+                        a.from != b.from
+                                ? Long.compare(a.from, b.from)
+                                : Long.compare(a.to, b.to));
+        return rowRanges;
+    }
+
+    /**
+     * Reads the table with {@code predicate} pushed down as a filter and returns the string at
+     * field index 2 of every row the reader produces.
+     */
+    private List<String> readPayloads(FileStoreTable table, Predicate predicate) throws Exception {
+        ReadBuilder filteredRead = table.newReadBuilder().withFilter(predicate);
+        List<String> payloads = new ArrayList<>();
+        filteredRead
+                .newRead()
+                .createReader(filteredRead.newScan().plan())
+                .forEachRemaining(row -> payloads.add(row.getString(2).toString()));
+        return payloads;
+    }
+
+    /**
+     * Plain holder for the outcome expected from reassigning the large out-of-order fixture. The
+     * arrays are indexed by partition id and are stored as passed in, without copying.
+     */
+    private static class ExpectedLargeManifestAssignment {
+
+        // Per-partition statistics of the snapshot taken before the reassignment.
+        private final int[] entriesByPartition;
+        private final long[] minRowIdByPartition;
+        private final long[] maxRowIdByPartition;
+
+        // Which partitions are expected to receive new row IDs.
+        private final boolean[] partitionsToReassign;
+
+        // Expected totals and the manifests expected to be replaced.
+        private final long reassignedEntryCount;
+        private final long reassignedRowCount;
+        private final Set<String> rewrittenManifestFiles;
+
+        private ExpectedLargeManifestAssignment(
+                int[] entriesByPartition,
+                long[] minRowIdByPartition,
+                long[] maxRowIdByPartition,
+                boolean[] partitionsToReassign,
+                long reassignedEntryCount,
+                long reassignedRowCount,
+                Set<String> rewrittenManifestFiles) {
+            this.rewrittenManifestFiles = rewrittenManifestFiles;
+            this.reassignedRowCount = reassignedRowCount;
+            this.reassignedEntryCount = reassignedEntryCount;
+            this.partitionsToReassign = partitionsToReassign;
+            this.maxRowIdByPartition = maxRowIdByPartition;
+            this.minRowIdByPartition = minRowIdByPartition;
+            this.entriesByPartition = entriesByPartition;
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
new file mode 100644
index 0000000000..7381e9078b
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for the old-to-new row range translation of {@link RowRangeMappingIndex}. */
+public class RowRangeMappingIndexTest {
+
+    @Test
+    public void testMapSingleRange() {
+        // A single mapping; the assertion implies old rows [10, 19] start at new row id 100.
+        RowRangeMappingIndex mappingIndex =
+                RowRangeMappingIndex.create(
+                        Arrays.asList(RowRangeMappingIndex.mapping(10, 19, 100)));
+        Range oldRange = new Range(12, 15);
+
+        Range newRange = mappingIndex.map(oldRange);
+
+        assertThat(newRange).isEqualTo(new Range(102, 105));
+    }
+
+    @Test
+    public void testMapAcrossContiguousRanges() {
+        // Three mappings that are adjacent on both the old and the new side.
+        RowRangeMappingIndex mappingIndex =
+                RowRangeMappingIndex.create(
+                        Arrays.asList(
+                                RowRangeMappingIndex.mapping(10, 14, 100),
+                                RowRangeMappingIndex.mapping(15, 19, 105),
+                                RowRangeMappingIndex.mapping(20, 24, 110)));
+        Range oldRange = new Range(12, 22);
+
+        Range newRange = mappingIndex.map(oldRange);
+
+        assertThat(newRange).isEqualTo(new Range(102, 112));
+    }
+
+    @Test
+    public void testMapFailsWhenOldRangeIsNotCovered() {
+        // Old rows 15..19 have no mapping, so a range spanning them cannot be translated.
+        RowRangeMappingIndex mappingIndex =
+                RowRangeMappingIndex.create(
+                        Arrays.asList(
+                                RowRangeMappingIndex.mapping(10, 14, 100),
+                                RowRangeMappingIndex.mapping(20, 24, 105)));
+        Range oldRange = new Range(12, 22);
+
+        assertThatThrownBy(() -> mappingIndex.map(oldRange))
+                .hasMessageContaining("is not fully covered");
+    }
+
+    @Test
+    public void testMapFailsWhenNewRangeIsNotContiguous() {
+        // The old side is adjacent, but the new side jumps from the 100s to the 200s.
+        RowRangeMappingIndex mappingIndex =
+                RowRangeMappingIndex.create(
+                        Arrays.asList(
+                                RowRangeMappingIndex.mapping(10, 14, 100),
+                                RowRangeMappingIndex.mapping(15, 19, 200)));
+        Range oldRange = new Range(12, 17);
+
+        assertThatThrownBy(() -> mappingIndex.map(oldRange))
+                .hasMessageContaining("maps to non-contiguous new row range");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdAction.java
new file mode 100644
index 0000000000..f094e22ddc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdAction.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionRowIdReassigner;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Action to reassign row IDs for data evolution tables.
+ *
+ * <p>The job built by {@link #build()} consists of one source with parallelism 1 that runs {@link
+ * DataEvolutionRowIdReassigner} and a discarding sink; the source itself never emits a record.
+ */
+public class ReassignRowIdAction extends ActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReassignRowIdAction.class);
+
+    private final Identifier identifier;
+    private final Map<String, String> tableConf;
+    private final List<Map<String, String>> partitionSpecs;
+
+    /**
+     * Creates the action.
+     *
+     * @param catalogConfig options handed to {@link ActionBase}
+     * @param databaseName database of the target table
+     * @param tableName name of the target table
+     * @param tableConfig dynamic table options applied via {@code Table#copy}; copied
+     * @param partitionSpecs partitions to restrict the reassignment to, empty for no filter; the
+     *     outer list is copied
+     */
+    public ReassignRowIdAction(
+            Map<String, String> catalogConfig,
+            String databaseName,
+            String tableName,
+            Map<String, String> tableConfig,
+            List<Map<String, String>> partitionSpecs) {
+        super(catalogConfig);
+        this.identifier = Identifier.create(databaseName, tableName);
+        this.tableConf = new HashMap<>(tableConfig);
+        this.partitionSpecs = new ArrayList<>(partitionSpecs);
+    }
+
+    /** Loads the table, verifies it is a {@link FileStoreTable} and wires source and sink. */
+    @Override
+    public void build() throws Exception {
+        String fullName = identifier.getFullName();
+        Table table = catalog.getTable(identifier).copy(tableConf);
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Action '%s' only supports file store table, but table '%s' is %s.",
+                ReassignRowIdActionFactory.IDENTIFIER,
+                fullName,
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+        ReassignRowIdSource reassignSource =
+                new ReassignRowIdSource(fullName, fileStoreTable, partitionSpecs);
+        DataStreamSource<Void> source =
+                env.addSource(
+                        reassignSource,
+                        "Reassign Row ID : " + fullName,
+                        BasicTypeInfo.VOID_TYPE_INFO);
+
+        // Flag the legacy source as bounded (presumably so the job can terminate - confirm).
+        LegacySourceTransformation<?> legacySource =
+                (LegacySourceTransformation<?>) source.getTransformation();
+        legacySource.setBoundedness(Boundedness.BOUNDED);
+
+        source.setParallelism(1).forward().sinkTo(new DiscardingSink<>()).setParallelism(1);
+    }
+
+    @Override
+    public void run() throws Exception {
+        build();
+        String jobName = "Reassign Row ID : " + identifier.getFullName();
+        env.execute(jobName);
+    }
+
+    /**
+     * Converts partition specs into a {@link PartitionPredicate}.
+     *
+     * @param table table providing partition type, partition keys and the default partition name
+     * @param partitionSpecs partition specs; each one must contain exactly the partition keys
+     * @return {@code null} when {@code partitionSpecs} is empty, otherwise the predicate built by
+     *     {@code PartitionPredicate.fromMaps}
+     */
+    public static @Nullable PartitionPredicate toPartitionPredicate(
+            FileStoreTable table, List<Map<String, String>> partitionSpecs) {
+        if (partitionSpecs.isEmpty()) {
+            // No filter requested.
+            return null;
+        }
+
+        RowType partitionType = table.schema().logicalPartitionType();
+        List<String> partitionKeys = table.partitionKeys();
+        checkArgument(
+                partitionType.getFieldCount() > 0,
+                "Partition filter can only be used for partitioned table '%s'.",
+                table.name());
+
+        String defaultName = table.coreOptions().partitionDefaultName();
+        partitionSpecs.forEach(
+                spec -> {
+                    // The spec has to name every partition key and nothing else.
+                    boolean coversAllKeys = spec.keySet().containsAll(partitionKeys);
+                    boolean hasNoExtraKeys = partitionKeys.containsAll(spec.keySet());
+                    checkArgument(
+                            coversAllKeys && hasNoExtraKeys,
+                            "Partition filter for table '%s' must match partition keys %s, but was %s.",
+                            table.name(),
+                            partitionKeys,
+                            spec);
+                });
+
+        return PartitionPredicate.fromMaps(partitionType, partitionSpecs, defaultName);
+    }
+
+    /**
+     * Renders a one-line summary of a reassignment result.
+     *
+     * @param tableName table name to show in the message
+     * @param result result returned by {@link DataEvolutionRowIdReassigner}
+     * @return a message starting with "Success." when row IDs were reassigned and with "Skipped."
+     *     otherwise
+     */
+    public static String formatResult(
+            String tableName, DataEvolutionRowIdReassigner.Result result) {
+        if (result.reassigned) {
+            return String.format(
+                    "Success. Reassigned row IDs for table '%s': snapshot %d -> %d,"
+                            + " nextRowId %d -> %d, files=%d, rows=%d, indexFiles=%d.",
+                    tableName,
+                    result.previousSnapshotId,
+                    result.newSnapshotId,
+                    result.firstAssignedRowId,
+                    result.nextRowId,
+                    result.fileCount,
+                    result.rowCount,
+                    result.indexFileCount);
+        }
+
+        // Without an explicit skip reason, fall back to the default explanation.
+        String reason = result.skipReason;
+        if (reason == null) {
+            reason = "row IDs are already partition-contiguous";
+        }
+        return String.format(
+                "Skipped. Row IDs for table '%s' were not reassigned because %s:"
+                        + " snapshot %d unchanged, nextRowId=%d.",
+                tableName, reason, result.previousSnapshotId, result.nextRowId);
+    }
+
+    /** Source that performs the reassignment in {@code run} and emits nothing. */
+    private static class ReassignRowIdSource implements SourceFunction<Void> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final String tableName;
+        private final FileStoreTable table;
+        private final List<Map<String, String>> partitionSpecs;
+
+        private ReassignRowIdSource(
+                String tableName, FileStoreTable table, List<Map<String, String>> partitionSpecs) {
+            this.tableName = tableName;
+            this.table = table;
+            this.partitionSpecs = partitionSpecs;
+        }
+
+        @Override
+        public void run(SourceContext<Void> sourceContext) {
+            PartitionPredicate partitionFilter = toPartitionPredicate(table, partitionSpecs);
+            DataEvolutionRowIdReassigner reassigner =
+                    new DataEvolutionRowIdReassigner(table, partitionFilter);
+            String message = formatResult(tableName, reassigner.reassign());
+            // The summary goes both to the log and to standard output.
+            LOG.info(message);
+            System.out.println(message);
+        }
+
+        @Override
+        public void cancel() {}
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdActionFactory.java
new file mode 100644
index 0000000000..1f6967d14d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReassignRowIdActionFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.ReassignRowIdProcedure;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link ActionFactory} that builds a {@link ReassignRowIdAction} from command line arguments. */
+public class ReassignRowIdActionFactory implements ActionFactory {
+
+    /** Action name; shared with {@link ReassignRowIdProcedure}. */
+    public static final String IDENTIFIER = ReassignRowIdProcedure.IDENTIFIER;
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /** Reads the required database and table plus the optional settings and creates the action. */
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        Map<String, String> catalogConfig = catalogConfigMap(params);
+        String database = params.getRequired(DATABASE);
+        String table = params.getRequired(TABLE);
+        Map<String, String> tableConfig = optionalConfigMap(params, TABLE_CONF);
+        List<Map<String, String>> partitions = optionalPartitions(params);
+
+        ReassignRowIdAction action =
+                new ReassignRowIdAction(catalogConfig, database, table, tableConfig, partitions);
+        return Optional.of(action);
+    }
+
+    /** Returns the parsed partition arguments, or an empty list when none were given. */
+    private List<Map<String, String>> optionalPartitions(MultipleParameterToolAdapter params) {
+        if (!params.has(PARTITION)) {
+            return Collections.emptyList();
+        }
+        return getPartitions(params);
+    }
+
+    @Override
+    public void printHelp() {
+        String summary =
+                "Action \""
+                        + IDENTIFIER
+                        + "\" reassigns row IDs for a data evolution table when partition "
+                        + "row-id ranges overlap.";
+        String syntax =
+                "  "
+                        + IDENTIFIER
+                        + " --warehouse <warehouse_path> --database <database_name> "
+                        + "--table <table_name> [--table_conf <key>=<value>] "
+                        + "[--catalog_conf <key>=<value>] "
+                        + "[--partition <key>=<value>[,<key>=<value>] ...]";
+        String example =
+                "  "
+                        + IDENTIFIER
+                        + " --warehouse hdfs:///path/to/warehouse "
+                        + "--database test_db --table test_table "
+                        + "--partition dt=2026-05-19";
+
+        System.out.println(summary);
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(syntax);
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(example);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReassignRowIdProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReassignRowIdProcedure.java
new file mode 100644
index 0000000000..cf33aa0f81
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReassignRowIdProcedure.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionRowIdReassigner;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.ReassignRowIdAction;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Procedure to reassign row IDs for data evolution tables.
+ *
+ * <p>Two overloads exist: one taking only the table identifier, and one that additionally takes a
+ * {@code ';'}-separated list of partition specs (for example {@code 'pt=a;pt=b'}) restricting the
+ * reassignment to those partitions.
+ */
+public class ReassignRowIdProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "reassign_row_id";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Reassigns row IDs without a partition filter.
+     *
+     * @param procedureContext Flink procedure context (unused)
+     * @param tableId identifier of the table to process
+     * @return a single-element array holding the formatted result message
+     * @throws Catalog.TableNotExistException if the table cannot be found
+     */
+    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
+    public String[] call(ProcedureContext procedureContext, String tableId)
+            throws Catalog.TableNotExistException {
+        return reassign(tableId, Collections.emptyList());
+    }
+
+    /**
+     * Reassigns row IDs, optionally restricted to the given partitions.
+     *
+     * @param procedureContext Flink procedure context (unused)
+     * @param tableId identifier of the table to process
+     * @param partitions {@code ';'}-separated partition specs; {@code null} or a blank string
+     *     means "no partition filter", exactly like the single-argument overload
+     * @return a single-element array holding the formatted result message
+     * @throws Catalog.TableNotExistException if the table cannot be found
+     */
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "partitions", type = @DataTypeHint("STRING"))
+            })
+    public String[] call(ProcedureContext procedureContext, String tableId, String partitions)
+            throws Catalog.TableNotExistException {
+        // A SQL NULL would otherwise fail with a NullPointerException in split(), and a blank
+        // string would be split into one empty token instead of meaning "no filter".
+        if (partitions == null || partitions.trim().isEmpty()) {
+            return reassign(tableId, Collections.emptyList());
+        }
+        return reassign(tableId, ParameterUtils.getPartitions(partitions.split(";")));
+    }
+
+    /** Loads the table, checks that it is a {@link FileStoreTable} and runs the reassigner. */
+    private String[] reassign(String tableId, List<Map<String, String>> partitionSpecs)
+            throws Catalog.TableNotExistException {
+        Table table = table(tableId);
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Procedure '%s' only supports file store table, but table '%s' is %s.",
+                IDENTIFIER,
+                tableId,
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+        DataEvolutionRowIdReassigner.Result result =
+                new DataEvolutionRowIdReassigner(
+                                fileStoreTable,
+                                ReassignRowIdAction.toPartitionPredicate(
+                                        fileStoreTable, partitionSpecs))
+                        .reassign();
+        return new String[] {ReassignRowIdAction.formatResult(tableId, result)};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index eb83dcdcb3..db2777a3a0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -50,6 +50,7 @@ org.apache.paimon.flink.action.ClearConsumerActionFactory
 org.apache.paimon.flink.action.RescaleActionFactory
 org.apache.paimon.flink.action.CloneActionFactory
 org.apache.paimon.flink.action.DataEvolutionMergeIntoActionFactory
+org.apache.paimon.flink.action.ReassignRowIdActionFactory
 
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -102,6 +103,7 @@ 
org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
 org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
 org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
 org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
+org.apache.paimon.flink.procedure.ReassignRowIdProcedure
 org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure
 org.apache.paimon.flink.procedure.VectorSearchProcedure
 org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionRowIdReassignerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionRowIdReassignerTest.java
new file mode 100644
index 0000000000..7bbe5eb497
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionRowIdReassignerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.ReassignRowIdAction;
+import org.apache.paimon.flink.action.ReassignRowIdActionFactory;
+import org.apache.paimon.flink.procedure.ReassignRowIdProcedure;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link ReassignRowIdAction} and {@link ReassignRowIdProcedure}.
+ *
+ * <p>Every test works on a table partitioned by {@code pt} with row tracking and data evolution
+ * enabled; each write commits exactly one row, so each data file holds a single row.
+ */
+public class DataEvolutionRowIdReassignerTest extends TableTestBase {
+
+    @Override
+    protected Schema schemaDefault() {
+        return Schema.newBuilder()
+                .column("pt", DataTypes.STRING())
+                .column("id", DataTypes.INT())
+                .column("payload", DataTypes.STRING())
+                .partitionKeys("pt")
+                .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                .option(CoreOptions.GLOBAL_INDEX_ENABLED.key(), "true")
+                .option(CoreOptions.BUCKET.key(), "-1")
+                .option(CoreOptions.SCAN_MANIFEST_PARALLELISM.key(), "1")
+                .build();
+    }
+
+    @Test
+    public void testActionReassignRowIdsByPartition() throws Exception {
+        FileStoreTable table = createTableWithInterleavedPartitions();
+
+        Action action = ActionFactory.createAction(reassignActionArgs()).get();
+        assertThat(action).isInstanceOf(ReassignRowIdAction.class);
+
+        action.run();
+
+        // Five rows were written with ids 0..4; the reassigned ids continue from 5.
+        Map<String, List<Long>> rowIdsByPartition = rowIdsByPartition(table);
+        assertThat(rowIdsByPartition)
+                .hasSize(2)
+                .containsEntry("pt=a/", Arrays.asList(5L, 6L, 7L))
+                .containsEntry("pt=b/", Arrays.asList(8L, 9L));
+        assertThat(table.snapshotManager().latestSnapshot().nextRowId()).isEqualTo(10L);
+    }
+
+    @Test
+    public void testActionBuildsBoundedSourceWithSink() throws Exception {
+        createTableDefault();
+        ReassignRowIdAction action =
+                (ReassignRowIdAction) ActionFactory.createAction(reassignActionArgs()).get();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        action.withStreamExecutionEnvironment(env).build();
+
+        List<Transformation<?>> transformations = env.getTransformations();
+        assertThat(transformations).isNotEmpty();
+
+        // Find the source by name anywhere in the registered transformation trees.
+        String sourceName = "Reassign Row ID : " + database + "." + DEFAULT_TABLE_NAME;
+        Transformation<?> source =
+                transformations.stream()
+                        .map(candidate -> findTransformationByName(candidate, sourceName))
+                        .filter(Objects::nonNull)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(source).isNotNull();
+        assertThat(source).isInstanceOf(LegacySourceTransformation.class);
+        assertThat(((LegacySourceTransformation<?>) source).getBoundedness())
+                .isEqualTo(Boundedness.BOUNDED);
+        assertThat(source.getParallelism()).isEqualTo(1);
+        assertThat(source.getInputs()).isEmpty();
+
+        // The sink is the first other transformation that has the source among its ancestors.
+        Transformation<?> sink = null;
+        for (Transformation<?> candidate : transformations) {
+            if (candidate != source
+                    && findTransformationByName(candidate, sourceName) == source) {
+                sink = candidate;
+                break;
+            }
+        }
+        assertThat(sink).isNotNull();
+        assertThat(sink.getParallelism()).isEqualTo(1);
+        assertThat(findTransformationByName(sink, sourceName)).isSameAs(source);
+        assertThat(env.getStreamGraph().getJobGraph().getVertices()).hasSize(2);
+    }
+
+    @Test
+    public void testProcedureReassignsSelectedPartition() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        // One commit per element; the array index doubles as the value of the id column.
+        String[] partitionPerCommit = {"a", "b", "c", "c", "d", "d", "b"};
+        for (int id = 0; id < partitionPerCommit.length; id++) {
+            writeOneRow(table, partitionPerCommit[id], id);
+        }
+
+        ReassignRowIdProcedure procedure = new ReassignRowIdProcedure();
+        procedure.withCatalog(catalog);
+        String tableId = database + "." + DEFAULT_TABLE_NAME;
+        String[] result = procedure.call(null, tableId, "pt=b");
+
+        assertThat(result).hasSize(1);
+        assertThat(result[0]).contains("Success. Reassigned row IDs");
+        // Only pt=b receives new ids; the other partitions keep theirs.
+        assertThat(rowIdsByPartition(table))
+                .containsEntry("pt=a/", Collections.singletonList(0L))
+                .containsEntry("pt=b/", Arrays.asList(7L, 8L))
+                .containsEntry("pt=c/", Arrays.asList(2L, 3L))
+                .containsEntry("pt=d/", Arrays.asList(4L, 5L));
+    }
+
+    /** Command line arguments that run the reassign action against the default test table. */
+    private String[] reassignActionArgs() {
+        return new String[] {
+            ReassignRowIdActionFactory.IDENTIFIER,
+            "--warehouse",
+            warehouse.toString(),
+            "--database",
+            database,
+            "--table",
+            DEFAULT_TABLE_NAME
+        };
+    }
+
+    /**
+     * Depth-first search through {@code transformation} and its inputs for a transformation with
+     * the given name.
+     *
+     * @return the first match, or {@code null} if there is none
+     */
+    private Transformation<?> findTransformationByName(
+            Transformation<?> transformation, String expectedName) {
+        if (expectedName.equals(transformation.getName())) {
+            return transformation;
+        }
+        for (Transformation<?> upstream : transformation.getInputs()) {
+            Transformation<?> match = findTransformationByName(upstream, expectedName);
+            if (match != null) {
+                return match;
+            }
+        }
+        return null;
+    }
+
+    /** Creates the default table and writes five rows alternating between partitions a and b. */
+    private FileStoreTable createTableWithInterleavedPartitions() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        String[] partitionPerCommit = {"a", "b", "a", "b", "a"};
+        for (int id = 0; id < partitionPerCommit.length; id++) {
+            writeOneRow(table, partitionPerCommit[id], id);
+        }
+        return table;
+    }
+
+    /** Writes and commits a single row {@code (partition, id, "v" + id)} in its own commit. */
+    private void writeOneRow(FileStoreTable table, String partition, int id) throws Exception {
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = builder.newWrite();
+                BatchTableCommit commit = builder.newCommit()) {
+            GenericRow row =
+                    GenericRow.of(
+                            BinaryString.fromString(partition),
+                            id,
+                            BinaryString.fromString("v" + id));
+            write.write(row);
+            commit.commit(write.prepareCommit());
+        }
+    }
+
+    /**
+     * Collects the first row id of every file in the latest snapshot, grouped by partition string
+     * and sorted ascending within each partition.
+     */
+    private Map<String, List<Long>> rowIdsByPartition(FileStoreTable table) {
+        List<ManifestEntry> entries =
+                table.store()
+                        .newScan()
+                        .withSnapshot(table.snapshotManager().latestSnapshot())
+                        .plan()
+                        .files();
+        Map<String, List<Long>> grouped = new LinkedHashMap<>();
+        for (ManifestEntry entry : entries) {
+            String partition = table.store().pathFactory().getPartitionString(entry.partition());
+            List<Long> rowIds = grouped.get(partition);
+            if (rowIds == null) {
+                rowIds = new ArrayList<>();
+                grouped.put(partition, rowIds);
+            }
+            rowIds.add(entry.file().firstRowId());
+        }
+        grouped.values().forEach(rowIds -> Collections.sort(rowIds));
+        return grouped;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
new file mode 100644
index 0000000000..4be481369e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * Compatibility shim for Flink 2.x: re-exposes {@link
+ * org.apache.flink.streaming.api.functions.source.legacy.SourceFunction} under the package name
+ * the interface had in Flink 1.x, so that code importing {@code
+ * org.apache.flink.streaming.api.functions.source.SourceFunction} can keep that import.
+ *
+ * <p>The interface declares no members of its own; everything is inherited from the legacy type.
+ *
+ * @param <T> type of the elements produced by the source
+ */
+public interface SourceFunction<T>
+        extends 
org.apache.flink.streaming.api.functions.source.legacy.SourceFunction<T> {}

Reply via email to