This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c38b03cc [core] Introduce FileSource to DataFileMeta (#3363)
2c38b03cc is described below

commit 2c38b03cc191168273209163fc2c1585b6713afc
Author: Aitozi <[email protected]>
AuthorDate: Wed May 22 17:57:42 2024 +0800

    [core] Introduce FileSource to DataFileMeta (#3363)
---
 .../org/apache/paimon/append/AppendOnlyWriter.java |  4 +-
 .../java/org/apache/paimon/io/DataFileMeta.java    | 49 +++++++++++++++------
 .../apache/paimon/io/DataFileMetaSerializer.java   |  7 ++-
 .../apache/paimon/io/KeyValueDataFileWriter.java   |  9 +++-
 .../paimon/io/KeyValueFileWriterFactory.java       | 18 +++++---
 .../org/apache/paimon/io/RowDataFileWriter.java    |  9 +++-
 .../apache/paimon/io/RowDataRollingFileWriter.java |  7 ++-
 .../org/apache/paimon/manifest/FileSource.java     | 51 ++++++++++++++++++++++
 .../apache/paimon/mergetree/MergeTreeWriter.java   |  3 +-
 .../compact/ChangelogMergeTreeRewriter.java        |  5 ++-
 .../compact/MergeTreeCompactRewriter.java          |  3 +-
 .../org/apache/paimon/migrate/FileMetaUtils.java   |  4 +-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  4 +-
 .../AppendOnlyTableCompactionCoordinatorTest.java  |  4 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |  4 +-
 .../paimon/crosspartition/IndexBootstrapTest.java  |  4 +-
 .../paimon/io/DataFileTestDataGenerator.java       |  4 +-
 .../org/apache/paimon/io/DataFileTestUtils.java    | 10 +++--
 .../paimon/io/KeyValueFileReadWriteTest.java       | 12 ++---
 .../apache/paimon/io/RollingFileWriterTest.java    |  4 +-
 .../ManifestCommittableSerializerTest.java         |  3 +-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |  7 +--
 .../paimon/mergetree/ContainsLevelsTest.java       |  3 +-
 .../org/apache/paimon/mergetree/LevelsTest.java    |  4 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  3 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  3 +-
 .../mergetree/compact/IntervalPartitionTest.java   |  4 +-
 .../mergetree/compact/UniversalCompactionTest.java |  4 +-
 .../paimon/operation/ExpireSnapshotsTest.java      |  4 +-
 .../paimon/table/source/SplitGeneratorTest.java    |  4 +-
 .../sink/CompactionTaskSimpleSerializerTest.java   |  4 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |  5 ++-
 .../source/FileStoreSourceSplitSerializerTest.java |  4 +-
 .../org/apache/paimon/spark/ScanHelperTest.scala   |  5 ++-
 34 files changed, 209 insertions(+), 63 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index e26d302da..c5a47ff54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -32,6 +32,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.RowDataRollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -256,7 +257,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
                 seqNumCounter,
                 fileCompression,
                 statsCollectors,
-                fileIndexOptions);
+                fileIndexOptions,
+                FileSource.APPEND);
     }
 
     private void trySyncLatestCompaction(boolean blocking)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index d62bb2d15..bfdfaf249 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.stats.SimpleStatsConverter;
 import org.apache.paimon.types.ArrayType;
@@ -30,6 +31,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TinyIntType;
 
 import javax.annotation.Nullable;
 
@@ -83,6 +85,8 @@ public class DataFileMeta {
     // file index filter bytes, if it is small, store in data file meta
     private final @Nullable byte[] embeddedIndex;
 
+    private final @Nullable FileSource fileSource;
+
     public static DataFileMeta forAppend(
             String fileName,
             long fileSize,
@@ -90,7 +94,8 @@ public class DataFileMeta {
             SimpleStats rowStats,
             long minSequenceNumber,
             long maxSequenceNumber,
-            long schemaId) {
+            long schemaId,
+            @Nullable FileSource fileSource) {
         return forAppend(
                 fileName,
                 fileSize,
@@ -100,7 +105,8 @@ public class DataFileMeta {
                 maxSequenceNumber,
                 schemaId,
                 Collections.emptyList(),
-                null);
+                null,
+                fileSource);
     }
 
     public static DataFileMeta forAppend(
@@ -112,7 +118,8 @@ public class DataFileMeta {
             long maxSequenceNumber,
             long schemaId,
             List<String> extraFiles,
-            @Nullable byte[] embeddedIndex) {
+            @Nullable byte[] embeddedIndex,
+            @Nullable FileSource fileSource) {
         return new DataFileMeta(
                 fileName,
                 fileSize,
@@ -128,7 +135,8 @@ public class DataFileMeta {
                 extraFiles,
                 
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
                 0L,
-                embeddedIndex);
+                embeddedIndex,
+                fileSource);
     }
 
     public DataFileMeta(
@@ -144,7 +152,8 @@ public class DataFileMeta {
             long schemaId,
             int level,
             @Nullable Long deleteRowCount,
-            @Nullable byte[] embeddedIndex) {
+            @Nullable byte[] embeddedIndex,
+            @Nullable FileSource fileSource) {
         this(
                 fileName,
                 fileSize,
@@ -160,7 +169,8 @@ public class DataFileMeta {
                 Collections.emptyList(),
                 
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
                 deleteRowCount,
-                embeddedIndex);
+                embeddedIndex,
+                fileSource);
     }
 
     public DataFileMeta(
@@ -178,7 +188,8 @@ public class DataFileMeta {
             List<String> extraFiles,
             Timestamp creationTime,
             @Nullable Long deleteRowCount,
-            @Nullable byte[] embeddedIndex) {
+            @Nullable byte[] embeddedIndex,
+            @Nullable FileSource fileSource) {
         this.fileName = fileName;
         this.fileSize = fileSize;
 
@@ -198,6 +209,7 @@ public class DataFileMeta {
         this.creationTime = creationTime;
 
         this.deleteRowCount = deleteRowCount;
+        this.fileSource = fileSource;
     }
 
     public String fileName() {
@@ -291,6 +303,10 @@ public class DataFileMeta {
         return split[split.length - 1];
     }
 
+    public Optional<FileSource> fileSource() {
+        return Optional.ofNullable(fileSource);
+    }
+
     public DataFileMeta upgrade(int newLevel) {
         checkArgument(newLevel > this.level);
         return new DataFileMeta(
@@ -308,7 +324,8 @@ public class DataFileMeta {
                 extraFiles,
                 creationTime,
                 deleteRowCount,
-                embeddedIndex);
+                embeddedIndex,
+                fileSource);
     }
 
     public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -334,7 +351,8 @@ public class DataFileMeta {
                 newExtraFiles,
                 creationTime,
                 deleteRowCount,
-                embeddedIndex);
+                embeddedIndex,
+                fileSource);
     }
 
     @Override
@@ -360,7 +378,8 @@ public class DataFileMeta {
                 && level == that.level
                 && Objects.equals(extraFiles, that.extraFiles)
                 && Objects.equals(creationTime, that.creationTime)
-                && Objects.equals(deleteRowCount, that.deleteRowCount);
+                && Objects.equals(deleteRowCount, that.deleteRowCount)
+                && Objects.equals(fileSource, that.fileSource);
     }
 
     @Override
@@ -380,7 +399,8 @@ public class DataFileMeta {
                 level,
                 extraFiles,
                 creationTime,
-                deleteRowCount);
+                deleteRowCount,
+                fileSource);
     }
 
     @Override
@@ -389,7 +409,8 @@ public class DataFileMeta {
                 "{fileName: %s, fileSize: %d, rowCount: %d, embeddedIndex: %s, 
"
                         + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: 
%s, "
                         + "minSequenceNumber: %d, maxSequenceNumber: %d, "
-                        + "schemaId: %d, level: %d, extraFiles: %s, 
creationTime: %s, deleteRowCount: %d}",
+                        + "schemaId: %d, level: %d, extraFiles: %s, 
creationTime: %s, "
+                        + "deleteRowCount: %d, fileSource: %s}",
                 fileName,
                 fileSize,
                 rowCount,
@@ -404,7 +425,8 @@ public class DataFileMeta {
                 level,
                 extraFiles,
                 creationTime,
-                deleteRowCount);
+                deleteRowCount,
+                fileSource);
     }
 
     public static RowType schema() {
@@ -424,6 +446,7 @@ public class DataFileMeta {
         fields.add(new DataField(12, "_CREATION_TIME", 
DataTypes.TIMESTAMP_MILLIS()));
         fields.add(new DataField(13, "_DELETE_ROW_COUNT", new 
BigIntType(true)));
         fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", 
newBytesType(true)));
+        fields.add(new DataField(15, "_FILE_SOURCE", new TinyIntType(true)));
         return new RowType(fields);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index 68707a6c4..78891d064 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.utils.ObjectSerializer;
 
@@ -55,7 +56,8 @@ public class DataFileMetaSerializer extends 
ObjectSerializer<DataFileMeta> {
                 toStringArrayData(meta.extraFiles()),
                 meta.creationTime(),
                 meta.deleteRowCount().orElse(null),
-                meta.embeddedIndex());
+                meta.embeddedIndex(),
+                meta.fileSource().map(FileSource::toByteValue).orElse(null));
     }
 
     @Override
@@ -75,6 +77,7 @@ public class DataFileMetaSerializer extends 
ObjectSerializer<DataFileMeta> {
                 fromStringArrayData(row.getArray(11)),
                 row.getTimestamp(12, 3),
                 row.isNullAt(13) ? null : row.getLong(13),
-                row.isNullAt(14) ? null : row.getBinary(14));
+                row.isNullAt(14) ? null : row.getBinary(14),
+                row.isNullAt(15) ? null : 
FileSource.fromByteValue(row.getByte(15)));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index bfea680fa..46db1ddd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.format.SimpleStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.stats.SimpleStatsConverter;
 import org.apache.paimon.types.RowType;
@@ -62,6 +63,7 @@ public class KeyValueDataFileWriter
     private final SimpleStatsConverter keyStatsConverter;
     private final SimpleStatsConverter valueStatsConverter;
     private final InternalRowSerializer keySerializer;
+    private final FileSource fileSource;
 
     private BinaryRow minKey = null;
     private InternalRow maxKey = null;
@@ -80,7 +82,8 @@ public class KeyValueDataFileWriter
             long schemaId,
             int level,
             String compression,
-            CoreOptions options) {
+            CoreOptions options,
+            FileSource fileSource) {
         super(
                 fileIO,
                 factory,
@@ -100,6 +103,7 @@ public class KeyValueDataFileWriter
         this.keyStatsConverter = new SimpleStatsConverter(keyType);
         this.valueStatsConverter = new SimpleStatsConverter(valueType);
         this.keySerializer = new InternalRowSerializer(keyType);
+        this.fileSource = fileSource;
     }
 
     @Override
@@ -170,6 +174,7 @@ public class KeyValueDataFileWriter
                 level,
                 deleteRecordCount,
                 // TODO: enable file filter for primary key table (e.g. 
deletion table).
-                null);
+                null,
+                fileSource);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 0d3916148..72f9b3f65 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -81,9 +82,12 @@ public class KeyValueFileWriterFactory {
         return formatContext.pathFactory(level);
     }
 
-    public RollingFileWriter<KeyValue, DataFileMeta> 
createRollingMergeTreeFileWriter(int level) {
+    public RollingFileWriter<KeyValue, DataFileMeta> 
createRollingMergeTreeFileWriter(
+            int level, FileSource fileSource) {
         return new RollingFileWriter<>(
-                () -> 
createDataFileWriter(formatContext.pathFactory(level).newPath(), level),
+                () ->
+                        createDataFileWriter(
+                                formatContext.pathFactory(level).newPath(), 
level, fileSource),
                 suggestedFileSize);
     }
 
@@ -91,11 +95,14 @@ public class KeyValueFileWriterFactory {
         return new RollingFileWriter<>(
                 () ->
                         createDataFileWriter(
-                                
formatContext.pathFactory(level).newChangelogPath(), level),
+                                
formatContext.pathFactory(level).newChangelogPath(),
+                                level,
+                                FileSource.APPEND),
                 suggestedFileSize);
     }
 
-    private KeyValueDataFileWriter createDataFileWriter(Path path, int level) {
+    private KeyValueDataFileWriter createDataFileWriter(
+            Path path, int level, FileSource fileSource) {
         KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, 
valueType);
         return new KeyValueDataFileWriter(
                 fileIO,
@@ -108,7 +115,8 @@ public class KeyValueFileWriterFactory {
                 schemaId,
                 level,
                 formatContext.compression(level),
-                options);
+                options,
+                fileSource);
     }
 
     public void deleteFile(String filename, int level) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index e0521ca79..3da909817 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.stats.SimpleStatsConverter;
@@ -48,6 +49,7 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
     private final LongCounter seqNumCounter;
     private final SimpleStatsConverter statsArraySerializer;
     @Nullable private final FileIndexWriter fileIndexWriter;
+    private final FileSource fileSource;
 
     public RowDataFileWriter(
             FileIO fileIO,
@@ -59,7 +61,8 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
             LongCounter seqNumCounter,
             String fileCompression,
             SimpleColStatsCollector.Factory[] statsCollectors,
-            FileIndexOptions fileIndexOptions) {
+            FileIndexOptions fileIndexOptions,
+            FileSource fileSource) {
         super(
                 fileIO,
                 factory,
@@ -75,6 +78,7 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
         this.fileIndexWriter =
                 FileIndexWriter.create(
                         fileIO, toFileIndexPath(path), writeSchema, 
fileIndexOptions);
+        this.fileSource = fileSource;
     }
 
     @Override
@@ -111,6 +115,7 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
                 indexResult.independentIndexFile() == null
                         ? Collections.emptyList()
                         : 
Collections.singletonList(indexResult.independentIndexFile()),
-                indexResult.embeddedIndexBytes());
+                indexResult.embeddedIndexBytes(),
+                fileSource);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index d6f457795..9827c8f7c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.avro.AvroFileFormat;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
@@ -40,7 +41,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
             LongCounter seqNumCounter,
             String fileCompression,
             SimpleColStatsCollector.Factory[] statsCollectors,
-            FileIndexOptions fileIndexOptions) {
+            FileIndexOptions fileIndexOptions,
+            FileSource fileSource) {
         super(
                 () ->
                         new RowDataFileWriter(
@@ -57,7 +59,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
                                 seqNumCounter,
                                 fileCompression,
                                 statsCollectors,
-                                fileIndexOptions),
+                                fileIndexOptions,
+                                fileSource),
                 targetFileSize);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java
new file mode 100644
index 000000000..c62aea2d6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+/** The Source of a file. */
+public enum FileSource {
+
+    /** The file from new input. */
+    APPEND((byte) 0),
+
+    /** The file from compaction. */
+    COMPACT((byte) 1);
+
+    private final byte value;
+
+    FileSource(byte value) {
+        this.value = value;
+    }
+
+    public byte toByteValue() {
+        return value;
+    }
+
+    public static FileSource fromByteValue(byte value) {
+        switch (value) {
+            case 0:
+                return APPEND;
+            case 1:
+                return COMPACT;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported byte value '" + value + "' for value 
kind.");
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 6becbf731..a42952914 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -30,6 +30,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.mergetree.compact.MergeFunction;
@@ -212,7 +213,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
                             ? writerFactory.createRollingChangelogFileWriter(0)
                             : null;
             final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
-                    writerFactory.createRollingMergeTreeFileWriter(0);
+                    writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND);
 
             try {
                 writeBuffer.forEach(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index a1aabc4e7..a03d53329 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.utils.CloseableIterator;
@@ -128,7 +129,9 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
                     readerForMergeTree(sections, 
createMergeWrapper(outputLevel))
                             .toCloseableIterator();
             if (rewriteCompactFile) {
-                compactFileWriter = 
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+                compactFileWriter =
+                        writerFactory.createRollingMergeTreeFileWriter(
+                                outputLevel, FileSource.COMPACT);
             }
             if (produceChangelog) {
                 changelogFileWriter = 
writerFactory.createRollingChangelogFileWriter(outputLevel);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index d399d599d..89e82fe3f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.mergetree.DropDeleteReader;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
@@ -73,7 +74,7 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
     protected CompactResult rewriteCompaction(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) throws Exception {
         RollingFileWriter<KeyValue, DataFileMeta> writer =
-                writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+                writerFactory.createRollingMergeTreeFileWriter(outputLevel, 
FileSource.COMPACT);
         RecordReader<KeyValue> reader =
                 readerForMergeTree(sections, new 
ReducerMergeFunctionWrapper(mfFactory.create()));
         if (dropDelete) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 1ffc0cb5d..7c84c87d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.stats.SimpleStatsConverter;
@@ -164,7 +165,8 @@ public class FileMetaUtils {
                 stats,
                 0,
                 0,
-                ((FileStoreTable) table).schema().id());
+                ((FileStoreTable) table).schema().id(),
+                FileSource.APPEND);
     }
 
     public static BinaryRow writePartitionValue(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 560648d41..64cbb5fd7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -33,6 +33,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.RowDataRollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
@@ -177,7 +178,8 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                             new 
LongCounter(toCompact.get(0).minSequenceNumber()),
                             fileCompression,
                             statsCollectors,
-                            fileIndexOptions);
+                            fileIndexOptions,
+                            FileSource.COMPACT);
             try {
                 rewriter.write(bucketReader(partition, 
bucket).read(toCompact));
             } finally {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index ec4212f16..bd0c7da29 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -187,6 +188,7 @@ public class AppendOnlyTableCompactionCoordinatorTest {
                 0,
                 0,
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 6d0c59754..b5cc55f9c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.options.MemorySize;
@@ -646,6 +647,7 @@ public class AppendOnlyWriterTest {
                         }),
                 minSeq,
                 maxSeq,
-                toCompact.get(0).schemaId());
+                toCompact.get(0).schemaId(),
+                FileSource.APPEND);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 19541f607..bbb1abfd3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
@@ -141,7 +142,8 @@ public class IndexBootstrapTest extends TableTestBase {
                                 .atZone(ZoneId.systemDefault())
                                 .toLocalDateTime()),
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 
     private Pair<InternalRow, Integer> row(int pt, int col, int pk, int 
bucket) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index e891406c1..810cef860 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.format.SimpleStatsCollector;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.statistics.FullSimpleColStatsCollector;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.stats.SimpleStatsConverter;
@@ -162,7 +163,8 @@ public class DataFileTestDataGenerator {
                         0,
                         level,
                         kvs.stream().filter(kv -> 
kv.valueKind().isRetract()).count(),
-                        null),
+                        null,
+                        FileSource.APPEND),
                 kvs);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index 96ba26208..f31b21f1e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.StatsTestUtils;
 
 import java.util.Collections;
@@ -54,7 +55,8 @@ public class DataFileTestUtils {
                 Collections.emptyList(),
                 Timestamp.fromEpochMillis(100),
                 maxSeq - minSeq + 1,
-                null);
+                null,
+                FileSource.APPEND);
     }
 
     public static DataFileMeta newFile() {
@@ -71,7 +73,8 @@ public class DataFileTestUtils {
                 0,
                 0,
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 
     public static DataFileMeta newFile(
@@ -94,7 +97,8 @@ public class DataFileTestUtils {
                 0,
                 level,
                 deleteRowCount,
-                null);
+                null,
+                FileSource.APPEND);
     }
 
     public static BinaryRow row(int i) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index e56e0deb9..9f1405680 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.stats.StatsTestUtils;
@@ -101,7 +102,7 @@ public class KeyValueFileReadWriteTest {
         DataFileMetaSerializer serializer = new DataFileMetaSerializer();
 
         RollingFileWriter<KeyValue, DataFileMeta> writer =
-                writerFactory.createRollingMergeTreeFileWriter(0);
+                writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND);
         writer.write(CloseableIterator.fromList(data.content, kv -> {}));
         writer.close();
         List<DataFileMeta> actualMetas = writer.result();
@@ -130,7 +131,8 @@ public class KeyValueFileReadWriteTest {
                         FailingFileIO.getFailingPath(failingName, 
tempDir.toString()), "avro");
 
         try {
-            FileWriter<KeyValue, ?> writer = 
writerFactory.createRollingMergeTreeFileWriter(0);
+            FileWriter<KeyValue, ?> writer =
+                    writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND);
             writer.write(CloseableIterator.fromList(data.content, kv -> {}));
         } catch (Throwable e) {
             if (e.getCause() != null) {
@@ -154,7 +156,7 @@ public class KeyValueFileReadWriteTest {
         DataFileMetaSerializer serializer = new DataFileMetaSerializer();
 
         RollingFileWriter<KeyValue, DataFileMeta> writer =
-                writerFactory.createRollingMergeTreeFileWriter(0);
+                writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND);
         writer.write(CloseableIterator.fromList(data.content, kv -> {}));
         writer.close();
         List<DataFileMeta> actualMetas = writer.result();
@@ -192,7 +194,7 @@ public class KeyValueFileReadWriteTest {
         DataFileMetaSerializer serializer = new DataFileMetaSerializer();
 
         RollingFileWriter<KeyValue, DataFileMeta> writer =
-                writerFactory.createRollingMergeTreeFileWriter(0);
+                writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND);
         writer.write(CloseableIterator.fromList(data.content, kv -> {}));
         writer.close();
         List<DataFileMeta> actualMetas = writer.result();
@@ -397,7 +399,7 @@ public class KeyValueFileReadWriteTest {
         DataFileMetaSerializer serializer = new DataFileMetaSerializer();
 
         RollingFileWriter<KeyValue, DataFileMeta> writer =
-                writerFactory.createRollingMergeTreeFileWriter(0);
+                writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND);
         writer.write(CloseableIterator.fromList(data.content, kv -> {}));
         writer.close();
         List<DataFileMeta> actualMetas = writer.result();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 04712ffd8..2ac2a8e4d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.DataType;
@@ -88,7 +89,8 @@ public class RollingFileWriterTest {
                                         
StatsCollectorFactories.createStatsFactories(
                                                 new CoreOptions(new 
HashMap<>()),
                                                 SCHEMA.getFieldNames()),
-                                        new FileIndexOptions()),
+                                        new FileIndexOptions(),
+                                        FileSource.APPEND),
                         TARGET_FILE_SIZE);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index d9c24eddb..099c38003 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -116,6 +116,7 @@ public class ManifestCommittableSerializerTest {
                 0,
                 level,
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index a86d6f9ed..e74af8b30 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -83,8 +83,8 @@ public abstract class ManifestFileMetaTestBase {
                         Collections.emptyList(),
                         Timestamp.fromEpochMillis(200000),
                         0L, // not used
-                        null // not used
-                        ));
+                        null, // not used
+                        FileSource.APPEND));
     }
 
     protected ManifestFileMeta makeManifest(ManifestEntry... entries) {
@@ -245,6 +245,7 @@ public abstract class ManifestFileMetaTestBase {
                         0, // not used
                         0, // not used
                         0L,
-                        null));
+                        null,
+                        FileSource.APPEND));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 9ec0e38e8..0bd60c673 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -206,7 +207,7 @@ public class ContainsLevelsTest {
 
     private DataFileMeta newFile(int level, KeyValue... records) throws 
IOException {
         RollingFileWriter<KeyValue, DataFileMeta> writer =
-                createWriterFactory().createRollingMergeTreeFileWriter(level);
+                createWriterFactory().createRollingMergeTreeFileWriter(level, 
FileSource.APPEND);
         for (KeyValue kv : records) {
             writer.write(kv);
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index 4763585b0..a86aa445b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 
 import org.junit.jupiter.api.Test;
 
@@ -81,6 +82,7 @@ public class LevelsTest {
                 0,
                 level,
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index d38b516fc..b40c53d12 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -286,7 +287,7 @@ public class LookupLevelsTest {
 
     private DataFileMeta newFile(int level, KeyValue... records) throws 
IOException {
         RollingFileWriter<KeyValue, DataFileMeta> writer =
-                createWriterFactory().createRollingMergeTreeFileWriter(level);
+                createWriterFactory().createRollingMergeTreeFileWriter(level, 
FileSource.APPEND);
         for (KeyValue kv : records) {
             writer.write(kv);
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 3f9bd3e6e..33b81a373 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -36,6 +36,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.mergetree.compact.AbstractCompactRewriter;
 import org.apache.paimon.mergetree.compact.CompactRewriter;
@@ -608,7 +609,7 @@ public abstract class MergeTreeTestBase {
                 int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections)
                 throws Exception {
             RollingFileWriter<KeyValue, DataFileMeta> writer =
-                    
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+                    
writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);
             RecordReader<KeyValue> reader =
                     MergeTreeReaders.readerForMergeTree(
                             sections,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index 684eb7fc3..4d4117c7d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.stats.StatsTestUtils;
 
@@ -182,7 +183,8 @@ public class IntervalPartitionTest {
                 Collections.emptyList(),
                 Timestamp.fromEpochMillis(100000),
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 
     private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>> 
sections) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 0d891f2c7..25d263a93 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.mergetree.SortedRun;
 
@@ -357,6 +358,7 @@ public class UniversalCompactionTest {
     }
 
     static DataFileMeta file(long size) {
-        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 
0, 0L, null);
+        return new DataFileMeta(
+                "", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null, 
FileSource.APPEND);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 626c13c01..8d56a5169 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.schema.Schema;
@@ -205,7 +206,8 @@ public class ExpireSnapshotsTest {
                         extraFiles,
                         Timestamp.now(),
                         0L,
-                        null);
+                        null,
+                        FileSource.APPEND);
         ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, 
dataFile);
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 
0, 1, dataFile);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index a3685d5f6..3594a8a78 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.utils.Pair;
 
@@ -56,7 +57,8 @@ public class SplitGeneratorTest {
                 0,
                 0,
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index b46015fbc..56813a7da 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.append.AppendOnlyCompactionTask;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.table.sink.CompactionTaskSerializer;
 
 import org.junit.jupiter.api.Test;
@@ -76,6 +77,7 @@ public class CompactionTaskSimpleSerializerTest {
                 0,
                 0,
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 820086f92..ad30f6388 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
@@ -113,8 +114,8 @@ public class FileStoreSourceSplitGeneratorTest {
                             0, // not used
                             0, // not used
                             0L, // not used
-                            null // not used
-                            ));
+                            null, // not used
+                            FileSource.APPEND));
         }
         return DataSplit.builder()
                 .withSnapshot(1)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index 795373e96..f2c0732a2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.table.source.DataSplit;
 
@@ -86,7 +87,8 @@ public class FileStoreSourceSplitSerializerTest {
                 0,
                 level,
                 0L,
-                null);
+                null,
+                FileSource.APPEND);
     }
 
     public static FileStoreSourceSplit newSourceSplit(
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index 7b150c1fc..7fae33953 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -21,7 +21,8 @@ package org.apache.paimon.spark
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.io.DataFileMeta
-import org.apache.paimon.table.source.{DataSplit, RawFile, Split}
+import org.apache.paimon.manifest.FileSource
+import org.apache.paimon.table.source.{DataSplit, Split}
 
 import org.junit.jupiter.api.Assertions
 
@@ -41,7 +42,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
       0.until(fileNum).foreach {
         i =>
           val path = s"f$i.parquet"
-          files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 
1)
+          files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 
1, FileSource.APPEND)
       }
 
       val dataSplits = mutable.ArrayBuffer.empty[Split]

Reply via email to