This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2c38b03cc [core] Introduce FileSource to DataFileMeta (#3363)
2c38b03cc is described below
commit 2c38b03cc191168273209163fc2c1585b6713afc
Author: Aitozi <[email protected]>
AuthorDate: Wed May 22 17:57:42 2024 +0800
[core] Introduce FileSource to DataFileMeta (#3363)
---
.../org/apache/paimon/append/AppendOnlyWriter.java | 4 +-
.../java/org/apache/paimon/io/DataFileMeta.java | 49 +++++++++++++++------
.../apache/paimon/io/DataFileMetaSerializer.java | 7 ++-
.../apache/paimon/io/KeyValueDataFileWriter.java | 9 +++-
.../paimon/io/KeyValueFileWriterFactory.java | 18 +++++---
.../org/apache/paimon/io/RowDataFileWriter.java | 9 +++-
.../apache/paimon/io/RowDataRollingFileWriter.java | 7 ++-
.../org/apache/paimon/manifest/FileSource.java | 51 ++++++++++++++++++++++
.../apache/paimon/mergetree/MergeTreeWriter.java | 3 +-
.../compact/ChangelogMergeTreeRewriter.java | 5 ++-
.../compact/MergeTreeCompactRewriter.java | 3 +-
.../org/apache/paimon/migrate/FileMetaUtils.java | 4 +-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 4 +-
.../AppendOnlyTableCompactionCoordinatorTest.java | 4 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 4 +-
.../paimon/crosspartition/IndexBootstrapTest.java | 4 +-
.../paimon/io/DataFileTestDataGenerator.java | 4 +-
.../org/apache/paimon/io/DataFileTestUtils.java | 10 +++--
.../paimon/io/KeyValueFileReadWriteTest.java | 12 ++---
.../apache/paimon/io/RollingFileWriterTest.java | 4 +-
.../ManifestCommittableSerializerTest.java | 3 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 7 +--
.../paimon/mergetree/ContainsLevelsTest.java | 3 +-
.../org/apache/paimon/mergetree/LevelsTest.java | 4 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 3 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 3 +-
.../mergetree/compact/IntervalPartitionTest.java | 4 +-
.../mergetree/compact/UniversalCompactionTest.java | 4 +-
.../paimon/operation/ExpireSnapshotsTest.java | 4 +-
.../paimon/table/source/SplitGeneratorTest.java | 4 +-
.../sink/CompactionTaskSimpleSerializerTest.java | 4 +-
.../source/FileStoreSourceSplitGeneratorTest.java | 5 ++-
.../source/FileStoreSourceSplitSerializerTest.java | 4 +-
.../org/apache/paimon/spark/ScanHelperTest.scala | 5 ++-
34 files changed, 209 insertions(+), 63 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index e26d302da..c5a47ff54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -32,6 +32,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -256,7 +257,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
seqNumCounter,
fileCompression,
statsCollectors,
- fileIndexOptions);
+ fileIndexOptions,
+ FileSource.APPEND);
}
private void trySyncLatestCompaction(boolean blocking)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index d62bb2d15..bfdfaf249 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.ArrayType;
@@ -30,6 +31,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TinyIntType;
import javax.annotation.Nullable;
@@ -83,6 +85,8 @@ public class DataFileMeta {
// file index filter bytes, if it is small, store in data file meta
private final @Nullable byte[] embeddedIndex;
+ private final @Nullable FileSource fileSource;
+
public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -90,7 +94,8 @@ public class DataFileMeta {
SimpleStats rowStats,
long minSequenceNumber,
long maxSequenceNumber,
- long schemaId) {
+ long schemaId,
+ @Nullable FileSource fileSource) {
return forAppend(
fileName,
fileSize,
@@ -100,7 +105,8 @@ public class DataFileMeta {
maxSequenceNumber,
schemaId,
Collections.emptyList(),
- null);
+ null,
+ fileSource);
}
public static DataFileMeta forAppend(
@@ -112,7 +118,8 @@ public class DataFileMeta {
long maxSequenceNumber,
long schemaId,
List<String> extraFiles,
- @Nullable byte[] embeddedIndex) {
+ @Nullable byte[] embeddedIndex,
+ @Nullable FileSource fileSource) {
return new DataFileMeta(
fileName,
fileSize,
@@ -128,7 +135,8 @@ public class DataFileMeta {
extraFiles,
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
0L,
- embeddedIndex);
+ embeddedIndex,
+ fileSource);
}
public DataFileMeta(
@@ -144,7 +152,8 @@ public class DataFileMeta {
long schemaId,
int level,
@Nullable Long deleteRowCount,
- @Nullable byte[] embeddedIndex) {
+ @Nullable byte[] embeddedIndex,
+ @Nullable FileSource fileSource) {
this(
fileName,
fileSize,
@@ -160,7 +169,8 @@ public class DataFileMeta {
Collections.emptyList(),
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
deleteRowCount,
- embeddedIndex);
+ embeddedIndex,
+ fileSource);
}
public DataFileMeta(
@@ -178,7 +188,8 @@ public class DataFileMeta {
List<String> extraFiles,
Timestamp creationTime,
@Nullable Long deleteRowCount,
- @Nullable byte[] embeddedIndex) {
+ @Nullable byte[] embeddedIndex,
+ @Nullable FileSource fileSource) {
this.fileName = fileName;
this.fileSize = fileSize;
@@ -198,6 +209,7 @@ public class DataFileMeta {
this.creationTime = creationTime;
this.deleteRowCount = deleteRowCount;
+ this.fileSource = fileSource;
}
public String fileName() {
@@ -291,6 +303,10 @@ public class DataFileMeta {
return split[split.length - 1];
}
+ public Optional<FileSource> fileSource() {
+ return Optional.ofNullable(fileSource);
+ }
+
public DataFileMeta upgrade(int newLevel) {
checkArgument(newLevel > this.level);
return new DataFileMeta(
@@ -308,7 +324,8 @@ public class DataFileMeta {
extraFiles,
creationTime,
deleteRowCount,
- embeddedIndex);
+ embeddedIndex,
+ fileSource);
}
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -334,7 +351,8 @@ public class DataFileMeta {
newExtraFiles,
creationTime,
deleteRowCount,
- embeddedIndex);
+ embeddedIndex,
+ fileSource);
}
@Override
@@ -360,7 +378,8 @@ public class DataFileMeta {
&& level == that.level
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.equals(creationTime, that.creationTime)
- && Objects.equals(deleteRowCount, that.deleteRowCount);
+ && Objects.equals(deleteRowCount, that.deleteRowCount)
+ && Objects.equals(fileSource, that.fileSource);
}
@Override
@@ -380,7 +399,8 @@ public class DataFileMeta {
level,
extraFiles,
creationTime,
- deleteRowCount);
+ deleteRowCount,
+ fileSource);
}
@Override
@@ -389,7 +409,8 @@ public class DataFileMeta {
"{fileName: %s, fileSize: %d, rowCount: %d, embeddedIndex: %s, "
+ "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
+ "minSequenceNumber: %d, maxSequenceNumber: %d, "
- + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d}",
+ + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, "
+ + "deleteRowCount: %d, fileSource: %s}",
fileName,
fileSize,
rowCount,
@@ -404,7 +425,8 @@ public class DataFileMeta {
level,
extraFiles,
creationTime,
- deleteRowCount);
+ deleteRowCount,
+ fileSource);
}
public static RowType schema() {
@@ -424,6 +446,7 @@ public class DataFileMeta {
fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)));
fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)));
+ fields.add(new DataField(15, "_FILE_SOURCE", new TinyIntType(true)));
return new RowType(fields);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index 68707a6c4..78891d064 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.utils.ObjectSerializer;
@@ -55,7 +56,8 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
toStringArrayData(meta.extraFiles()),
meta.creationTime(),
meta.deleteRowCount().orElse(null),
- meta.embeddedIndex());
+ meta.embeddedIndex(),
+ meta.fileSource().map(FileSource::toByteValue).orElse(null));
}
@Override
@@ -75,6 +77,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
fromStringArrayData(row.getArray(11)),
row.getTimestamp(12, 3),
row.isNullAt(13) ? null : row.getLong(13),
- row.isNullAt(14) ? null : row.getBinary(14));
+ row.isNullAt(14) ? null : row.getBinary(14),
+ row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index bfea680fa..46db1ddd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.RowType;
@@ -62,6 +63,7 @@ public class KeyValueDataFileWriter
private final SimpleStatsConverter keyStatsConverter;
private final SimpleStatsConverter valueStatsConverter;
private final InternalRowSerializer keySerializer;
+ private final FileSource fileSource;
private BinaryRow minKey = null;
private InternalRow maxKey = null;
@@ -80,7 +82,8 @@ public class KeyValueDataFileWriter
long schemaId,
int level,
String compression,
- CoreOptions options) {
+ CoreOptions options,
+ FileSource fileSource) {
super(
fileIO,
factory,
@@ -100,6 +103,7 @@ public class KeyValueDataFileWriter
this.keyStatsConverter = new SimpleStatsConverter(keyType);
this.valueStatsConverter = new SimpleStatsConverter(valueType);
this.keySerializer = new InternalRowSerializer(keyType);
+ this.fileSource = fileSource;
}
@Override
@@ -170,6 +174,7 @@ public class KeyValueDataFileWriter
level,
deleteRecordCount,
// TODO: enable file filter for primary key table (e.g. deletion table).
- null);
+ null,
+ fileSource);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 0d3916148..72f9b3f65 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -81,9 +82,12 @@ public class KeyValueFileWriterFactory {
return formatContext.pathFactory(level);
}
- public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(int level) {
+ public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(
+ int level, FileSource fileSource) {
return new RollingFileWriter<>(
- () -> createDataFileWriter(formatContext.pathFactory(level).newPath(), level),
+ () ->
+ createDataFileWriter(
+ formatContext.pathFactory(level).newPath(), level, fileSource),
suggestedFileSize);
}
@@ -91,11 +95,14 @@ public class KeyValueFileWriterFactory {
return new RollingFileWriter<>(
() ->
createDataFileWriter(
- formatContext.pathFactory(level).newChangelogPath(), level),
+ formatContext.pathFactory(level).newChangelogPath(),
+ level,
+ FileSource.APPEND),
suggestedFileSize);
}
- private KeyValueDataFileWriter createDataFileWriter(Path path, int level) {
+ private KeyValueDataFileWriter createDataFileWriter(
+ Path path, int level, FileSource fileSource) {
KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
return new KeyValueDataFileWriter(
fileIO,
@@ -108,7 +115,8 @@ public class KeyValueFileWriterFactory {
schemaId,
level,
formatContext.compression(level),
- options);
+ options,
+ fileSource);
}
public void deleteFile(String filename, int level) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index e0521ca79..3da909817 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
@@ -48,6 +49,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
private final LongCounter seqNumCounter;
private final SimpleStatsConverter statsArraySerializer;
@Nullable private final FileIndexWriter fileIndexWriter;
+ private final FileSource fileSource;
public RowDataFileWriter(
FileIO fileIO,
@@ -59,7 +61,8 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
LongCounter seqNumCounter,
String fileCompression,
SimpleColStatsCollector.Factory[] statsCollectors,
- FileIndexOptions fileIndexOptions) {
+ FileIndexOptions fileIndexOptions,
+ FileSource fileSource) {
super(
fileIO,
factory,
@@ -75,6 +78,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
this.fileIndexWriter =
FileIndexWriter.create(
fileIO, toFileIndexPath(path), writeSchema,
fileIndexOptions);
+ this.fileSource = fileSource;
}
@Override
@@ -111,6 +115,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
indexResult.independentIndexFile() == null
? Collections.emptyList()
:
Collections.singletonList(indexResult.independentIndexFile()),
- indexResult.embeddedIndexBytes());
+ indexResult.embeddedIndexBytes(),
+ fileSource);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index d6f457795..9827c8f7c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.avro.AvroFileFormat;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
@@ -40,7 +41,8 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
LongCounter seqNumCounter,
String fileCompression,
SimpleColStatsCollector.Factory[] statsCollectors,
- FileIndexOptions fileIndexOptions) {
+ FileIndexOptions fileIndexOptions,
+ FileSource fileSource) {
super(
() ->
new RowDataFileWriter(
@@ -57,7 +59,8 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
seqNumCounter,
fileCompression,
statsCollectors,
- fileIndexOptions),
+ fileIndexOptions,
+ fileSource),
targetFileSize);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java
new file mode 100644
index 000000000..c62aea2d6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+/** The Source of a file. */
+public enum FileSource {
+
+ /** The file from new input. */
+ APPEND((byte) 0),
+
+ /** The file from compaction. */
+ COMPACT((byte) 1);
+
+ private final byte value;
+
+ FileSource(byte value) {
+ this.value = value;
+ }
+
+ public byte toByteValue() {
+ return value;
+ }
+
+ public static FileSource fromByteValue(byte value) {
+ switch (value) {
+ case 0:
+ return APPEND;
+ case 1:
+ return COMPACT;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported byte value '" + value + "' for value kind.");
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 6becbf731..a42952914 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -30,6 +30,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
@@ -212,7 +213,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
? writerFactory.createRollingChangelogFileWriter(0)
: null;
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
- writerFactory.createRollingMergeTreeFileWriter(0);
+ writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
try {
writeBuffer.forEach(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index a1aabc4e7..a03d53329 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.CloseableIterator;
@@ -128,7 +129,9 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
readerForMergeTree(sections,
createMergeWrapper(outputLevel))
.toCloseableIterator();
if (rewriteCompactFile) {
- compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+ compactFileWriter =
+ writerFactory.createRollingMergeTreeFileWriter(
+ outputLevel, FileSource.COMPACT);
}
if (produceChangelog) {
changelogFileWriter =
writerFactory.createRollingChangelogFileWriter(outputLevel);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index d399d599d..89e82fe3f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.mergetree.DropDeleteReader;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
@@ -73,7 +74,7 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
protected CompactResult rewriteCompaction(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) throws Exception {
RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+ writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);
RecordReader<KeyValue> reader =
readerForMergeTree(sections, new
ReducerMergeFunctionWrapper(mfFactory.create()));
if (dropDelete) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 1ffc0cb5d..7c84c87d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
@@ -164,7 +165,8 @@ public class FileMetaUtils {
stats,
0,
0,
- ((FileStoreTable) table).schema().id());
+ ((FileStoreTable) table).schema().id(),
+ FileSource.APPEND);
}
public static BinaryRow writePartitionValue(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 560648d41..64cbb5fd7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -33,6 +33,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.RowDataRollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.SimpleColStatsCollector;
@@ -177,7 +178,8 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
new
LongCounter(toCompact.get(0).minSequenceNumber()),
fileCompression,
statsCollectors,
- fileIndexOptions);
+ fileIndexOptions,
+ FileSource.COMPACT);
try {
rewriter.write(bucketReader(partition,
bucket).read(toCompact));
} finally {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index ec4212f16..bd0c7da29 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -187,6 +188,7 @@ public class AppendOnlyTableCompactionCoordinatorTest {
0,
0,
0L,
- null);
+ null,
+ FileSource.APPEND);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 6d0c59754..b5cc55f9c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.MemorySize;
@@ -646,6 +647,7 @@ public class AppendOnlyWriterTest {
}),
minSeq,
maxSeq,
- toCompact.get(0).schemaId());
+ toCompact.get(0).schemaId(),
+ FileSource.APPEND);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 19541f607..bbb1abfd3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
@@ -141,7 +142,8 @@ public class IndexBootstrapTest extends TableTestBase {
.atZone(ZoneId.systemDefault())
.toLocalDateTime()),
0L,
- null);
+ null,
+ FileSource.APPEND);
}
private Pair<InternalRow, Integer> row(int pt, int col, int pk, int
bucket) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index e891406c1..810cef860 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.format.SimpleStatsCollector;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.FullSimpleColStatsCollector;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.stats.SimpleStatsConverter;
@@ -162,7 +163,8 @@ public class DataFileTestDataGenerator {
0,
level,
kvs.stream().filter(kv ->
kv.valueKind().isRetract()).count(),
- null),
+ null,
+ FileSource.APPEND),
kvs);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index 96ba26208..f31b21f1e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.StatsTestUtils;
import java.util.Collections;
@@ -54,7 +55,8 @@ public class DataFileTestUtils {
Collections.emptyList(),
Timestamp.fromEpochMillis(100),
maxSeq - minSeq + 1,
- null);
+ null,
+ FileSource.APPEND);
}
public static DataFileMeta newFile() {
@@ -71,7 +73,8 @@ public class DataFileTestUtils {
0,
0,
0L,
- null);
+ null,
+ FileSource.APPEND);
}
public static DataFileMeta newFile(
@@ -94,7 +97,8 @@ public class DataFileTestUtils {
0,
level,
deleteRowCount,
- null);
+ null,
+ FileSource.APPEND);
}
public static BinaryRow row(int i) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index e56e0deb9..9f1405680 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.stats.StatsTestUtils;
@@ -101,7 +102,7 @@ public class KeyValueFileReadWriteTest {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingMergeTreeFileWriter(0);
+ writerFactory.createRollingMergeTreeFileWriter(0,
FileSource.APPEND);
writer.write(CloseableIterator.fromList(data.content, kv -> {}));
writer.close();
List<DataFileMeta> actualMetas = writer.result();
@@ -130,7 +131,8 @@ public class KeyValueFileReadWriteTest {
FailingFileIO.getFailingPath(failingName,
tempDir.toString()), "avro");
try {
- FileWriter<KeyValue, ?> writer =
writerFactory.createRollingMergeTreeFileWriter(0);
+ FileWriter<KeyValue, ?> writer =
+ writerFactory.createRollingMergeTreeFileWriter(0,
FileSource.APPEND);
writer.write(CloseableIterator.fromList(data.content, kv -> {}));
} catch (Throwable e) {
if (e.getCause() != null) {
@@ -154,7 +156,7 @@ public class KeyValueFileReadWriteTest {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingMergeTreeFileWriter(0);
+ writerFactory.createRollingMergeTreeFileWriter(0,
FileSource.APPEND);
writer.write(CloseableIterator.fromList(data.content, kv -> {}));
writer.close();
List<DataFileMeta> actualMetas = writer.result();
@@ -192,7 +194,7 @@ public class KeyValueFileReadWriteTest {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingMergeTreeFileWriter(0);
+ writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
writer.write(CloseableIterator.fromList(data.content, kv -> {}));
writer.close();
List<DataFileMeta> actualMetas = writer.result();
@@ -397,7 +399,7 @@ public class KeyValueFileReadWriteTest {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingMergeTreeFileWriter(0);
+ writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
writer.write(CloseableIterator.fromList(data.content, kv -> {}));
writer.close();
List<DataFileMeta> actualMetas = writer.result();
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 04712ffd8..2ac2a8e4d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.DataType;
@@ -88,7 +89,8 @@ public class RollingFileWriterTest {
StatsCollectorFactories.createStatsFactories(
new CoreOptions(new HashMap<>()),
SCHEMA.getFieldNames()),
- new FileIndexOptions()),
+ new FileIndexOptions(),
+ FileSource.APPEND),
TARGET_FILE_SIZE);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index d9c24eddb..099c38003 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -116,6 +116,7 @@ public class ManifestCommittableSerializerTest {
0,
level,
0L,
- null);
+ null,
+ FileSource.APPEND);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index a86d6f9ed..e74af8b30 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -83,8 +83,8 @@ public abstract class ManifestFileMetaTestBase {
Collections.emptyList(),
Timestamp.fromEpochMillis(200000),
0L, // not used
- null // not used
- ));
+ null, // not used
+ FileSource.APPEND));
}
protected ManifestFileMeta makeManifest(ManifestEntry... entries) {
@@ -245,6 +245,7 @@ public abstract class ManifestFileMetaTestBase {
0, // not used
0, // not used
0L,
- null));
+ null,
+ FileSource.APPEND));
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 9ec0e38e8..0bd60c673 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -206,7 +207,7 @@ public class ContainsLevelsTest {
private DataFileMeta newFile(int level, KeyValue... records) throws IOException {
RollingFileWriter<KeyValue, DataFileMeta> writer =
- createWriterFactory().createRollingMergeTreeFileWriter(level);
+ createWriterFactory().createRollingMergeTreeFileWriter(level, FileSource.APPEND);
for (KeyValue kv : records) {
writer.write(kv);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index 4763585b0..a86aa445b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.junit.jupiter.api.Test;
@@ -81,6 +82,7 @@ public class LevelsTest {
0,
level,
0L,
- null);
+ null,
+ FileSource.APPEND);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index d38b516fc..b40c53d12 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -286,7 +287,7 @@ public class LookupLevelsTest {
private DataFileMeta newFile(int level, KeyValue... records) throws IOException {
RollingFileWriter<KeyValue, DataFileMeta> writer =
- createWriterFactory().createRollingMergeTreeFileWriter(level);
+ createWriterFactory().createRollingMergeTreeFileWriter(level, FileSource.APPEND);
for (KeyValue kv : records) {
writer.write(kv);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 3f9bd3e6e..33b81a373 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -36,6 +36,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.compact.AbstractCompactRewriter;
import org.apache.paimon.mergetree.compact.CompactRewriter;
@@ -608,7 +609,7 @@ public abstract class MergeTreeTestBase {
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
throws Exception {
RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+ writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);
RecordReader<KeyValue> reader =
MergeTreeReaders.readerForMergeTree(
sections,
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index 684eb7fc3..4d4117c7d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.stats.StatsTestUtils;
@@ -182,7 +183,8 @@ public class IntervalPartitionTest {
Collections.emptyList(),
Timestamp.fromEpochMillis(100000),
0L,
- null);
+ null,
+ FileSource.APPEND);
}
private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>> sections) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 0d891f2c7..25d263a93 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.SortedRun;
@@ -357,6 +358,7 @@ public class UniversalCompactionTest {
}
static DataFileMeta file(long size) {
- return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null);
+ return new DataFileMeta(
+ "", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null, FileSource.APPEND);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 626c13c01..8d56a5169 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.schema.Schema;
@@ -205,7 +206,8 @@ public class ExpireSnapshotsTest {
extraFiles,
Timestamp.now(),
0L,
- null);
+ null,
+ FileSource.APPEND);
ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile);
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index a3685d5f6..3594a8a78 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.Pair;
@@ -56,7 +57,8 @@ public class SplitGeneratorTest {
0,
0,
0L,
- null);
+ null,
+ FileSource.APPEND);
}
@Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index b46015fbc..56813a7da 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.table.sink.CompactionTaskSerializer;
import org.junit.jupiter.api.Test;
@@ -76,6 +77,7 @@ public class CompactionTaskSimpleSerializerTest {
0,
0,
0L,
- null);
+ null,
+ FileSource.APPEND);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 820086f92..ad30f6388 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
@@ -113,8 +114,8 @@ public class FileStoreSourceSplitGeneratorTest {
0, // not used
0, // not used
0L, // not used
- null // not used
- ));
+ null, // not used
+ FileSource.APPEND));
}
return DataSplit.builder()
.withSnapshot(1)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index 795373e96..f2c0732a2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.table.source.DataSplit;
@@ -86,7 +87,8 @@ public class FileStoreSourceSplitSerializerTest {
0,
level,
0L,
- null);
+ null,
+ FileSource.APPEND);
}
public static FileStoreSourceSplit newSourceSplit(
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index 7b150c1fc..7fae33953 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -21,7 +21,8 @@ package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.io.DataFileMeta
-import org.apache.paimon.table.source.{DataSplit, RawFile, Split}
+import org.apache.paimon.manifest.FileSource
+import org.apache.paimon.table.source.{DataSplit, Split}
import org.junit.jupiter.api.Assertions
@@ -41,7 +42,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
0.until(fileNum).foreach {
i =>
val path = s"f$i.parquet"
- files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 1)
+ files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 1, FileSource.APPEND)
}
val dataSplits = mutable.ArrayBuffer.empty[Split]