This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 554c85888 [core] Data files with delete records should not be upgraded directly to max level (#2962)
554c85888 is described below
commit 554c8588818a0e4e7e62b869870644e62885a46a
Author: tsreaper <[email protected]>
AuthorDate: Mon Mar 18 14:45:33 2024 +0800
[core] Data files with delete records should not be upgraded directly to max level (#2962)
---
.../java/org/apache/paimon/io/DataFileMeta.java | 57 +++++++++++++++++----
.../apache/paimon/io/DataFileMetaSerializer.java | 6 ++-
.../apache/paimon/io/KeyValueDataFileWriter.java | 8 ++-
.../java/org/apache/paimon/mergetree/Levels.java | 10 ++--
.../mergetree/compact/MergeTreeCompactManager.java | 1 +
.../mergetree/compact/MergeTreeCompactTask.java | 20 +++++++-
.../AppendOnlyTableCompactionCoordinatorTest.java | 3 +-
.../paimon/crosspartition/IndexBootstrapTest.java | 3 +-
.../paimon/io/DataFileTestDataGenerator.java | 3 +-
.../org/apache/paimon/io/DataFileTestUtils.java | 11 ++--
.../ManifestCommittableSerializerTest.java | 3 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 10 ++--
.../org/apache/paimon/mergetree/LevelsTest.java | 2 +-
.../mergetree/compact/IntervalPartitionTest.java | 3 +-
.../mergetree/compact/UniversalCompactionTest.java | 2 +-
.../operation/CleanedFileStoreExpireTest.java | 3 +-
.../apache/paimon/table/sink/TableWriteTest.java | 58 ++++++++++++++++++++--
.../paimon/table/source/SplitGeneratorTest.java | 3 +-
.../sink/CompactionTaskSimpleSerializerTest.java | 3 +-
.../source/FileStoreSourceSplitGeneratorTest.java | 3 +-
.../source/FileStoreSourceSplitSerializerTest.java | 3 +-
21 files changed, 175 insertions(+), 40 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 0b1c5c50d..371241194 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -32,6 +32,8 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import javax.annotation.Nullable;
+
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -58,6 +60,8 @@ public class DataFileMeta {
private final String fileName;
private final long fileSize;
+
+ // total number of rows (including add & delete) in this file
private final long rowCount;
private final BinaryRow minKey;
@@ -73,6 +77,12 @@ public class DataFileMeta {
private final List<String> extraFiles;
private final Timestamp creationTime;
+ // rowCount = addRowCount + deleteRowCount
+ // Why don't we keep addRowCount and deleteRowCount?
+ // Because in previous versions of DataFileMeta, we only keep rowCount.
+ // We have to keep the compatibility.
+ private final @Nullable Long deleteRowCount;
+
public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -92,7 +102,8 @@ public class DataFileMeta {
minSequenceNumber,
maxSequenceNumber,
schemaId,
- DUMMY_LEVEL);
+ DUMMY_LEVEL,
+ 0L);
}
public DataFileMeta(
@@ -106,7 +117,8 @@ public class DataFileMeta {
long minSequenceNumber,
long maxSequenceNumber,
long schemaId,
- int level) {
+ int level,
+ @Nullable Long deleteRowCount) {
this(
fileName,
fileSize,
@@ -120,7 +132,8 @@ public class DataFileMeta {
schemaId,
level,
Collections.emptyList(),
- Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp());
+ Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
+ deleteRowCount);
}
public DataFileMeta(
@@ -136,9 +149,11 @@ public class DataFileMeta {
long schemaId,
int level,
List<String> extraFiles,
- Timestamp creationTime) {
+ Timestamp creationTime,
+ @Nullable Long deleteRowCount) {
this.fileName = fileName;
this.fileSize = fileSize;
+
this.rowCount = rowCount;
this.minKey = minKey;
@@ -152,6 +167,8 @@ public class DataFileMeta {
this.schemaId = schemaId;
this.extraFiles = Collections.unmodifiableList(extraFiles);
this.creationTime = creationTime;
+
+ this.deleteRowCount = deleteRowCount;
}
public String fileName() {
@@ -166,6 +183,14 @@ public class DataFileMeta {
return rowCount;
}
+ public Optional<Long> addRowCount() {
+ return Optional.ofNullable(deleteRowCount).map(c -> rowCount - c);
+ }
+
+ public Optional<Long> deleteRowCount() {
+ return Optional.ofNullable(deleteRowCount);
+ }
+
public BinaryRow minKey() {
return minKey;
}
@@ -250,7 +275,8 @@ public class DataFileMeta {
schemaId,
newLevel,
extraFiles,
- creationTime);
+ creationTime,
+ deleteRowCount);
}
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -274,11 +300,15 @@ public class DataFileMeta {
schemaId,
level,
newExtraFiles,
- creationTime);
+ creationTime,
+ deleteRowCount);
}
@Override
public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
if (!(o instanceof DataFileMeta)) {
return false;
}
@@ -295,7 +325,8 @@ public class DataFileMeta {
&& schemaId == that.schemaId
&& level == that.level
&& Objects.equals(extraFiles, that.extraFiles)
- && Objects.equals(creationTime, that.creationTime);
+ && Objects.equals(creationTime, that.creationTime)
+ && Objects.equals(deleteRowCount, that.deleteRowCount);
}
@Override
@@ -313,13 +344,17 @@ public class DataFileMeta {
schemaId,
level,
extraFiles,
- creationTime);
+ creationTime,
+ deleteRowCount);
}
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d, %s, %s}",
+ "{fileName: %s, fileSize: %d, rowCount: %d, "
+ + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
+ + "minSequenceNumber: %d, maxSequenceNumber: %d, "
+ + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d}",
fileName,
fileSize,
rowCount,
@@ -332,7 +367,8 @@ public class DataFileMeta {
schemaId,
level,
extraFiles,
- creationTime);
+ creationTime,
+ deleteRowCount);
}
public static RowType schema() {
@@ -350,6 +386,7 @@ public class DataFileMeta {
fields.add(new DataField(10, "_LEVEL", new IntType(false)));
fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
+ fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)));
return new RowType(fields);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index a823b5791..f91e3d293 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -53,7 +53,8 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
meta.schemaId(),
meta.level(),
toStringArrayData(meta.extraFiles()),
- meta.creationTime());
+ meta.creationTime(),
+ meta.deleteRowCount().orElse(null));
}
@Override
@@ -71,6 +72,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
row.getLong(9),
row.getInt(10),
fromStringArrayData(row.getArray(11)),
- row.getTimestamp(12, 3));
+ row.getTimestamp(12, 3),
+ row.isNullAt(13) ? null : row.getLong(13));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index c5b2de741..e2e5441f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -67,6 +67,7 @@ public class KeyValueDataFileWriter
private InternalRow maxKey = null;
private long minSeqNumber = Long.MAX_VALUE;
private long maxSeqNumber = Long.MIN_VALUE;
+ private long deleteRecordCount = 0;
public KeyValueDataFileWriter(
FileIO fileIO,
@@ -111,6 +112,10 @@ public class KeyValueDataFileWriter
updateMinSeqNumber(kv);
updateMaxSeqNumber(kv);
+ if (kv.valueKind().isRetract()) {
+ deleteRecordCount++;
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Write to Path " + path + " key value " +
kv.toString(keyType, valueType));
}
@@ -162,6 +167,7 @@ public class KeyValueDataFileWriter
minSeqNumber,
maxSeqNumber,
schemaId,
- level);
+ level,
+ deleteRecordCount);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
index 94800e8eb..350b693db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
@@ -50,11 +50,11 @@ public class Levels {
this.keyComparator = keyComparator;
// in case the num of levels is not specified explicitly
- int restoredMaxLevel =
+ int restoredNumLevels =
Math.max(
numLevels,
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
- checkArgument(restoredMaxLevel > 1, "levels must be at least 2.");
+ checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");
this.level0 =
new TreeSet<>(
(a, b) -> {
@@ -70,7 +70,7 @@ public class Levels {
}
});
this.levels = new ArrayList<>();
- for (int i = 1; i < restoredMaxLevel; i++) {
+ for (int i = 1; i < restoredNumLevels; i++) {
levels.add(SortedRun.empty());
}
@@ -108,6 +108,10 @@ public class Levels {
return levels.size() + 1;
}
+ public int maxLevel() {
+ return levels.size();
+ }
+
public int numberOfSortedRuns() {
int numberOfSortedRuns = level0.size();
for (SortedRun run : levels) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 7bdb44118..80316eacb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -182,6 +182,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
rewriter,
unit,
dropDelete,
+ levels.maxLevel(),
metricsReporter);
if (LOG.isDebugEnabled()) {
LOG.debug(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 7299fbb5c..5f96743fe 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -44,6 +44,7 @@ public class MergeTreeCompactTask extends CompactTask {
private final List<List<SortedRun>> partitioned;
private final boolean dropDelete;
+ private final int maxLevel;
// metric
private int upgradeFilesNum;
@@ -54,6 +55,7 @@ public class MergeTreeCompactTask extends CompactTask {
CompactRewriter rewriter,
CompactUnit unit,
boolean dropDelete,
+ int maxLevel,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
this.minFileSize = minFileSize;
@@ -61,6 +63,7 @@ public class MergeTreeCompactTask extends CompactTask {
this.outputLevel = unit.outputLevel();
this.partitioned = new IntervalPartition(unit.files(),
keyComparator).partition();
this.dropDelete = dropDelete;
+ this.maxLevel = maxLevel;
this.upgradeFilesNum = 0;
}
@@ -107,10 +110,20 @@ public class MergeTreeCompactTask extends CompactTask {
}
private void upgrade(DataFileMeta file, CompactResult toUpdate) throws
Exception {
- if (file.level() != outputLevel) {
+ if (file.level() == outputLevel) {
+ return;
+ }
+
+ if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
toUpdate.merge(upgradeResult);
upgradeFilesNum++;
+ } else {
+ // files with delete records should not be upgraded directly to max level
+ List<List<SortedRun>> candidate = new ArrayList<>();
+ candidate.add(new ArrayList<>());
+ candidate.get(0).add(SortedRun.fromSingle(file));
+ rewriteImpl(candidate, toUpdate);
}
}
@@ -130,6 +143,11 @@ public class MergeTreeCompactTask extends CompactTask {
return;
}
}
+ rewriteImpl(candidate, toUpdate);
+ }
+
+ private void rewriteImpl(List<List<SortedRun>> candidate, CompactResult toUpdate)
+ throws Exception {
CompactResult rewriteResult = rewriter.rewrite(outputLevel,
dropDelete, candidate);
toUpdate.merge(rewriteResult);
candidate.clear();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index 0b00cfefa..e6209e223 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -185,6 +185,7 @@ public class AppendOnlyTableCompactionCoordinatorTest {
0,
0,
0,
- 0);
+ 0,
+ 0L);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index cd9568d46..50df6e497 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -132,7 +132,8 @@ public class IndexBootstrapTest extends TableTestBase {
Timestamp.fromLocalDateTime(
Instant.ofEpochMilli(timeMillis)
.atZone(ZoneId.systemDefault())
- .toLocalDateTime()));
+ .toLocalDateTime()),
+ 0L);
}
private Pair<InternalRow, Integer> row(int pt, int col, int pk, int
bucket) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index 6301fbc3d..aed01ca21 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -160,7 +160,8 @@ public class DataFileTestDataGenerator {
minSequenceNumber,
maxSequenceNumber,
0,
- level),
+ level,
+ kvs.stream().filter(kv -> kv.valueKind().isRetract()).count()),
kvs);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index 062c3384a..624237b38 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -40,7 +40,7 @@ public class DataFileTestUtils {
return new DataFileMeta(
"",
maxSeq - minSeq + 1,
- maxSeq - minSeq + 1,
+ 0L,
DataFileMeta.EMPTY_MIN_KEY,
DataFileMeta.EMPTY_MAX_KEY,
DataFileMeta.EMPTY_KEY_STATS,
@@ -50,7 +50,8 @@ public class DataFileTestUtils {
0L,
DataFileMeta.DUMMY_LEVEL,
Collections.emptyList(),
- Timestamp.fromEpochMillis(100));
+ Timestamp.fromEpochMillis(100),
+ maxSeq - minSeq + 1);
}
public static DataFileMeta newFile() {
@@ -65,7 +66,8 @@ public class DataFileTestUtils {
0,
0,
0,
- 0);
+ 0,
+ 0L);
}
public static DataFileMeta newFile(
@@ -81,7 +83,8 @@ public class DataFileTestUtils {
0,
maxSequence,
0,
- level);
+ level,
+ 0L);
}
public static BinaryRow row(int i) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index 88b309bfd..ee279c097 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -113,6 +113,7 @@ public class ManifestCommittableSerializerTest {
0,
1,
0,
- level);
+ level,
+ 0L);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 3a9754dec..1a36346c1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -72,7 +72,7 @@ public abstract class ManifestFileMetaTestBase {
fileName,
0, // not used
0, // not used
- binaryRow, // not useds
+ binaryRow, // not used
binaryRow, // not used
StatsTestUtils.newEmptyTableStats(), // not used
StatsTestUtils.newEmptyTableStats(), // not used
@@ -81,7 +81,9 @@ public abstract class ManifestFileMetaTestBase {
0, // not used
0, // not used
Collections.emptyList(),
- Timestamp.fromEpochMillis(200000)));
+ Timestamp.fromEpochMillis(200000),
+ 0L // not used
+ ));
}
protected ManifestFileMeta makeManifest(ManifestEntry... entries) {
@@ -242,7 +244,7 @@ public abstract class ManifestFileMetaTestBase {
0, // not used
0, // not used
0, // not used
- 0 // not used
- ));
+ 0, // not used
+ 0L));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index 2630be65c..c424b6094 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -69,6 +69,6 @@ public class LevelsTest {
public static DataFileMeta newFile(int level) {
return new DataFileMeta(
- UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level);
+ UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level, 0L);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index 02d7967a1..c4edd76e2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -180,7 +180,8 @@ public class IntervalPartitionTest {
0,
0,
Collections.emptyList(),
- Timestamp.fromEpochMillis(100000));
+ Timestamp.fromEpochMillis(100000),
+ 0L);
}
private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>>
sections) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 747023b2e..313f9799a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -357,6 +357,6 @@ public class UniversalCompactionTest {
}
static DataFileMeta file(long size) {
- return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0);
+ return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index 463e8ba08..02e3ab9e3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -85,7 +85,8 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
0,
0,
extraFiles,
- Timestamp.now());
+ Timestamp.now(),
+ 0L);
ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1,
dataFile);
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
index ccbade9ad..a97ed9c54 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
@@ -19,6 +19,8 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
@@ -40,6 +42,7 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
@@ -220,14 +223,63 @@ public class TableWriteTest {
StreamTableScan scan = table.newStreamScan();
TableRead read = table.newRead();
+ assertThat(streamingRead(scan, read, latestSnapshotId))
+ .hasSize(numPartitions * numRecordsPerPartition);
+ }
+
+ @Test
+ public void testUpgradeToMaxLevel() throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, 1);
+ conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
+
+ FileStoreTable table = createFileStoreTable(conf);
+ TableWriteImpl<?> write =
+ table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 1, 10L));
+ write.write(GenericRow.of(1, 2, 20L));
+ write.write(GenericRow.ofKind(RowKind.DELETE, 1, 2, 20L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.compact(partition(1), 0, true);
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.write(GenericRow.of(1, 2, 21L));
+ write.compact(partition(1), 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ write.close();
+ commit.close();
+
+ Map<String, String> readOptions = new HashMap<>();
+ readOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1");
+ table = table.copy(readOptions);
+ long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+
+ StreamTableScan scan = table.newStreamScan();
+ TableRead read = table.newRead();
+ assertThat(streamingRead(scan, read, latestSnapshotId)).hasSize(2);
+ }
+
+ private BinaryRow partition(int x) {
+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(partition);
+ writer.writeInt(0, x);
+ writer.complete();
+ return partition;
+ }
+
+ List<InternalRow> streamingRead(
+ StreamTableScan scan, TableRead read, long numStreamingSnapshots)
throws Exception {
List<InternalRow> actual = new ArrayList<>();
- for (long i = 0; i <= latestSnapshotId; i++) {
+ for (long i = 0; i <= numStreamingSnapshots; i++) {
RecordReader<InternalRow> reader =
read.createReader(scan.plan().splits());
reader.forEachRemaining(actual::add);
reader.close();
}
-
- assertThat(actual).hasSize(numPartitions * numRecordsPerPartition);
+ return actual;
}
private FileStoreTable createFileStoreTable(Options conf) throws Exception
{
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index 663d0dad8..6d97eda5f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -50,7 +50,8 @@ public class SplitGeneratorTest {
minSequence,
maxSequence,
0,
- 0);
+ 0,
+ 0L);
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index 70f127291..c607e7a8f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -74,6 +74,7 @@ public class CompactionTaskSimpleSerializerTest {
0,
1,
0,
- 0);
+ 0,
+ 0L);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 5015e9219..4f53932a2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -111,7 +111,8 @@ public class FileStoreSourceSplitGeneratorTest {
0, // not used
0, // not used
0, // not used
- 0 // not used
+ 0, // not used
+ 0L // not used
));
}
return DataSplit.builder()
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index d0cafbb95..cbe6cb86d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -84,7 +84,8 @@ public class FileStoreSourceSplitSerializerTest {
0,
1,
0,
- level);
+ level,
+ 0L);
}
public static FileStoreSourceSplit newSourceSplit(