This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b309fe303f [core] Data-evolution mode supports compacting data-evolution tables without manifest-meta:min/maxRowId (#7102)
b309fe303f is described below

commit b309fe303f1936d821b1a39f9e710ad19c5c7865
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jan 22 14:42:51 2026 +0800

    [core] Data-evolution mode supports compacting data-evolution tables without manifest-meta:min/maxRowId (#7102)
---
 .../DataEvolutionCompactCoordinator.java           | 15 +++-
 .../DataEvolutionCompactCoordinatorTest.java       | 80 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 14bc83c547..f1fe3ae617 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -98,9 +98,18 @@ public class DataEvolutionCompactCoordinator {
 
             List<ManifestFileMeta> manifestFileMetas =
                     snapshotReader.manifestsReader().read(snapshot, 
ScanMode.ALL).filteredManifests;
-            RangeHelper<ManifestFileMeta> rangeHelper =
-                    new RangeHelper<>(ManifestFileMeta::minRowId, 
ManifestFileMeta::maxRowId);
-            this.metas = new 
ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
+
+            boolean allManifestMetaContainsRowId =
+                    manifestFileMetas.stream()
+                            .allMatch(meta -> meta.minRowId() != null && 
meta.maxRowId() != null);
+            if (allManifestMetaContainsRowId) {
+                RangeHelper<ManifestFileMeta> rangeHelper =
+                        new RangeHelper<>(ManifestFileMeta::minRowId, 
ManifestFileMeta::maxRowId);
+                this.metas =
+                        new 
ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
+            } else {
+                this.metas = new 
ArrayDeque<>(Collections.singletonList(manifestFileMetas));
+            }
         }
 
         List<ManifestEntry> scan() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index de4545c6df..c8c0178811 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -18,13 +18,23 @@
 
 package org.apache.paimon.append.dataevolution;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.stats.StatsTestUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.Test;
 
@@ -35,6 +45,8 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /** Tests for {@link DataEvolutionCompactCoordinator.CompactPlanner}. */
 public class DataEvolutionCompactCoordinatorTest {
@@ -168,6 +180,74 @@ public class DataEvolutionCompactCoordinatorTest {
                         entries.get(5).file());
     }
 
+    /**
+     * Verifies planning when at least one manifest has no min/max row-id range: the coordinator
+     * must fall back to treating all manifests as a single group, so the files from both
+     * manifests end up in one compaction task.
+     */
+    @Test
+    public void testPlanWithNullManifestRowId() {
+        // Mock the table -> snapshot reader -> manifests reader plumbing the coordinator uses.
+        FileStoreTable table = mock(FileStoreTable.class);
+        SnapshotReader snapshotReader = mock(SnapshotReader.class);
+        SnapshotManager snapshotManager = mock(SnapshotManager.class);
+        Snapshot snapshot = mock(Snapshot.class);
+        ManifestsReader manifestsReader = mock(ManifestsReader.class);
+
+        // NOTE(review): these options presumably make the two 600-byte files below a single
+        // compaction candidate (target size 1 kb, min file num 2) - confirm against CompactPlanner.
+        Options options = new Options();
+        options.set("target-file-size", "1 kb");
+        options.set("source.split.open-file-cost", "1 b");
+        options.set("compaction.min.file-num", "2");
+        when(table.coreOptions()).thenReturn(new CoreOptions(options));
+        when(table.newSnapshotReader()).thenReturn(snapshotReader);
+        when(snapshotReader.withPartitionFilter((PartitionPredicate) null))
+                .thenReturn(snapshotReader);
+        when(snapshotReader.snapshotManager()).thenReturn(snapshotManager);
+        when(snapshotManager.latestSnapshot()).thenReturn(snapshot);
+        when(snapshotReader.manifestsReader()).thenReturn(manifestsReader);
+
+        // Same as metaWithRowId except for the file name and the last two constructor
+        // arguments: null here (no row-id range), 0L/199L in metaWithRowId.
+        ManifestFileMeta metaWithNullRowId =
+                new ManifestFileMeta(
+                        "manifest-1",
+                        1L,
+                        1L,
+                        0L,
+                        StatsTestUtils.newEmptySimpleStats(),
+                        0L,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+        ManifestFileMeta metaWithRowId =
+                new ManifestFileMeta(
+                        "manifest-2",
+                        1L,
+                        1L,
+                        0L,
+                        StatsTestUtils.newEmptySimpleStats(),
+                        0L,
+                        null,
+                        null,
+                        null,
+                        null,
+                        0L,
+                        199L);
+        List<ManifestFileMeta> metas = Arrays.asList(metaWithNullRowId, 
metaWithRowId);
+        when(manifestsReader.read(snapshot, ScanMode.ALL))
+                .thenReturn(new ManifestsReader.Result(snapshot, metas, 
metas));
+
+        // makeEntry(fileName, firstRowId, rowCount, fileSize): entry1 starts at row 0 and entry2
+        // at row 100, 100 rows and 600 bytes each; each manifest yields one entry.
+        ManifestEntry entry1 = makeEntry("file1.parquet", 0L, 100L, 600);
+        ManifestEntry entry2 = makeEntry("file2.parquet", 100L, 100L, 600);
+        when(snapshotReader.readManifest(metaWithNullRowId))
+                .thenReturn(Collections.singletonList(entry1));
+        when(snapshotReader.readManifest(metaWithRowId))
+                .thenReturn(Collections.singletonList(entry2));
+
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+
+        // One manifest lacks a row-id range, so range-based merging is skipped and all manifests
+        // form a single group; both files are therefore expected in the same task, in order.
+        assertThat(tasks).hasSize(1);
+        
assertThat(tasks.get(0).compactBefore().stream().map(DataFileMeta::fileName))
+                .containsExactly(entry1.file().fileName(), 
entry2.file().fileName());
+    }
+
     private ManifestEntry makeEntry(
             String fileName, long firstRowId, long rowCount, long fileSize) {
         return makeEntryWithSize(

Reply via email to