This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ff5192a09 [core] Add _EXTERNAL_PATH in DataFileMeta
9ff5192a09 is described below

commit 9ff5192a09856aff318597e589f8e8c8f6ff325b
Author: HouliangQi <[email protected]>
AuthorDate: Thu Dec 19 13:09:18 2024 +0800

    [core] Add _EXTERNAL_PATH in DataFileMeta
    
    This closes #4751
---
 .../java/org/apache/paimon/io/DataFileMeta.java    |  50 +++++++++----
 .../apache/paimon/io/DataFileMeta08Serializer.java |   1 +
 .../apache/paimon/io/DataFileMeta09Serializer.java |   1 +
 ...er.java => DataFileMeta10LegacySerializer.java} |  14 ++--
 .../apache/paimon/io/DataFileMetaSerializer.java   |   6 +-
 .../sink/CommitMessageLegacyV2Serializer.java      |   1 +
 .../paimon/table/sink/CommitMessageSerializer.java |  11 ++-
 .../org/apache/paimon/table/source/DataSplit.java  |   8 ++-
 .../paimon/crosspartition/IndexBootstrapTest.java  |   1 +
 .../org/apache/paimon/io/DataFileTestUtils.java    |   1 +
 ...festCommittableSerializerCompatibilityTest.java |  79 ++++++++++++++++++++-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |   1 +
 .../mergetree/compact/IntervalPartitionTest.java   |   1 +
 .../paimon/operation/ExpireSnapshotsTest.java      |   1 +
 .../org/apache/paimon/table/source/SplitTest.java  |  73 ++++++++++++++++++-
 .../src/test/resources/compatibility/datasplit-v4  | Bin 0 -> 934 bytes
 .../compatibility/manifest-committable-v5          | Bin 0 -> 3449 bytes
 17 files changed, 221 insertions(+), 28 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index bb9e45ff00..3be09ea6c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -82,7 +82,8 @@ public class DataFileMeta {
                             new DataField(
                                     16,
                                     "_VALUE_STATS_COLS",
-                                    DataTypes.ARRAY(DataTypes.STRING().notNull()))));
+                                    DataTypes.ARRAY(DataTypes.STRING().notNull())),
+                            new DataField(17, "_EXTERNAL_PATH", newStringType(true))));
 
     public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
     public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
@@ -120,6 +121,9 @@ public class DataFileMeta {
 
     private final @Nullable List<String> valueStatsCols;
 
+    /** external path of file, if it is null, it is in the default warehouse path. */
+    private final @Nullable String externalPath;
+
     public static DataFileMeta forAppend(
             String fileName,
             long fileSize,
@@ -149,7 +153,8 @@ public class DataFileMeta {
                 0L,
                 embeddedIndex,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                null);
     }
 
     public DataFileMeta(
@@ -186,7 +191,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 embeddedIndex,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                null);
     }
 
     public DataFileMeta(
@@ -222,7 +228,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 embeddedIndex,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                null);
     }
 
     public DataFileMeta(
@@ -242,7 +249,8 @@ public class DataFileMeta {
             @Nullable Long deleteRowCount,
             @Nullable byte[] embeddedIndex,
             @Nullable FileSource fileSource,
-            @Nullable List<String> valueStatsCols) {
+            @Nullable List<String> valueStatsCols,
+            @Nullable String externalPath) {
         this.fileName = fileName;
         this.fileSize = fileSize;
 
@@ -264,6 +272,7 @@ public class DataFileMeta {
         this.deleteRowCount = deleteRowCount;
         this.fileSource = fileSource;
         this.valueStatsCols = valueStatsCols;
+        this.externalPath = externalPath;
     }
 
     public String fileName() {
@@ -357,6 +366,11 @@ public class DataFileMeta {
         return split[split.length - 1];
     }
 
+    @Nullable
+    public String externalPath() {
+        return externalPath;
+    }
+
     public Optional<FileSource> fileSource() {
         return Optional.ofNullable(fileSource);
     }
@@ -385,7 +399,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 embeddedIndex,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                externalPath);
     }
 
     public DataFileMeta rename(String newFileName) {
@@ -406,7 +421,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 embeddedIndex,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                externalPath);
     }
 
     public DataFileMeta copyWithoutStats() {
@@ -427,7 +443,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 embeddedIndex,
                 fileSource,
-                Collections.emptyList());
+                Collections.emptyList(),
+                externalPath);
     }
 
     public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -455,7 +472,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 embeddedIndex,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                externalPath);
     }
 
     public DataFileMeta copy(byte[] newEmbeddedIndex) {
@@ -476,7 +494,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 newEmbeddedIndex,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                externalPath);
     }
 
     @Override
@@ -504,7 +523,8 @@ public class DataFileMeta {
                 && Objects.equals(creationTime, that.creationTime)
                 && Objects.equals(deleteRowCount, that.deleteRowCount)
                 && Objects.equals(fileSource, that.fileSource)
-                && Objects.equals(valueStatsCols, that.valueStatsCols);
+                && Objects.equals(valueStatsCols, that.valueStatsCols)
+                && Objects.equals(externalPath, that.externalPath);
     }
 
     @Override
@@ -526,7 +546,8 @@ public class DataFileMeta {
                 creationTime,
                 deleteRowCount,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                externalPath);
     }
 
     @Override
@@ -536,7 +557,7 @@ public class DataFileMeta {
                         + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
                         + "minSequenceNumber: %d, maxSequenceNumber: %d, "
                         + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, "
-                        + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s}",
+                        + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, externalPath: %s}",
                 fileName,
                 fileSize,
                 rowCount,
@@ -553,7 +574,8 @@ public class DataFileMeta {
                 creationTime,
                 deleteRowCount,
                 fileSource,
-                valueStatsCols);
+                valueStatsCols,
+                externalPath);
     }
 
     public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
index 03e4ed51f4..e6c10f1534 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -133,6 +133,7 @@ public class DataFileMeta08Serializer implements Serializable {
                 row.isNullAt(13) ? null : row.getLong(13),
                 row.isNullAt(14) ? null : row.getBinary(14),
                 null,
+                null,
                 null);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
index 2f8d89f5b1..36d1ad260f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
@@ -139,6 +139,7 @@ public class DataFileMeta09Serializer implements Serializable {
                 row.isNullAt(13) ? null : row.getLong(13),
                 row.isNullAt(14) ? null : row.getBinary(14),
                 row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)),
+                null,
                 null);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
similarity index 92%
copy from paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
copy to paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
index 2f8d89f5b1..68ccba6ea3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
@@ -47,7 +47,7 @@ import static org.apache.paimon.utils.SerializationUtils.newStringType;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 
 /** Serializer for {@link DataFileMeta} with 0.9 version. */
-public class DataFileMeta09Serializer implements Serializable {
+public class DataFileMeta10LegacySerializer implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
@@ -71,11 +71,15 @@ public class DataFileMeta09Serializer implements Serializable {
                             new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()),
                             new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)),
                             new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)),
-                            new DataField(15, "_FILE_SOURCE", new TinyIntType(true))));
+                            new DataField(15, "_FILE_SOURCE", new TinyIntType(true)),
+                            new DataField(
+                                    16,
+                                    "_VALUE_STATS_COLS",
+                                    DataTypes.ARRAY(DataTypes.STRING().notNull()))));
 
     protected final InternalRowSerializer rowSerializer;
 
-    public DataFileMeta09Serializer() {
+    public DataFileMeta10LegacySerializer() {
         this.rowSerializer = InternalSerializers.create(SCHEMA);
     }
 
@@ -105,7 +109,8 @@ public class DataFileMeta09Serializer implements Serializable {
                         meta.creationTime(),
                         meta.deleteRowCount().orElse(null),
                         meta.embeddedIndex(),
-                        meta.fileSource().map(FileSource::toByteValue).orElse(null));
+                        meta.fileSource().map(FileSource::toByteValue).orElse(null),
+                        toStringArrayData(meta.valueStatsCols()));
         rowSerializer.serialize(row, target);
     }
 
@@ -139,6 +144,7 @@ public class DataFileMeta09Serializer implements Serializable {
                 row.isNullAt(13) ? null : row.getLong(13),
                 row.isNullAt(14) ? null : row.getBinary(14),
                 row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)),
+                row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)),
                 null);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index 626201ca30..c8a5e326b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -58,7 +58,8 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
                 meta.deleteRowCount().orElse(null),
                 meta.embeddedIndex(),
                 meta.fileSource().map(FileSource::toByteValue).orElse(null),
-                toStringArrayData(meta.valueStatsCols()));
+                toStringArrayData(meta.valueStatsCols()),
+                BinaryString.fromString(meta.externalPath()));
     }
 
     @Override
@@ -80,6 +81,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
                 row.isNullAt(13) ? null : row.getLong(13),
                 row.isNullAt(14) ? null : row.getBinary(14),
                 row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)),
-                row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)));
+                row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)),
+                row.isNullAt(17) ? null : row.getString(17).toString());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index 3e351cd1da..5da96da765 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -155,6 +155,7 @@ public class CommitMessageLegacyV2Serializer {
                     null,
                     null,
                     null,
+                    null,
                     null);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 9fc251c366..c65f8302aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMeta09Serializer;
+import org.apache.paimon.io.DataFileMeta10LegacySerializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.DataInputDeserializer;
@@ -47,11 +48,12 @@ import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 /** {@link VersionedSerializer} for {@link CommitMessage}. */
 public class CommitMessageSerializer implements VersionedSerializer<CommitMessage> {
 
-    private static final int CURRENT_VERSION = 5;
+    private static final int CURRENT_VERSION = 6;
 
     private final DataFileMetaSerializer dataFileSerializer;
     private final IndexFileMetaSerializer indexEntrySerializer;
 
+    private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer;
     private DataFileMeta09Serializer dataFile09Serializer;
     private DataFileMeta08Serializer dataFile08Serializer;
     private IndexFileMeta09Serializer indexEntry09Serializer;
@@ -129,8 +131,13 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
 
     private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
             int version, DataInputView view) {
-        if (version >= 4) {
+        if (version >= 5) {
             return () -> dataFileSerializer.deserializeList(view);
+        } else if (version == 4) {
+            if (dataFileMeta10LegacySerializer == null) {
+                dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer();
+            }
+            return () -> dataFileMeta10LegacySerializer.deserializeList(view);
         } else if (version == 3) {
             if (dataFile09Serializer == null) {
                 dataFile09Serializer = new DataFileMeta09Serializer();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index b9460f28b4..40673ee788 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMeta09Serializer;
+import org.apache.paimon.io.DataFileMeta10LegacySerializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
@@ -51,7 +52,7 @@ public class DataSplit implements Split {
 
     private static final long serialVersionUID = 7L;
     private static final long MAGIC = -2394839472490812314L;
-    private static final int VERSION = 4;
+    private static final int VERSION = 5;
 
     private long snapshotId = 0;
     private BinaryRow partition;
@@ -362,7 +363,10 @@ public class DataSplit implements Split {
         } else if (version == 2) {
             DataFileMeta09Serializer serializer = new DataFileMeta09Serializer();
             return serializer::deserialize;
-        } else if (version >= 3) {
+        } else if (version == 3) {
+            DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer();
+            return serializer::deserialize;
+        } else if (version >= 4) {
             DataFileMetaSerializer serializer = new DataFileMetaSerializer();
             return serializer::deserialize;
         } else {
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index be41477356..27fa311ddb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -160,6 +160,7 @@ public class IndexBootstrapTest extends TableTestBase {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index 48c8d44876..a44ef9a530 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -57,6 +57,7 @@ public class DataFileTestUtils {
                 maxSeq - minSeq + 1,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index fbc02b2d73..34af551659 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -75,7 +75,8 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         11L,
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
-                        Arrays.asList("field1", "field2", "field3"));
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs://localhost:9000/path/to/file");
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
@@ -106,6 +107,76 @@ public class ManifestCommittableSerializerCompatibilityTest {
         assertThat(deserialized).isEqualTo(manifestCommittable);
     }
 
+    @Test
+    public void testCompatibilityToVersion5() throws IOException {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs://localhost:9000/path/to/file");
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
+        dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
+        dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
+        IndexFileMeta indexFile =
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas);
+        List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        new DataIncrement(dataFiles, dataFiles, dataFiles),
+                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
+                        new IndexIncrement(indexFiles));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+
+        ManifestCommittableSerializer serializer = new ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+        ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+        byte[] v2Bytes =
+                IOUtils.readFully(
+                        ManifestCommittableSerializerCompatibilityTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/manifest-committable-v5"),
+                        true);
+        deserialized = serializer.deserialize(2, v2Bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
+
     @Test
     public void testCompatibilityToVersion4() throws IOException {
         SimpleStats keyStats =
@@ -136,7 +207,8 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         11L,
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
-                        Arrays.asList("field1", "field2", "field3"));
+                        Arrays.asList("field1", "field2", "field3"),
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
@@ -206,6 +278,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         11L,
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -276,6 +349,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         11L,
                         new byte[] {1, 2, 4},
                         null,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -346,6 +420,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         null,
                         null,
                         null,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 52d82e76be..19bd6a856b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -95,6 +95,7 @@ public abstract class ManifestFileMetaTestBase {
                         0L, // not used
                         embeddedIndex, // not used
                         FileSource.APPEND,
+                        null,
                         null));
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index bdee5c5f75..94c11498c5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -184,6 +184,7 @@ public class IntervalPartitionTest {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 9dc9834373..abff820b2c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -214,6 +214,7 @@ public class ExpireSnapshotsTest {
                         0L,
                         null,
                         FileSource.APPEND,
+                        null,
                         null);
         ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile);
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 0219941a0a..88394d2dc3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -139,7 +139,8 @@ public class SplitTest {
                         11L,
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
-                        Arrays.asList("field1", "field2", "field3"));
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs:///path/to/warehouse");
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
@@ -194,6 +195,7 @@ public class SplitTest {
                         11L,
                         new byte[] {1, 2, 4},
                         null,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -254,6 +256,7 @@ public class SplitTest {
                         11L,
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -314,7 +317,8 @@ public class SplitTest {
                         11L,
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
-                        Arrays.asList("field1", "field2", "field3"));
+                        Arrays.asList("field1", "field2", "field3"),
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null);
@@ -347,6 +351,71 @@ public class SplitTest {
         assertThat(actual).isEqualTo(split);
     }
 
+    @Test
+    public void testSerializerCompatibleV4() throws Exception {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs:///path/to/warehouse");
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null);
+        List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile);
+
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+        binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+        binaryRowWriter.complete();
+
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(18)
+                        .withPartition(partition)
+                        .withBucket(20)
+                        .withDataFiles(dataFiles)
+                        .withDataDeletionFiles(deletionFiles)
+                        .withBucketPath("my path")
+                        .build();
+
+        byte[] v2Bytes =
+                IOUtils.readFully(
+                        SplitTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/datasplit-v4"),
+                        true);
+
+        DataSplit actual =
+                InstantiationUtil.deserializeObject(v2Bytes, DataSplit.class.getClassLoader());
+        assertThat(actual).isEqualTo(split);
+    }
+
     private DataFileMeta newDataFile(long rowCount) {
         return DataFileMeta.forAppend(
                 "my_data_file.parquet",
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v4 b/paimon-core/src/test/resources/compatibility/datasplit-v4
new file mode 100644
index 0000000000..6ccef002b1
Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/datasplit-v4 differ
diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v5 b/paimon-core/src/test/resources/compatibility/manifest-committable-v5
new file mode 100644
index 0000000000..8b2b05869b
Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v5 differ

Reply via email to