This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 554c85888 [core] Data files with delete records should not be upgraded directly to max level (#2962)
554c85888 is described below

commit 554c8588818a0e4e7e62b869870644e62885a46a
Author: tsreaper <[email protected]>
AuthorDate: Mon Mar 18 14:45:33 2024 +0800

    [core] Data files with delete records should not be upgraded directly to max level (#2962)
---
 .../java/org/apache/paimon/io/DataFileMeta.java    | 57 +++++++++++++++++----
 .../apache/paimon/io/DataFileMetaSerializer.java   |  6 ++-
 .../apache/paimon/io/KeyValueDataFileWriter.java   |  8 ++-
 .../java/org/apache/paimon/mergetree/Levels.java   | 10 ++--
 .../mergetree/compact/MergeTreeCompactManager.java |  1 +
 .../mergetree/compact/MergeTreeCompactTask.java    | 20 +++++++-
 .../AppendOnlyTableCompactionCoordinatorTest.java  |  3 +-
 .../paimon/crosspartition/IndexBootstrapTest.java  |  3 +-
 .../paimon/io/DataFileTestDataGenerator.java       |  3 +-
 .../org/apache/paimon/io/DataFileTestUtils.java    | 11 ++--
 .../ManifestCommittableSerializerTest.java         |  3 +-
 .../paimon/manifest/ManifestFileMetaTestBase.java  | 10 ++--
 .../org/apache/paimon/mergetree/LevelsTest.java    |  2 +-
 .../mergetree/compact/IntervalPartitionTest.java   |  3 +-
 .../mergetree/compact/UniversalCompactionTest.java |  2 +-
 .../operation/CleanedFileStoreExpireTest.java      |  3 +-
 .../apache/paimon/table/sink/TableWriteTest.java   | 58 ++++++++++++++++++++--
 .../paimon/table/source/SplitGeneratorTest.java    |  3 +-
 .../sink/CompactionTaskSimpleSerializerTest.java   |  3 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |  3 +-
 .../source/FileStoreSourceSplitSerializerTest.java |  3 +-
 21 files changed, 175 insertions(+), 40 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 0b1c5c50d..371241194 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -32,6 +32,8 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 
+import javax.annotation.Nullable;
+
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -58,6 +60,8 @@ public class DataFileMeta {
 
     private final String fileName;
     private final long fileSize;
+
+    // total number of rows (including add & delete) in this file
     private final long rowCount;
 
     private final BinaryRow minKey;
@@ -73,6 +77,12 @@ public class DataFileMeta {
     private final List<String> extraFiles;
     private final Timestamp creationTime;
 
+    // rowCount = addRowCount + deleteRowCount
+    // Why don't we keep addRowCount and deleteRowCount?
+    // Because in previous versions of DataFileMeta, we only keep rowCount.
+    // We have to keep the compatibility.
+    private final @Nullable Long deleteRowCount;
+
     public static DataFileMeta forAppend(
             String fileName,
             long fileSize,
@@ -92,7 +102,8 @@ public class DataFileMeta {
                 minSequenceNumber,
                 maxSequenceNumber,
                 schemaId,
-                DUMMY_LEVEL);
+                DUMMY_LEVEL,
+                0L);
     }
 
     public DataFileMeta(
@@ -106,7 +117,8 @@ public class DataFileMeta {
             long minSequenceNumber,
             long maxSequenceNumber,
             long schemaId,
-            int level) {
+            int level,
+            @Nullable Long deleteRowCount) {
         this(
                 fileName,
                 fileSize,
@@ -120,7 +132,8 @@ public class DataFileMeta {
                 schemaId,
                 level,
                 Collections.emptyList(),
-                Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp());
+                Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
+                deleteRowCount);
     }
 
     public DataFileMeta(
@@ -136,9 +149,11 @@ public class DataFileMeta {
             long schemaId,
             int level,
             List<String> extraFiles,
-            Timestamp creationTime) {
+            Timestamp creationTime,
+            @Nullable Long deleteRowCount) {
         this.fileName = fileName;
         this.fileSize = fileSize;
+
         this.rowCount = rowCount;
 
         this.minKey = minKey;
@@ -152,6 +167,8 @@ public class DataFileMeta {
         this.schemaId = schemaId;
         this.extraFiles = Collections.unmodifiableList(extraFiles);
         this.creationTime = creationTime;
+
+        this.deleteRowCount = deleteRowCount;
     }
 
     public String fileName() {
@@ -166,6 +183,14 @@ public class DataFileMeta {
         return rowCount;
     }
 
+    public Optional<Long> addRowCount() {
+        return Optional.ofNullable(deleteRowCount).map(c -> rowCount - c);
+    }
+
+    public Optional<Long> deleteRowCount() {
+        return Optional.ofNullable(deleteRowCount);
+    }
+
     public BinaryRow minKey() {
         return minKey;
     }
@@ -250,7 +275,8 @@ public class DataFileMeta {
                 schemaId,
                 newLevel,
                 extraFiles,
-                creationTime);
+                creationTime,
+                deleteRowCount);
     }
 
     public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -274,11 +300,15 @@ public class DataFileMeta {
                 schemaId,
                 level,
                 newExtraFiles,
-                creationTime);
+                creationTime,
+                deleteRowCount);
     }
 
     @Override
     public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
         if (!(o instanceof DataFileMeta)) {
             return false;
         }
@@ -295,7 +325,8 @@ public class DataFileMeta {
                 && schemaId == that.schemaId
                 && level == that.level
                 && Objects.equals(extraFiles, that.extraFiles)
-                && Objects.equals(creationTime, that.creationTime);
+                && Objects.equals(creationTime, that.creationTime)
+                && Objects.equals(deleteRowCount, that.deleteRowCount);
     }
 
     @Override
@@ -313,13 +344,17 @@ public class DataFileMeta {
                 schemaId,
                 level,
                 extraFiles,
-                creationTime);
+                creationTime,
+                deleteRowCount);
     }
 
     @Override
     public String toString() {
         return String.format(
-                "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d, %s, %s}",
+                "{fileName: %s, fileSize: %d, rowCount: %d, "
+                        + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
+                        + "minSequenceNumber: %d, maxSequenceNumber: %d, "
+                        + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d}",
                 fileName,
                 fileSize,
                 rowCount,
@@ -332,7 +367,8 @@ public class DataFileMeta {
                 schemaId,
                 level,
                 extraFiles,
-                creationTime);
+                creationTime,
+                deleteRowCount);
     }
 
     public static RowType schema() {
@@ -350,6 +386,7 @@ public class DataFileMeta {
         fields.add(new DataField(10, "_LEVEL", new IntType(false)));
         fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
         fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
+        fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)));
         return new RowType(fields);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index a823b5791..f91e3d293 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -53,7 +53,8 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
                 meta.schemaId(),
                 meta.level(),
                 toStringArrayData(meta.extraFiles()),
-                meta.creationTime());
+                meta.creationTime(),
+                meta.deleteRowCount().orElse(null));
     }
 
     @Override
@@ -71,6 +72,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
                 row.getLong(9),
                 row.getInt(10),
                 fromStringArrayData(row.getArray(11)),
-                row.getTimestamp(12, 3));
+                row.getTimestamp(12, 3),
+                row.isNullAt(13) ? null : row.getLong(13));
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index c5b2de741..e2e5441f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -67,6 +67,7 @@ public class KeyValueDataFileWriter
     private InternalRow maxKey = null;
     private long minSeqNumber = Long.MAX_VALUE;
     private long maxSeqNumber = Long.MIN_VALUE;
+    private long deleteRecordCount = 0;
 
     public KeyValueDataFileWriter(
             FileIO fileIO,
@@ -111,6 +112,10 @@ public class KeyValueDataFileWriter
         updateMinSeqNumber(kv);
         updateMaxSeqNumber(kv);
 
+        if (kv.valueKind().isRetract()) {
+            deleteRecordCount++;
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Write to Path " + path + " key value " + kv.toString(keyType, valueType));
         }
@@ -162,6 +167,7 @@ public class KeyValueDataFileWriter
                 minSeqNumber,
                 maxSeqNumber,
                 schemaId,
-                level);
+                level,
+                deleteRecordCount);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
index 94800e8eb..350b693db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
@@ -50,11 +50,11 @@ public class Levels {
         this.keyComparator = keyComparator;
 
         // in case the num of levels is not specified explicitly
-        int restoredMaxLevel =
+        int restoredNumLevels =
                 Math.max(
                         numLevels,
                         inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
-        checkArgument(restoredMaxLevel > 1, "levels must be at least 2.");
+        checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");
         this.level0 =
                 new TreeSet<>(
                         (a, b) -> {
@@ -70,7 +70,7 @@ public class Levels {
                             }
                         });
         this.levels = new ArrayList<>();
-        for (int i = 1; i < restoredMaxLevel; i++) {
+        for (int i = 1; i < restoredNumLevels; i++) {
             levels.add(SortedRun.empty());
         }
 
@@ -108,6 +108,10 @@ public class Levels {
         return levels.size() + 1;
     }
 
+    public int maxLevel() {
+        return levels.size();
+    }
+
     public int numberOfSortedRuns() {
         int numberOfSortedRuns = level0.size();
         for (SortedRun run : levels) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 7bdb44118..80316eacb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -182,6 +182,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
                         rewriter,
                         unit,
                         dropDelete,
+                        levels.maxLevel(),
                         metricsReporter);
         if (LOG.isDebugEnabled()) {
             LOG.debug(
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 7299fbb5c..5f96743fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -44,6 +44,7 @@ public class MergeTreeCompactTask extends CompactTask {
     private final List<List<SortedRun>> partitioned;
 
     private final boolean dropDelete;
+    private final int maxLevel;
 
     // metric
     private int upgradeFilesNum;
@@ -54,6 +55,7 @@ public class MergeTreeCompactTask extends CompactTask {
             CompactRewriter rewriter,
             CompactUnit unit,
             boolean dropDelete,
+            int maxLevel,
             @Nullable CompactionMetrics.Reporter metricsReporter) {
         super(metricsReporter);
         this.minFileSize = minFileSize;
@@ -61,6 +63,7 @@ public class MergeTreeCompactTask extends CompactTask {
         this.outputLevel = unit.outputLevel();
         this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
         this.dropDelete = dropDelete;
+        this.maxLevel = maxLevel;
 
         this.upgradeFilesNum = 0;
     }
@@ -107,10 +110,20 @@ public class MergeTreeCompactTask extends CompactTask {
     }
 
     private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
-        if (file.level() != outputLevel) {
+        if (file.level() == outputLevel) {
+            return;
+        }
+
+        if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
             CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
             toUpdate.merge(upgradeResult);
             upgradeFilesNum++;
+        } else {
+            // files with delete records should not be upgraded directly to max level
+            List<List<SortedRun>> candidate = new ArrayList<>();
+            candidate.add(new ArrayList<>());
+            candidate.get(0).add(SortedRun.fromSingle(file));
+            rewriteImpl(candidate, toUpdate);
         }
     }
 
@@ -130,6 +143,11 @@ public class MergeTreeCompactTask extends CompactTask {
                 return;
             }
         }
+        rewriteImpl(candidate, toUpdate);
+    }
+
+    private void rewriteImpl(List<List<SortedRun>> candidate, CompactResult toUpdate)
+            throws Exception {
         CompactResult rewriteResult = rewriter.rewrite(outputLevel, dropDelete, candidate);
         toUpdate.merge(rewriteResult);
         candidate.clear();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index 0b00cfefa..e6209e223 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -185,6 +185,7 @@ public class AppendOnlyTableCompactionCoordinatorTest {
                 0,
                 0,
                 0,
-                0);
+                0,
+                0L);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index cd9568d46..50df6e497 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -132,7 +132,8 @@ public class IndexBootstrapTest extends TableTestBase {
                 Timestamp.fromLocalDateTime(
                         Instant.ofEpochMilli(timeMillis)
                                 .atZone(ZoneId.systemDefault())
-                                .toLocalDateTime()));
+                                .toLocalDateTime()),
+                0L);
     }
 
     private Pair<InternalRow, Integer> row(int pt, int col, int pk, int bucket) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index 6301fbc3d..aed01ca21 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -160,7 +160,8 @@ public class DataFileTestDataGenerator {
                         minSequenceNumber,
                         maxSequenceNumber,
                         0,
-                        level),
+                        level,
+                        kvs.stream().filter(kv -> kv.valueKind().isRetract()).count()),
                 kvs);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index 062c3384a..624237b38 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -40,7 +40,7 @@ public class DataFileTestUtils {
         return new DataFileMeta(
                 "",
                 maxSeq - minSeq + 1,
-                maxSeq - minSeq + 1,
+                0L,
                 DataFileMeta.EMPTY_MIN_KEY,
                 DataFileMeta.EMPTY_MAX_KEY,
                 DataFileMeta.EMPTY_KEY_STATS,
@@ -50,7 +50,8 @@ public class DataFileTestUtils {
                 0L,
                 DataFileMeta.DUMMY_LEVEL,
                 Collections.emptyList(),
-                Timestamp.fromEpochMillis(100));
+                Timestamp.fromEpochMillis(100),
+                maxSeq - minSeq + 1);
     }
 
     public static DataFileMeta newFile() {
@@ -65,7 +66,8 @@ public class DataFileTestUtils {
                 0,
                 0,
                 0,
-                0);
+                0,
+                0L);
     }
 
     public static DataFileMeta newFile(
@@ -81,7 +83,8 @@ public class DataFileTestUtils {
                 0,
                 maxSequence,
                 0,
-                level);
+                level,
+                0L);
     }
 
     public static BinaryRow row(int i) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index 88b309bfd..ee279c097 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -113,6 +113,7 @@ public class ManifestCommittableSerializerTest {
                 0,
                 1,
                 0,
-                level);
+                level,
+                0L);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 3a9754dec..1a36346c1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -72,7 +72,7 @@ public abstract class ManifestFileMetaTestBase {
                         fileName,
                         0, // not used
                         0, // not used
-                        binaryRow, // not useds
+                        binaryRow, // not used
                         binaryRow, // not used
                         StatsTestUtils.newEmptyTableStats(), // not used
                         StatsTestUtils.newEmptyTableStats(), // not used
@@ -81,7 +81,9 @@ public abstract class ManifestFileMetaTestBase {
                         0, // not used
                         0, // not used
                         Collections.emptyList(),
-                        Timestamp.fromEpochMillis(200000)));
+                        Timestamp.fromEpochMillis(200000),
+                        0L // not used
+                        ));
     }
 
     protected ManifestFileMeta makeManifest(ManifestEntry... entries) {
@@ -242,7 +244,7 @@ public abstract class ManifestFileMetaTestBase {
                         0, // not used
                         0, // not used
                         0, // not used
-                        0 // not used
-                        ));
+                        0, // not used
+                        0L));
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index 2630be65c..c424b6094 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -69,6 +69,6 @@ public class LevelsTest {
 
     public static DataFileMeta newFile(int level) {
         return new DataFileMeta(
-                UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level);
+                UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level, 0L);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index 02d7967a1..c4edd76e2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -180,7 +180,8 @@ public class IntervalPartitionTest {
                 0,
                 0,
                 Collections.emptyList(),
-                Timestamp.fromEpochMillis(100000));
+                Timestamp.fromEpochMillis(100000),
+                0L);
     }
 
     private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>> sections) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 747023b2e..313f9799a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -357,6 +357,6 @@ public class UniversalCompactionTest {
     }
 
     static DataFileMeta file(long size) {
-        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0);
+        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index 463e8ba08..02e3ab9e3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -85,7 +85,8 @@ public class CleanedFileStoreExpireTest extends FileStoreExpireTestBase {
                         0,
                         0,
                         extraFiles,
-                        Timestamp.now());
+                        Timestamp.now(),
+                        0L);
         ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile);
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile);
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
index ccbade9ad..a97ed9c54 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
@@ -40,6 +42,7 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -220,14 +223,63 @@ public class TableWriteTest {
 
         StreamTableScan scan = table.newStreamScan();
         TableRead read = table.newRead();
+        assertThat(streamingRead(scan, read, latestSnapshotId))
+                .hasSize(numPartitions * numRecordsPerPartition);
+    }
+
+    @Test
+    public void testUpgradeToMaxLevel() throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 1);
+        conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
+
+        FileStoreTable table = createFileStoreTable(conf);
+        TableWriteImpl<?> write =
+                table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 1, 10L));
+        write.write(GenericRow.of(1, 2, 20L));
+        write.write(GenericRow.ofKind(RowKind.DELETE, 1, 2, 20L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        write.compact(partition(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(GenericRow.of(1, 2, 21L));
+        write.compact(partition(1), 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.close();
+        commit.close();
+
+        Map<String, String> readOptions = new HashMap<>();
+        readOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1");
+        table = table.copy(readOptions);
+        long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+
+        StreamTableScan scan = table.newStreamScan();
+        TableRead read = table.newRead();
+        assertThat(streamingRead(scan, read, latestSnapshotId)).hasSize(2);
+    }
+
+    private BinaryRow partition(int x) {
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(partition);
+        writer.writeInt(0, x);
+        writer.complete();
+        return partition;
+    }
+
+    List<InternalRow> streamingRead(
+            StreamTableScan scan, TableRead read, long numStreamingSnapshots) throws Exception {
         List<InternalRow> actual = new ArrayList<>();
-        for (long i = 0; i <= latestSnapshotId; i++) {
+        for (long i = 0; i <= numStreamingSnapshots; i++) {
             RecordReader<InternalRow> reader = read.createReader(scan.plan().splits());
             reader.forEachRemaining(actual::add);
             reader.close();
         }
-
-        assertThat(actual).hasSize(numPartitions * numRecordsPerPartition);
+        return actual;
     }
 
     private FileStoreTable createFileStoreTable(Options conf) throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index 663d0dad8..6d97eda5f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -50,7 +50,8 @@ public class SplitGeneratorTest {
                 minSequence,
                 maxSequence,
                 0,
-                0);
+                0,
+                0L);
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index 70f127291..c607e7a8f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -74,6 +74,7 @@ public class CompactionTaskSimpleSerializerTest {
                 0,
                 1,
                 0,
-                0);
+                0,
+                0L);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 5015e9219..4f53932a2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -111,7 +111,8 @@ public class FileStoreSourceSplitGeneratorTest {
                             0, // not used
                             0, // not used
                             0, // not used
-                            0 // not used
+                            0, // not used
+                            0L // not used
                             ));
         }
         return DataSplit.builder()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index d0cafbb95..cbe6cb86d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -84,7 +84,8 @@ public class FileStoreSourceSplitSerializerTest {
                 0,
                 1,
                 0,
-                level);
+                level,
+                0L);
     }
 
     public static FileStoreSourceSplit newSourceSplit(

Reply via email to