This is an automated email from the ASF dual-hosted git repository. junhao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new e60487e849 [core] Make ManifestEntry and DataFileMeta to interfaces (#6134) e60487e849 is described below commit e60487e849a828af6d57bd5b89de74b0fef80bb2 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Mon Aug 25 17:09:56 2025 +0800 [core] Make ManifestEntry and DataFileMeta to interfaces (#6134) --- .../java/org/apache/paimon/io/DataFileMeta.java | 504 +++------------------ .../apache/paimon/io/DataFileMeta08Serializer.java | 2 +- .../apache/paimon/io/DataFileMeta09Serializer.java | 2 +- .../paimon/io/DataFileMeta10LegacySerializer.java | 2 +- .../paimon/io/DataFileMeta12LegacySerializer.java | 2 +- .../apache/paimon/io/DataFileMetaSerializer.java | 2 +- .../apache/paimon/io/KeyValueDataFileWriter.java | 2 +- .../{DataFileMeta.java => PojoDataFileMeta.java} | 298 +++--------- .../paimon/manifest/FilteredManifestEntry.java | 2 +- .../org/apache/paimon/manifest/ManifestEntry.java | 149 +----- .../paimon/manifest/ManifestEntrySerializer.java | 2 +- .../{ManifestEntry.java => PojoManifestEntry.java} | 93 ++-- .../paimon/operation/FileStoreCommitImpl.java | 4 +- .../sink/CommitMessageLegacyV2Serializer.java | 2 +- .../append/AppendCompactCoordinatorTest.java | 2 +- .../paimon/crosspartition/IndexBootstrapTest.java | 2 +- .../paimon/io/DataFileTestDataGenerator.java | 2 +- .../org/apache/paimon/io/DataFileTestUtils.java | 6 +- ...festCommittableSerializerCompatibilityTest.java | 18 +- .../ManifestCommittableSerializerTest.java | 2 +- .../paimon/manifest/ManifestFileMetaTestBase.java | 8 +- .../paimon/manifest/ManifestTestDataGenerator.java | 9 +- .../org/apache/paimon/mergetree/LevelsTest.java | 2 +- .../mergetree/compact/IntervalPartitionTest.java | 2 +- .../mergetree/compact/UniversalCompactionTest.java | 2 +- .../paimon/operation/ExpireSnapshotsTest.java | 12 +- .../apache/paimon/operation/FileDeletionTest.java | 2 +- .../source/DataEvolutionSplitGeneratorTest.java | 2 +- .../paimon/table/source/SplitGeneratorTest.java | 2 +- .../org/apache/paimon/table/source/SplitTest.java | 14 +- .../flink/copy/CopyManifestFileOperator.java | 2 +- .../ChangelogCompactSortOperatorTest.java | 2 +- .../ChangelogCompactTaskSerializerTest.java | 2 +- .../sink/CompactionTaskSimpleSerializerTest.java | 2 +- .../source/FileStoreSourceSplitGeneratorTest.java | 2 +- .../source/FileStoreSourceSplitSerializerTest.java | 2 +- 36 files changed, 233 insertions(+), 932 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 48b56a5c65..e7bb8b9571 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -35,17 +35,14 @@ import org.apache.paimon.types.TinyIntType; import javax.annotation.Nullable; import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Optional; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS; -import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.SerializationUtils.newBytesType; import static org.apache.paimon.utils.SerializationUtils.newStringType; @@ -55,9 +52,9 @@ import static org.apache.paimon.utils.SerializationUtils.newStringType; * @since 0.9.0 */ @Public -public class DataFileMeta { +public interface DataFileMeta { - public static final RowType SCHEMA = + RowType SCHEMA = new RowType( false, Arrays.asList( @@ -87,51 +84,11 @@ public class DataFileMeta { new DataField( 19, "_WRITE_COLS", new ArrayType(true, newStringType(false))))); - public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW; - public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW; - public static final int DUMMY_LEVEL = 0; + BinaryRow EMPTY_MIN_KEY = EMPTY_ROW; + BinaryRow EMPTY_MAX_KEY = EMPTY_ROW; + int DUMMY_LEVEL = 0; - private final String fileName; - private final long fileSize; - - // total number of rows (including add & delete) in this file - private final long rowCount; - - private final BinaryRow minKey; - private final BinaryRow maxKey; - private final SimpleStats keyStats; - private final SimpleStats valueStats; - - // As for row-lineage table, this will be reassigned while committing - private final long minSequenceNumber; - private final long maxSequenceNumber; - private final long schemaId; - private final int level; - - private final List<String> extraFiles; - private final Timestamp creationTime; - - // rowCount = addRowCount + deleteRowCount - // Why don't we keep addRowCount and deleteRowCount? - // Because in previous versions of DataFileMeta, we only keep rowCount. - // We have to keep the compatibility. - private final @Nullable Long deleteRowCount; - - // file index filter bytes, if it is small, store in data file meta - private final @Nullable byte[] embeddedIndex; - - private final @Nullable FileSource fileSource; - - private final @Nullable List<String> valueStatsCols; - - /** external path of file, if it is null, it is in the default warehouse path. */ - private final @Nullable String externalPath; - - private final @Nullable Long firstRowId; - - private final @Nullable List<String> writeCols; - - public static DataFileMeta forAppend( + static DataFileMeta forAppend( String fileName, long fileSize, long rowCount, @@ -146,7 +103,7 @@ public class DataFileMeta { @Nullable String externalPath, @Nullable Long firstRowId, @Nullable List<String> writeCols) { - return new DataFileMeta( + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -169,7 +126,7 @@ public class DataFileMeta { writeCols); } - public DataFileMeta( + static DataFileMeta create( String fileName, long fileSize, long rowCount, @@ -189,7 +146,7 @@ public class DataFileMeta { @Nullable String externalPath, @Nullable Long firstRowId, @Nullable List<String> writeCols) { - this( + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -212,7 +169,7 @@ public class DataFileMeta { writeCols); } - public DataFileMeta( + static DataFileMeta create( String fileName, long fileSize, long rowCount, @@ -230,7 +187,7 @@ public class DataFileMeta { @Nullable List<String> valueStatsCols, @Nullable Long firstRowId, @Nullable List<String> writeCols) { - this( + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -253,7 +210,7 @@ public class DataFileMeta { writeCols); } - public DataFileMeta( + static DataFileMeta create( String fileName, long fileSize, long rowCount, @@ -274,429 +231,102 @@ public class DataFileMeta { @Nullable String externalPath, @Nullable Long firstRowId, @Nullable List<String> writeCols) { - this.fileName = fileName; - this.fileSize = fileSize; - - this.rowCount = rowCount; - - this.embeddedIndex = embeddedIndex; - this.minKey = minKey; - this.maxKey = maxKey; - this.keyStats = keyStats; - this.valueStats = valueStats; - - this.minSequenceNumber = minSequenceNumber; - this.maxSequenceNumber = maxSequenceNumber; - this.level = level; - this.schemaId = schemaId; - this.extraFiles = Collections.unmodifiableList(extraFiles); - this.creationTime = creationTime; - - this.deleteRowCount = deleteRowCount; - this.fileSource = fileSource; - this.valueStatsCols = valueStatsCols; - this.externalPath = externalPath; - this.firstRowId = firstRowId; - this.writeCols = writeCols; - } - - public String fileName() { - return fileName; + return new PojoDataFileMeta( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource, + valueStatsCols, + externalPath, + firstRowId, + writeCols); } - public long fileSize() { - return fileSize; - } + String fileName(); - public long rowCount() { - return rowCount; - } + long fileSize(); - public Optional<Long> addRowCount() { - return Optional.ofNullable(deleteRowCount).map(c -> rowCount - c); - } + long rowCount(); - public Optional<Long> deleteRowCount() { - return Optional.ofNullable(deleteRowCount); - } + Optional<Long> deleteRowCount(); - public byte[] embeddedIndex() { - return embeddedIndex; - } + byte[] embeddedIndex(); - public BinaryRow minKey() { - return minKey; - } + BinaryRow minKey(); - public BinaryRow maxKey() { - return maxKey; - } + BinaryRow maxKey(); - public SimpleStats keyStats() { - return keyStats; - } + SimpleStats keyStats(); - public SimpleStats valueStats() { - return valueStats; - } + SimpleStats valueStats(); - public long minSequenceNumber() { - return minSequenceNumber; - } + long minSequenceNumber(); - public long maxSequenceNumber() { - return maxSequenceNumber; - } + long maxSequenceNumber(); - public long schemaId() { - return schemaId; - } + long schemaId(); - public int level() { - return level; - } + int level(); - public List<String> extraFiles() { - return extraFiles; - } + List<String> extraFiles(); - public Timestamp creationTime() { - return creationTime; - } + Timestamp creationTime(); - public long creationTimeEpochMillis() { - return creationTime - .toLocalDateTime() - .atZone(ZoneId.systemDefault()) - .toInstant() - .toEpochMilli(); - } + long creationTimeEpochMillis(); - public String fileFormat() { - String[] split = fileName.split("\\."); - if (split.length == 1) { - throw new RuntimeException("Can't find format from file: " + fileName()); - } - return split[split.length - 1]; - } + String fileFormat(); - public Optional<String> externalPath() { - return Optional.ofNullable(externalPath); - } + Optional<String> externalPath(); - public Optional<String> externalPathDir() { - return Optional.ofNullable(externalPath) - .map(Path::new) - .map(p -> p.getParent().toUri().toString()); - } + Optional<String> externalPathDir(); - public Optional<FileSource> fileSource() { - return Optional.ofNullable(fileSource); - } + Optional<FileSource> fileSource(); @Nullable - public List<String> valueStatsCols() { - return valueStatsCols; - } + List<String> valueStatsCols(); @Nullable - public Long firstRowId() { - return firstRowId; - } + Long firstRowId(); @Nullable - public List<String> writeCols() { - return writeCols; - } + List<String> writeCols(); - public DataFileMeta upgrade(int newLevel) { - checkArgument(newLevel > this.level); - return new DataFileMeta( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - newLevel, - extraFiles, - creationTime, - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } + DataFileMeta upgrade(int newLevel); - public DataFileMeta rename(String newFileName) { - String newExternalPath = externalPathDir().map(dir -> dir + "/" + newFileName).orElse(null); - return new DataFileMeta( - newFileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - newExternalPath, - firstRowId, - writeCols); - } + DataFileMeta rename(String newFileName); - public DataFileMeta copyWithoutStats() { - return new DataFileMeta( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - EMPTY_STATS, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - embeddedIndex, - fileSource, - Collections.emptyList(), - externalPath, - firstRowId, - writeCols); - } + DataFileMeta copyWithoutStats(); - public DataFileMeta assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber) { - return new DataFileMeta( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } + DataFileMeta assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber); - public DataFileMeta assignFirstRowId(long firstRowId) { - return new DataFileMeta( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } + DataFileMeta assignFirstRowId(long firstRowId); - public List<Path> collectFiles(DataFilePathFactory pathFactory) { + default List<Path> collectFiles(DataFilePathFactory pathFactory) { List<Path> paths = new ArrayList<>(); paths.add(pathFactory.toPath(this)); - extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this))); + extraFiles().forEach(f -> paths.add(pathFactory.toAlignedPath(f, this))); return paths; } - public DataFileMeta copy(List<String> newExtraFiles) { - return new DataFileMeta( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - newExtraFiles, - creationTime, - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } + DataFileMeta copy(List<String> newExtraFiles); - public DataFileMeta newExternalPath(String newExternalPath) { - return new DataFileMeta( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - newExternalPath, - firstRowId, - writeCols); - } + DataFileMeta newExternalPath(String newExternalPath); - public DataFileMeta copy(byte[] newEmbeddedIndex) { - return new DataFileMeta( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - newEmbeddedIndex, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } - - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (!(o instanceof DataFileMeta)) { - return false; - } - DataFileMeta that = (DataFileMeta) o; - return Objects.equals(fileName, that.fileName) - && fileSize == that.fileSize - && rowCount == that.rowCount - && Arrays.equals(embeddedIndex, that.embeddedIndex) - && Objects.equals(minKey, that.minKey) - && Objects.equals(maxKey, that.maxKey) - && Objects.equals(keyStats, that.keyStats) - && Objects.equals(valueStats, that.valueStats) - && minSequenceNumber == that.minSequenceNumber - && maxSequenceNumber == that.maxSequenceNumber - && schemaId == that.schemaId - && level == that.level - && Objects.equals(extraFiles, that.extraFiles) - && Objects.equals(creationTime, that.creationTime) - && Objects.equals(deleteRowCount, that.deleteRowCount) - && Objects.equals(fileSource, that.fileSource) - && Objects.equals(valueStatsCols, that.valueStatsCols) - && Objects.equals(externalPath, that.externalPath) - && Objects.equals(firstRowId, that.firstRowId) - && Objects.equals(writeCols, that.writeCols); - } - - @Override - public int hashCode() { - return Objects.hash( - fileName, - fileSize, - rowCount, - Arrays.hashCode(embeddedIndex), - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } - - @Override - public String toString() { - return String.format( - "{fileName: %s, fileSize: %d, rowCount: %d, embeddedIndex: %s, " - + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, " - + "minSequenceNumber: %d, maxSequenceNumber: %d, " - + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, " - + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, externalPath: %s, firstRowId: %s, writeCols: %s}", - fileName, - fileSize, - rowCount, - Arrays.toString(embeddedIndex), - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } + DataFileMeta copy(byte[] newEmbeddedIndex); - public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) { + static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) { return fileMetas.stream() .map(DataFileMeta::maxSequenceNumber) .max(Long::compare) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java index 55105affb6..b646ef08ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java @@ -116,7 +116,7 @@ public class DataFileMeta08Serializer implements Serializable { byte[] bytes = new byte[in.readInt()]; in.readFully(bytes); SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); - return new DataFileMeta( + return DataFileMeta.create( row.getString(0).toString(), row.getLong(1), row.getLong(2), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java index c9cced68ea..662f1276c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java @@ -122,7 +122,7 @@ public class DataFileMeta09Serializer implements Serializable { byte[] bytes = new byte[in.readInt()]; in.readFully(bytes); SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); - return new DataFileMeta( + return DataFileMeta.create( row.getString(0).toString(), row.getLong(1), row.getLong(2), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java index 0fdaa9d409..dca1aa528f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java @@ -127,7 +127,7 @@ public class DataFileMeta10LegacySerializer implements Serializable { byte[] bytes = new byte[in.readInt()]; in.readFully(bytes); SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); - return new DataFileMeta( + return DataFileMeta.create( row.getString(0).toString(), row.getLong(1), row.getLong(2), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java index b6f9fe3aa3..e888c1ca74 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java @@ -129,7 +129,7 @@ public class DataFileMeta12LegacySerializer implements Serializable { byte[] bytes = new byte[in.readInt()]; in.readFully(bytes); SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); - return new DataFileMeta( + return DataFileMeta.create( row.getString(0).toString(), row.getLong(1), row.getLong(2), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index ed232d4431..afed7265d4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -66,7 +66,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> { @Override public DataFileMeta fromRow(InternalRow row) { - return new DataFileMeta( + return DataFileMeta.create( row.getString(0).toString(), row.getLong(1), row.getLong(2), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 7647668b0a..0710e28cea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -159,7 +159,7 @@ public abstract class KeyValueDataFileWriter : dataFileIndexWriter.result(); String externalPath = isExternalPath ? path.toString() : null; - return new DataFileMeta( + return DataFileMeta.create( path.getName(), fileSize, recordCount(), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java similarity index 59% copy from paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java copy to paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java index 48b56a5c65..6e858fbd22 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java @@ -18,78 +18,26 @@ package org.apache.paimon.io; -import org.apache.paimon.annotation.Public; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.types.ArrayType; -import org.apache.paimon.types.BigIntType; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.RowType; -import org.apache.paimon.types.TinyIntType; import javax.annotation.Nullable; -import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; -import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS; import static org.apache.paimon.utils.Preconditions.checkArgument; -import static org.apache.paimon.utils.SerializationUtils.newBytesType; -import static org.apache.paimon.utils.SerializationUtils.newStringType; -/** - * Metadata of a data file. - * - * @since 0.9.0 - */ -@Public -public class DataFileMeta { - - public static final RowType SCHEMA = - new RowType( - false, - Arrays.asList( - new DataField(0, "_FILE_NAME", newStringType(false)), - new DataField(1, "_FILE_SIZE", new BigIntType(false)), - new DataField(2, "_ROW_COUNT", new BigIntType(false)), - new DataField(3, "_MIN_KEY", newBytesType(false)), - new DataField(4, "_MAX_KEY", newBytesType(false)), - new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA), - new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA), - new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)), - new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)), - new DataField(9, "_SCHEMA_ID", new BigIntType(false)), - new DataField(10, "_LEVEL", new IntType(false)), - new DataField( - 11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))), - new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()), - new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)), - new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)), - new DataField(15, "_FILE_SOURCE", new TinyIntType(true)), - new DataField( - 16, - "_VALUE_STATS_COLS", - DataTypes.ARRAY(DataTypes.STRING().notNull())), - new DataField(17, "_EXTERNAL_PATH", newStringType(true)), - new DataField(18, "_FIRST_ROW_ID", new BigIntType(true)), - new DataField( - 19, "_WRITE_COLS", new ArrayType(true, newStringType(false))))); - - public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW; - public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW; - public static final int DUMMY_LEVEL = 0; +/** A {@link DataFileMeta} using pojo objects. */ +public class PojoDataFileMeta implements DataFileMeta { private final String fileName; private final long fileSize; @@ -131,129 +79,7 @@ public class DataFileMeta { private final @Nullable List<String> writeCols; - public static DataFileMeta forAppend( - String fileName, - long fileSize, - long rowCount, - SimpleStats rowStats, - long minSequenceNumber, - long maxSequenceNumber, - long schemaId, - List<String> extraFiles, - @Nullable byte[] embeddedIndex, - @Nullable FileSource fileSource, - @Nullable List<String> valueStatsCols, - @Nullable String externalPath, - @Nullable Long firstRowId, - @Nullable List<String> writeCols) { - return new DataFileMeta( - fileName, - fileSize, - rowCount, - EMPTY_MIN_KEY, - EMPTY_MAX_KEY, - EMPTY_STATS, - rowStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - DUMMY_LEVEL, - extraFiles, - Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), - 0L, - embeddedIndex, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } - - public DataFileMeta( - String fileName, - long fileSize, - long rowCount, - BinaryRow minKey, - BinaryRow maxKey, - SimpleStats keyStats, - SimpleStats valueStats, - long minSequenceNumber, - long maxSequenceNumber, - long schemaId, - int level, - List<String> extraFiles, - @Nullable Long deleteRowCount, - @Nullable byte[] embeddedIndex, - @Nullable FileSource fileSource, - @Nullable List<String> valueStatsCols, - @Nullable String externalPath, - @Nullable Long firstRowId, - @Nullable List<String> writeCols) { - this( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - externalPath, - firstRowId, - writeCols); - } - - public DataFileMeta( - String fileName, - long fileSize, - long rowCount, - BinaryRow minKey, - BinaryRow maxKey, - SimpleStats keyStats, - SimpleStats valueStats, - long minSequenceNumber, - long maxSequenceNumber, - long schemaId, - int level, - @Nullable Long deleteRowCount, - @Nullable byte[] embeddedIndex, - @Nullable FileSource fileSource, - @Nullable List<String> valueStatsCols, - @Nullable Long firstRowId, - @Nullable List<String> writeCols) { - this( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - Collections.emptyList(), - Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - null, - firstRowId, - writeCols); - } - - public DataFileMeta( + public PojoDataFileMeta( String fileName, long fileSize, long rowCount, @@ -300,70 +126,82 @@ public class DataFileMeta { this.writeCols = writeCols; } + @Override public String fileName() { return fileName; } + @Override public long fileSize() { return fileSize; } + @Override public long rowCount() { return rowCount; } - public Optional<Long> addRowCount() { - return Optional.ofNullable(deleteRowCount).map(c -> rowCount - c); - } - + @Override public Optional<Long> deleteRowCount() { return Optional.ofNullable(deleteRowCount); } + @Override public byte[] embeddedIndex() { return embeddedIndex; } + @Override public BinaryRow minKey() { return minKey; } + @Override public BinaryRow maxKey() { return maxKey; } + @Override public SimpleStats keyStats() { return keyStats; } + @Override public SimpleStats valueStats() { return valueStats; } + @Override public long minSequenceNumber() { return minSequenceNumber; } + @Override public long maxSequenceNumber() { return maxSequenceNumber; } + @Override public long schemaId() { return schemaId; } + @Override public int level() { return level; } + @Override public List<String> extraFiles() { return extraFiles; } + @Override public Timestamp creationTime() { return creationTime; } + @Override public long creationTimeEpochMillis() { return creationTime .toLocalDateTime() @@ -372,6 +210,7 @@ public class DataFileMeta { .toEpochMilli(); } + @Override public String fileFormat() { String[] split = fileName.split("\\."); if (split.length == 1) { @@ -380,16 +219,19 @@ public class DataFileMeta { return split[split.length - 1]; } + @Override public Optional<String> externalPath() { return Optional.ofNullable(externalPath); } + @Override public Optional<String> externalPathDir() { return Optional.ofNullable(externalPath) .map(Path::new) .map(p -> p.getParent().toUri().toString()); } + @Override public Optional<FileSource> fileSource() { return Optional.ofNullable(fileSource); } @@ -409,9 +251,10 @@ public class DataFileMeta { return writeCols; } - public DataFileMeta upgrade(int newLevel) { + @Override + public PojoDataFileMeta upgrade(int newLevel) { checkArgument(newLevel > this.level); - return new DataFileMeta( + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -434,9 +277,10 @@ public class DataFileMeta { writeCols); } - public DataFileMeta rename(String newFileName) { + @Override + public PojoDataFileMeta rename(String newFileName) { String newExternalPath = externalPathDir().map(dir -> dir + "/" + newFileName).orElse(null); - return new DataFileMeta( + return new PojoDataFileMeta( newFileName, fileSize, rowCount, @@ -459,8 +303,9 @@ public class DataFileMeta { writeCols); } - public DataFileMeta copyWithoutStats() { - return new DataFileMeta( + @Override + public PojoDataFileMeta copyWithoutStats() { + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -483,8 +328,9 @@ public class DataFileMeta { writeCols); } - public DataFileMeta assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber) { - return new DataFileMeta( + @Override + public PojoDataFileMeta assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber) { + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -507,8 +353,9 @@ public class DataFileMeta { writeCols); } - public DataFileMeta assignFirstRowId(long firstRowId) { - return new DataFileMeta( + @Override + public PojoDataFileMeta assignFirstRowId(long firstRowId) { + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -531,15 +378,9 @@ public class DataFileMeta { writeCols); } - public List<Path> collectFiles(DataFilePathFactory pathFactory) { - List<Path> paths = new ArrayList<>(); - paths.add(pathFactory.toPath(this)); - extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this))); - return paths; - } - - public DataFileMeta copy(List<String> newExtraFiles) { - return new DataFileMeta( + @Override + public PojoDataFileMeta copy(List<String> newExtraFiles) { + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -562,8 +403,9 @@ public class DataFileMeta { writeCols); } - public DataFileMeta newExternalPath(String newExternalPath) { - return new DataFileMeta( + @Override + public PojoDataFileMeta newExternalPath(String newExternalPath) { + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -586,8 +428,9 @@ public class DataFileMeta { writeCols); } - public DataFileMeta copy(byte[] newEmbeddedIndex) { - return new DataFileMeta( + @Override + public PojoDataFileMeta copy(byte[] newEmbeddedIndex) { + return new PojoDataFileMeta( fileName, fileSize, rowCount, @@ -619,26 +462,26 @@ public class DataFileMeta { return false; } DataFileMeta that = (DataFileMeta) o; - return Objects.equals(fileName, that.fileName) - && fileSize == that.fileSize - && rowCount == that.rowCount - && Arrays.equals(embeddedIndex, that.embeddedIndex) - && Objects.equals(minKey, that.minKey) - && Objects.equals(maxKey, that.maxKey) - && Objects.equals(keyStats, that.keyStats) - && Objects.equals(valueStats, that.valueStats) - && minSequenceNumber == that.minSequenceNumber - && maxSequenceNumber == that.maxSequenceNumber - && schemaId == that.schemaId - && level == that.level - && Objects.equals(extraFiles, that.extraFiles) - && Objects.equals(creationTime, that.creationTime) - && Objects.equals(deleteRowCount, that.deleteRowCount) - && Objects.equals(fileSource, that.fileSource) - && Objects.equals(valueStatsCols, that.valueStatsCols) - && Objects.equals(externalPath, that.externalPath) - && Objects.equals(firstRowId, that.firstRowId) - && Objects.equals(writeCols, that.writeCols); + return Objects.equals(fileName, that.fileName()) + && fileSize == that.fileSize() + && rowCount == that.rowCount() + && Arrays.equals(embeddedIndex, that.embeddedIndex()) + && Objects.equals(minKey, that.minKey()) + && Objects.equals(maxKey, that.maxKey()) + && Objects.equals(keyStats, that.keyStats()) + && Objects.equals(valueStats, that.valueStats()) + && minSequenceNumber == that.minSequenceNumber() + && maxSequenceNumber == that.maxSequenceNumber() + && schemaId == that.schemaId() + && level == that.level() + && Objects.equals(extraFiles, that.extraFiles()) + && Objects.equals(creationTime, that.creationTime()) + && Objects.equals(deleteRowCount, that.deleteRowCount().orElse(null)) + && Objects.equals(fileSource, that.fileSource().orElse(null)) + && Objects.equals(valueStatsCols, that.valueStatsCols()) + && Objects.equals(externalPath, that.externalPath().orElse(null)) + && Objects.equals(firstRowId, that.firstRowId()) + && Objects.equals(writeCols, that.writeCols()); } @Override @@ -695,11 +538,4 @@ public class DataFileMeta { firstRowId, writeCols); } - - public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) { - return fileMetas.stream() - .map(DataFileMeta::maxSequenceNumber) - .max(Long::compare) - .orElse(-1L); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java index 29ae6f6389..1255313b2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java @@ -19,7 +19,7 @@ package org.apache.paimon.manifest; /** Wrap a {@link ManifestEntry} to contain {@link #selected}. */ -public class FilteredManifestEntry extends ManifestEntry { +public class FilteredManifestEntry extends PojoManifestEntry { private final boolean selected; diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index 0de753f166..5a99a7f270 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -26,12 +26,9 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TinyIntType; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Objects; import static org.apache.paimon.utils.SerializationUtils.newBytesType; @@ -41,9 +38,9 @@ import static org.apache.paimon.utils.SerializationUtils.newBytesType; * @since 0.9.0 */ @Public -public class ManifestEntry implements FileEntry { +public interface ManifestEntry extends FileEntry { - public static final RowType SCHEMA = + RowType SCHEMA = new RowType( false, Arrays.asList( @@ -53,158 +50,36 @@ public class ManifestEntry implements FileEntry { new DataField(3, "_TOTAL_BUCKETS", new IntType(false)), new DataField(4, "_FILE", DataFileMeta.SCHEMA))); - private final FileKind kind; - // for tables without partition this field should be a row with 0 columns (not null) - private final BinaryRow partition; - private final int bucket; - private final int totalBuckets; - private final DataFileMeta file; - - public ManifestEntry( + static ManifestEntry create( FileKind kind, BinaryRow partition, int bucket, int totalBuckets, DataFileMeta file) { - this.kind = kind; - this.partition = partition; - this.bucket = bucket; - this.totalBuckets = totalBuckets; - this.file = file; - } - - @Override - public FileKind kind() { - return kind; - } - - @Override - public BinaryRow partition() { - return partition; - } - - @Override - public int bucket() { - return bucket; - } - - @Override - public int level() { - return file.level(); + return new PojoManifestEntry(kind, partition, bucket, totalBuckets, file); } - @Override - public String fileName() { - return file.fileName(); - } - - @Nullable - @Override - public String externalPath() { - return file.externalPath().orElse(null); - } - - @Override - public BinaryRow minKey() { - return file.minKey(); - } + DataFileMeta file(); - @Override - public BinaryRow maxKey() { - return file.maxKey(); - } + ManifestEntry copyWithoutStats(); - @Override - public List<String> extraFiles() { - return file.extraFiles(); - } + ManifestEntry assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber); - @Override - public int totalBuckets() { - return totalBuckets; - } + ManifestEntry assignFirstRowId(long firstRowId); - public DataFileMeta file() { - return file; - } + byte[] toBytes() throws IOException; - @Override - public Identifier identifier() { - return new Identifier( - partition, - bucket, - file.level(), - file.fileName(), - file.extraFiles(), - file.embeddedIndex(), - externalPath()); - } - - public ManifestEntry copyWithoutStats() { - return new ManifestEntry(kind, partition, bucket, totalBuckets, file.copyWithoutStats()); - } - - public ManifestEntry assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber) { - return new ManifestEntry( - kind, - partition, - bucket, - totalBuckets, - file.assignSequenceNumber(minSequenceNumber, maxSequenceNumber)); - } - - public ManifestEntry assignFirstRowId(long firstRowId) { - return new ManifestEntry( - kind, partition, bucket, totalBuckets, file.assignFirstRowId(firstRowId)); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof ManifestEntry)) { - return false; - } - ManifestEntry that = (ManifestEntry) o; - return Objects.equals(kind, that.kind) - && Objects.equals(partition, that.partition) - && bucket == that.bucket - && totalBuckets == that.totalBuckets - && Objects.equals(file, that.file); - } - - @Override - public int hashCode() { - return Objects.hash(kind, partition, bucket, totalBuckets, file); - } - - @Override - public String toString() { - return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, totalBuckets, file); - } - - public static long recordCount(List<ManifestEntry> manifestEntries) { + static long recordCount(List<ManifestEntry> manifestEntries) { return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum(); } - public static long recordCountAdd(List<ManifestEntry> manifestEntries) { + static long recordCountAdd(List<ManifestEntry> manifestEntries) { return manifestEntries.stream() .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind())) .mapToLong(manifest -> manifest.file().rowCount()) .sum(); } - public static long recordCountDelete(List<ManifestEntry> manifestEntries) { + static long recordCountDelete(List<ManifestEntry> manifestEntries) { return manifestEntries.stream() .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind())) .mapToLong(manifest -> manifest.file().rowCount()) .sum(); } - - // ----------------------- Serialization ----------------------------- - - private static final ThreadLocal<ManifestEntrySerializer> SERIALIZER_THREAD_LOCAL = - ThreadLocal.withInitial(ManifestEntrySerializer::new); - - public byte[] toBytes() throws IOException { - return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this); - } - - public ManifestEntry fromBytes(byte[] bytes) throws IOException { - return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java index b1030448a7..c31d79713e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java @@ -69,7 +69,7 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE } throw new IllegalArgumentException("Unsupported version: " + version); } - return new ManifestEntry( + return ManifestEntry.create( FileKind.fromByteValue(row.getByte(0)), deserializeBinaryRow(row.getBinary(1)), row.getInt(2), diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java similarity index 59% copy from paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java copy to paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java index 0de753f166..b3741a3c55 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java @@ -18,40 +18,20 @@ package org.apache.paimon.manifest; -import org.apache.paimon.annotation.Public; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.RowType; -import org.apache.paimon.types.TinyIntType; import javax.annotation.Nullable; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Objects; -import static org.apache.paimon.utils.SerializationUtils.newBytesType; +/** A {@link ManifestEntry} using pojo objects. */ +public class PojoManifestEntry implements ManifestEntry { -/** - * Entry of a manifest file, representing an addition / deletion of a data file. - * - * @since 0.9.0 - */ -@Public -public class ManifestEntry implements FileEntry { - - public static final RowType SCHEMA = - new RowType( - false, - Arrays.asList( - new DataField(0, "_KIND", new TinyIntType(false)), - new DataField(1, "_PARTITION", newBytesType(false)), - new DataField(2, "_BUCKET", new IntType(false)), - new DataField(3, "_TOTAL_BUCKETS", new IntType(false)), - new DataField(4, "_FILE", DataFileMeta.SCHEMA))); + private static final ThreadLocal<ManifestEntrySerializer> SERIALIZER_THREAD_LOCAL = + ThreadLocal.withInitial(ManifestEntrySerializer::new); private final FileKind kind; // for tables without partition this field should be a row with 0 columns (not null) @@ -60,7 +40,7 @@ public class ManifestEntry implements FileEntry { private final int totalBuckets; private final DataFileMeta file; - public ManifestEntry( + public PojoManifestEntry( FileKind kind, BinaryRow partition, int bucket, int totalBuckets, DataFileMeta file) { this.kind = kind; this.partition = partition; @@ -120,6 +100,7 @@ public class ManifestEntry implements FileEntry { return totalBuckets; } + @Override public DataFileMeta file() { return file; } @@ -136,12 +117,15 @@ public class ManifestEntry implements FileEntry { externalPath()); } - public ManifestEntry copyWithoutStats() { - return new ManifestEntry(kind, partition, bucket, totalBuckets, file.copyWithoutStats()); + @Override + public PojoManifestEntry copyWithoutStats() { + return new PojoManifestEntry( + kind, partition, bucket, totalBuckets, file.copyWithoutStats()); } - public ManifestEntry assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber) { - return new ManifestEntry( + @Override + public PojoManifestEntry assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber) { + return new PojoManifestEntry( kind, partition, bucket, @@ -149,22 +133,28 @@ public class ManifestEntry implements FileEntry { file.assignSequenceNumber(minSequenceNumber, maxSequenceNumber)); } - public ManifestEntry assignFirstRowId(long firstRowId) { - return new ManifestEntry( + @Override + public PojoManifestEntry assignFirstRowId(long firstRowId) { + return new PojoManifestEntry( kind, partition, bucket, totalBuckets, file.assignFirstRowId(firstRowId)); } + @Override + public byte[] toBytes() throws IOException { + return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this); + } + @Override public boolean equals(Object o) { if (!(o instanceof ManifestEntry)) { return false; } ManifestEntry that = (ManifestEntry) o; - return Objects.equals(kind, that.kind) - && Objects.equals(partition, that.partition) - && bucket == that.bucket - && totalBuckets == that.totalBuckets - && Objects.equals(file, that.file); + return Objects.equals(kind, that.kind()) + && Objects.equals(partition, that.partition()) + && bucket == that.bucket() + && totalBuckets == that.totalBuckets() + && Objects.equals(file, that.file()); } @Override @@ -176,35 +166,4 @@ public class ManifestEntry implements FileEntry { public String toString() { return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, totalBuckets, file); } - - public static long recordCount(List<ManifestEntry> manifestEntries) { - return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum(); - } - - public static long recordCountAdd(List<ManifestEntry> manifestEntries) { - return manifestEntries.stream() - .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind())) - .mapToLong(manifest -> manifest.file().rowCount()) - .sum(); - } - - public static long recordCountDelete(List<ManifestEntry> manifestEntries) { - return manifestEntries.stream() - .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind())) - .mapToLong(manifest -> manifest.file().rowCount()) - .sum(); - } - - // ----------------------- Serialization ----------------------------- - - private static final ThreadLocal<ManifestEntrySerializer> SERIALIZER_THREAD_LOCAL = - ThreadLocal.withInitial(ManifestEntrySerializer::new); - - public byte[] toBytes() throws IOException { - return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this); - } - - public ManifestEntry fromBytes(byte[] bytes) throws IOException { - return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 34d2f7cac5..1538bfde50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -774,7 +774,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { totalBuckets = numBucket; } - return new ManifestEntry( + return ManifestEntry.create( kind, commitMessage.partition(), commitMessage.bucket(), totalBuckets, file); } @@ -853,7 +853,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { List<ManifestEntry> currentEntries = scan.plan().files(); for (ManifestEntry entry : currentEntries) { changesWithOverwrite.add( - new ManifestEntry( + ManifestEntry.create( FileKind.DELETE, entry.partition(), entry.bucket(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java index 75b969b865..e3ebbf1ad1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java @@ -139,7 +139,7 @@ public class CommitMessageLegacyV2Serializer { @Override public DataFileMeta fromRow(InternalRow row) { - return new DataFileMeta( + return DataFileMeta.create( row.getString(0).toString(), row.getLong(1), row.getLong(2), diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java index eb9468abbe..cd1ab9094d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java @@ -245,7 +245,7 @@ public class AppendCompactCoordinatorTest { } private DataFileMeta newFile(long fileSize) { - return new DataFileMeta( + return DataFileMeta.create( UUID.randomUUID().toString(), fileSize, 100, diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index deb21eade4..526b6bd29f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -140,7 +140,7 @@ public class IndexBootstrapTest extends TableTestBase { } private static DataFileMeta newFile(long timeMillis) { - return new DataFileMeta( + return DataFileMeta.create( "", 1, 1, diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java index 3c4c74301e..3dbe87b9ff 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java @@ -150,7 +150,7 @@ public class DataFileTestDataGenerator { return new Data( partition, bucket, - new DataFileMeta( + DataFileMeta.create( "data-" + UUID.randomUUID(), totalSize, kvs.size(), diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java index b19b535ea6..bdf0c7c78e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java @@ -40,7 +40,7 @@ public class DataFileTestUtils { } public static DataFileMeta newFile(long minSeq, long maxSeq) { - return new DataFileMeta( + return DataFileMeta.create( "", maxSeq - minSeq + 1, 0L, @@ -64,7 +64,7 @@ public class DataFileTestUtils { } public static DataFileMeta newFile() { - return new DataFileMeta( + return DataFileMeta.create( "", 0, 0, @@ -91,7 +91,7 @@ public class DataFileTestUtils { public static DataFileMeta newFile( String name, int level, int minKey, int maxKey, long maxSequence, Long deleteRowCount) { - return new DataFileMeta( + return DataFileMeta.create( name, maxKey - minKey + 1, maxKey - minKey + 1, diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index 0cf635ead6..82ff4d1be1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -58,7 +58,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -135,7 +135,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -211,7 +211,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -285,7 +285,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -359,7 +359,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -432,7 +432,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -506,7 +506,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -580,7 +580,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -654,7 +654,7 @@ public class ManifestCommittableSerializerCompatibilityTest { singleColumn("max_value"), fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java index c7cfe4f9c8..4ff378a5ac 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java @@ -101,7 +101,7 @@ public class ManifestCommittableSerializerTest { } public static DataFileMeta newFile(int name, int level) { - return new DataFileMeta( + return DataFileMeta.create( String.valueOf(name), 0, 1, diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index 50258bc8cc..5cac5e65e2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -73,12 +73,12 @@ public abstract class ManifestFileMetaTestBase { binaryRow = BinaryRow.EMPTY_ROW; } - return new ManifestEntry( + return ManifestEntry.create( isAdd ? FileKind.ADD : FileKind.DELETE, binaryRow, 0, // not used 0, // not used - new DataFileMeta( + DataFileMeta.create( fileName, 0, // not used 0, // not used @@ -262,12 +262,12 @@ public abstract class ManifestFileMetaTestBase { public static ManifestEntry makeEntry( FileKind fileKind, int partition, int bucket, long rowCount) { - return new ManifestEntry( + return ManifestEntry.create( fileKind, row(partition), bucket, 0, // not used - new DataFileMeta( + DataFileMeta.create( "", // not used 0, // not used rowCount, diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java index 959d3f9e3e..8e09c1bb91 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java @@ -74,7 +74,7 @@ public class ManifestTestDataGenerator { List<DataFileTestDataGenerator.Data> level = bucketLevels.get(file.meta.level()); level.add(file); bufferedResults.push( - new ManifestEntry( + ManifestEntry.create( FileKind.ADD, file.partition, file.bucket, numBuckets, file.meta)); mergeLevelsIfNeeded(file.partition, file.bucket); @@ -144,7 +144,7 @@ public class ManifestTestDataGenerator { for (DataFileTestDataGenerator.Data file : currentLevel) { bufferedResults.push( - new ManifestEntry( + ManifestEntry.create( FileKind.DELETE, partition, bucket, numBuckets, file.meta)); kvs.addAll(file.content); } @@ -152,7 +152,7 @@ public class ManifestTestDataGenerator { for (DataFileTestDataGenerator.Data file : nextLevel) { bufferedResults.push( - new ManifestEntry( + ManifestEntry.create( FileKind.DELETE, partition, bucket, numBuckets, file.meta)); kvs.addAll(file.content); } @@ -164,7 +164,8 @@ public class ManifestTestDataGenerator { nextLevel.addAll(merged); for (DataFileTestDataGenerator.Data file : nextLevel) { bufferedResults.push( - new ManifestEntry(FileKind.ADD, partition, bucket, numBuckets, file.meta)); + ManifestEntry.create( + FileKind.ADD, partition, bucket, numBuckets, file.meta)); } lastModifiedLevel += 1; diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java index a76bfca959..b3c7f0275a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java @@ -69,7 +69,7 @@ public class LevelsTest { } public static DataFileMeta newFile(int level) { - return new DataFileMeta( + return DataFileMeta.create( UUID.randomUUID().toString(), 0, 1, diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java index fbd046b7e2..464b26b944 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java @@ -167,7 +167,7 @@ public class IntervalPartitionTest { maxWriter.writeInt(0, right); maxWriter.complete(); - return new DataFileMeta( + return DataFileMeta.create( "DUMMY", 100, 25, diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java index 53bfbd0f18..37e057c704 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java @@ -442,7 +442,7 @@ public class UniversalCompactionTest { } static DataFileMeta file(long size) { - return new DataFileMeta( + return DataFileMeta.create( "", size, 1, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 52afe00a31..d63af5fd2c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -266,7 +266,7 @@ public class ExpireSnapshotsTest { // create DataFileMeta and ManifestEntry List<String> extraFiles = Arrays.asList("extra1", "extra2"); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "myDataFile", 1, 1, @@ -287,8 +287,8 @@ public class ExpireSnapshotsTest { null, null, null); - ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); - ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); + ManifestEntry add = ManifestEntry.create(FileKind.ADD, partition, 0, 1, dataFile); + ManifestEntry delete = ManifestEntry.create(FileKind.DELETE, partition, 0, 1, dataFile); // expire expire.snapshotDeletion() @@ -329,7 +329,7 @@ public class ExpireSnapshotsTest { // create DataFileMeta and ManifestEntry List<String> extraFiles = Arrays.asList("extra1", "extra2"); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( fileName, 1, 1, @@ -350,8 +350,8 @@ public class ExpireSnapshotsTest { myDataFile.toString(), null, null); - ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); - ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); + ManifestEntry add = ManifestEntry.create(FileKind.ADD, partition, 0, 1, dataFile); + ManifestEntry delete = ManifestEntry.create(FileKind.DELETE, partition, 0, 1, dataFile); // expire expire.snapshotDeletion() diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index b369beff43..8e794fe749 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -912,7 +912,7 @@ public class FileDeletionTest { bucketEntries.stream() .map( entry -> - new ManifestEntry( + ManifestEntry.create( FileKind.DELETE, partition, bucket, diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/DataEvolutionSplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/DataEvolutionSplitGeneratorTest.java index 9d9f9354db..27e7841f9d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/DataEvolutionSplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/DataEvolutionSplitGeneratorTest.java @@ -37,7 +37,7 @@ public class DataEvolutionSplitGeneratorTest { private static DataFileMeta createFile( String name, @Nullable Long firstRowId, long maxSequence) { - return new DataFileMeta( + return DataFileMeta.create( name, 10000L, 1, diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index f3a2b3b47d..379207acdc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -44,7 +44,7 @@ public class SplitGeneratorTest { public static DataFileMeta newFileFromSequence( String name, int fileSize, long minSequence, long maxSequence) { - return new DataFileMeta( + return DataFileMeta.create( name, fileSize, 1, diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index ad20c4b699..6eb11583f4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -198,7 +198,7 @@ public class SplitTest { fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -263,7 +263,7 @@ public class SplitTest { fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -328,7 +328,7 @@ public class SplitTest { fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -397,7 +397,7 @@ public class SplitTest { fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -466,7 +466,7 @@ public class SplitTest { fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -535,7 +535,7 @@ public class SplitTest { fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, @@ -605,7 +605,7 @@ public class SplitTest { fromLongArray(new Long[] {0L})); DataFileMeta dataFile = - new DataFileMeta( + DataFileMeta.create( "my_file", 1024 * 1024, 1024, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java index 40ba6fc400..1abcccd0bb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java @@ -150,7 +150,7 @@ public class CopyManifestFileOperator extends AbstractStreamOperator<CopyFileInf // path is null for (ManifestEntry manifestEntry : manifestEntries) { ManifestEntry newManifestEntry = - new ManifestEntry( + ManifestEntry.create( manifestEntry.kind(), manifestEntry.partition(), manifestEntry.bucket(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java index 40b122f9d2..baedafcf8a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java @@ -180,7 +180,7 @@ public class ChangelogCompactSortOperatorTest { } private DataFileMeta createDataFileMeta(int mb, long creationMillis) { - return new DataFileMeta( + return DataFileMeta.create( UUID.randomUUID().toString(), MemorySize.ofMebiBytes(mb).getBytes(), 0, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java index 840fa21ed6..344200d043 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java @@ -86,7 +86,7 @@ public class ChangelogCompactTaskSerializerTest { } private DataFileMeta newFile() { - return new DataFileMeta( + return DataFileMeta.create( UUID.randomUUID().toString(), 0, 1, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java index 84a6f9fdb0..7c22f125ff 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java @@ -64,7 +64,7 @@ public class CompactionTaskSimpleSerializerTest { } private DataFileMeta newFile() { - return new DataFileMeta( + return DataFileMeta.create( UUID.randomUUID().toString(), 0, 1, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java index 9a4bf070e0..932b7ee951 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java @@ -101,7 +101,7 @@ public class FileStoreSourceSplitGeneratorTest { List<DataFileMeta> metas = new ArrayList<>(); for (String fileName : fileNames) { metas.add( - new DataFileMeta( + DataFileMeta.create( fileName, 0, // not used 0, // not used diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java index 4b721de319..5514fa4ce2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java @@ -74,7 +74,7 @@ public class FileStoreSourceSplitSerializerTest { // ------------------------------------------------------------------------ public static DataFileMeta newFile(int level) { - return new DataFileMeta( + return DataFileMeta.create( "", 0, 1,