This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 293f5fe4bb [core] Upgrade data split serializer and manifest serializer version (#6165)
293f5fe4bb is described below

commit 293f5fe4bb8d290113afe1a41c471212de403c94
Author: YeJunHao <41894543+leaves12...@users.noreply.github.com>
AuthorDate: Thu Aug 28 11:26:45 2025 +0800

    [core] Upgrade data split serializer and manifest serializer version (#6165)
---
 .../io/DataFileMetaFirstRowIdLegacySerializer.java |  91 +++++++++++++++++++++
 .../paimon/table/sink/CommitMessageSerializer.java |  13 ++-
 .../org/apache/paimon/table/source/DataSplit.java  |   9 +-
 ...festCommittableSerializerCompatibilityTest.java |  80 +++++++++++++++++-
 ...SplitTest.java => DataSplitCompatibleTest.java} |  88 ++++++++++++++++++--
 .../src/test/resources/compatibility/datasplit-v7  | Bin 1032 -> 992 bytes
 .../compatibility/{datasplit-v7 => datasplit-v8}   | Bin 1032 -> 1032 bytes
 .../compatibility/manifest-committable-v8          | Bin 3786 -> 3578 bytes
 ...fest-committable-v8 => manifest-committable-v9} | Bin 3786 -> 3786 bytes
 9 files changed, 267 insertions(+), 14 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java
new file mode 100644
index 0000000000..59abcc730d
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
+import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/**
+ * Legacy serializer for {@link DataFileMeta} whose row layout ends with the first-row-id field
+ * (row index 18).
+ *
+ * <p>It uses {@link DataFileMeta#SCHEMA} as the row type. {@link #toRow} fills indices 0-18 and
+ * always writes {@code null} at index 19. {@link #fromRow} reads only indices 0-18 and passes
+ * {@code null} as the last argument of {@code DataFileMeta.create}. Whatever value the current
+ * format keeps after the first row id (a list of strings in the compatibility tests, presumably
+ * the write columns — TODO confirm) is therefore not carried by this format.
+ *
+ * <p>{@code DataSplit} uses it to read serialized version 7, and {@code CommitMessageSerializer}
+ * uses it to read version 8. The newer versions go through {@link DataFileMetaSerializer}.
+ *
+ * <p>NOTE(review): the row type is the live {@link DataFileMeta#SCHEMA} rather than a schema
+ * frozen for this legacy layout. Any later change to that schema would also change how this
+ * legacy format is written and read. Consider pinning a dedicated legacy schema, but first
+ * confirm the exact field types of the old layout.
+ */
+public class DataFileMetaFirstRowIdLegacySerializer extends 
ObjectSerializer<DataFileMeta> {
+
+    private static final long serialVersionUID = 1L;
+
+    public DataFileMetaFirstRowIdLegacySerializer() {
+        super(DataFileMeta.SCHEMA); // shares the current schema; see the class Javadoc
+    }
+
+    @Override
+    public InternalRow toRow(DataFileMeta meta) {
+        return GenericRow.of(
+                BinaryString.fromString(meta.fileName()),
+                meta.fileSize(),
+                meta.rowCount(),
+                serializeBinaryRow(meta.minKey()),
+                serializeBinaryRow(meta.maxKey()),
+                meta.keyStats().toRow(),
+                meta.valueStats().toRow(),
+                meta.minSequenceNumber(),
+                meta.maxSequenceNumber(),
+                meta.schemaId(),
+                meta.level(),
+                toStringArrayData(meta.extraFiles()),
+                meta.creationTime(),
+                meta.deleteRowCount().orElse(null),
+                meta.embeddedIndex(),
+                meta.fileSource().map(FileSource::toByteValue).orElse(null),
+                toStringArrayData(meta.valueStatsCols()),
+                meta.externalPath().map(BinaryString::fromString).orElse(null),
+                meta.firstRowId(), // index 18: last field this legacy format carries
+                null); // index 19: always written as null in this legacy format
+    }
+
+    @Override
+    public DataFileMeta fromRow(InternalRow row) {
+        return DataFileMeta.create(
+                row.getString(0).toString(),
+                row.getLong(1),
+                row.getLong(2),
+                deserializeBinaryRow(row.getBinary(3)),
+                deserializeBinaryRow(row.getBinary(4)),
+                SimpleStats.fromRow(row.getRow(5, 3)),
+                SimpleStats.fromRow(row.getRow(6, 3)),
+                row.getLong(7),
+                row.getLong(8),
+                row.getLong(9),
+                row.getInt(10),
+                fromStringArrayData(row.getArray(11)),
+                row.getTimestamp(12, 3),
+                row.isNullAt(13) ? null : row.getLong(13),
+                row.isNullAt(14) ? null : row.getBinary(14),
+                row.isNullAt(15) ? null : 
FileSource.fromByteValue(row.getByte(15)),
+                row.isNullAt(16) ? null : 
fromStringArrayData(row.getArray(16)),
+                row.isNullAt(17) ? null : row.getString(17).toString(),
+                row.isNullAt(18) ? null : row.getLong(18),
+                null); // index 19 is never read; the trailing field comes back as null
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 5c5fc7c0c0..1b652b9644 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -29,6 +29,7 @@ import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMeta09Serializer;
 import org.apache.paimon.io.DataFileMeta10LegacySerializer;
 import org.apache.paimon.io.DataFileMeta12LegacySerializer;
+import org.apache.paimon.io.DataFileMetaFirstRowIdLegacySerializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.DataInputDeserializer;
@@ -50,11 +51,12 @@ import static 
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 /** {@link VersionedSerializer} for {@link CommitMessage}. */
 public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessage> {
 
-    private static final int CURRENT_VERSION = 8;
+    private static final int CURRENT_VERSION = 9;
 
     private final DataFileMetaSerializer dataFileSerializer;
     private final IndexFileMetaSerializer indexEntrySerializer;
 
+    private DataFileMetaFirstRowIdLegacySerializer 
dataFileMetaFirstRowIdLegacySerializer;
     private DataFileMeta12LegacySerializer dataFileMeta12LegacySerializer;
     private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer;
     private DataFileMeta09Serializer dataFile09Serializer;
@@ -146,8 +148,15 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
 
     private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
             int version, DataInputView view) {
-        if (version >= 8) {
+
+        if (version == 9) {
             return () -> dataFileSerializer.deserializeList(view);
+        } else if (version == 8) {
+            if (dataFileMetaFirstRowIdLegacySerializer == null) {
+                dataFileMetaFirstRowIdLegacySerializer =
+                        new DataFileMetaFirstRowIdLegacySerializer();
+            }
+            return () -> 
dataFileMetaFirstRowIdLegacySerializer.deserializeList(view);
         } else if (version == 6 || version == 7) {
             if (dataFileMeta12LegacySerializer == null) {
                 dataFileMeta12LegacySerializer = new 
DataFileMeta12LegacySerializer();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 8bdace010d..079477b001 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMeta09Serializer;
 import org.apache.paimon.io.DataFileMeta10LegacySerializer;
 import org.apache.paimon.io.DataFileMeta12LegacySerializer;
+import org.apache.paimon.io.DataFileMetaFirstRowIdLegacySerializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
@@ -61,7 +62,7 @@ public class DataSplit implements Split {
 
     private static final long serialVersionUID = 7L;
     private static final long MAGIC = -2394839472490812314L;
-    private static final int VERSION = 7;
+    private static final int VERSION = 8;
 
     private long snapshotId = 0;
     private BinaryRow partition;
@@ -452,7 +453,11 @@ public class DataSplit implements Split {
         } else if (version == 5 || version == 6) {
             DataFileMeta12LegacySerializer serializer = new 
DataFileMeta12LegacySerializer();
             return serializer::deserialize;
-        } else if (version >= 7) {
+        } else if (version == 7) {
+            DataFileMetaFirstRowIdLegacySerializer serializer =
+                    new DataFileMetaFirstRowIdLegacySerializer();
+            return serializer::deserialize;
+        } else if (version == 8) {
             DataFileMetaSerializer serializer = new DataFileMetaSerializer();
             return serializer::deserialize;
         } else {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index b458c29fe8..927da8e97d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -46,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ManifestCommittableSerializerCompatibilityTest {
 
     @Test
-    public void testCompatibilityToV4CommitV8() throws IOException {
+    public void testCompatibilityToV4CommitV9() throws IOException {
         SimpleStats keyStats =
                 new SimpleStats(
                         singleColumn("min_key"),
@@ -118,6 +118,84 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
         ManifestCommittable deserialized = 
serializer.deserialize(serializer.getVersion(), bytes);
         assertThat(deserialized).isEqualTo(manifestCommittable);
 
+        byte[] oldBytes =
+                IOUtils.readFully(
+                        ManifestCommittableSerializerCompatibilityTest.class
+                                .getClassLoader()
+                                
.getResourceAsStream("compatibility/manifest-committable-v9"),
+                        true);
+        deserialized = serializer.deserialize(4, oldBytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
+
+    @Test
+    public void testCompatibilityToV4CommitV8() throws IOException {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                DataFileMeta.create(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs://localhost:9000/path/to/file",
+                        1L,
+                        null);
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new 
LinkedHashMap<>();
+        dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+        dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
+        IndexFileMeta indexFile =
+                new IndexFileMeta(
+                        "my_index_type", "my_index_file", 1024 * 100, 1002, 
dvMetas, null);
+        List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        16,
+                        new DataIncrement(dataFiles, dataFiles, dataFiles),
+                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
+                        new IndexIncrement(indexFiles));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+        manifestCommittable.addProperty("k1", "v1");
+        manifestCommittable.addProperty("k2", "v2");
+
+        ManifestCommittableSerializer serializer = new 
ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+
+        ManifestCommittable deserialized = 
serializer.deserialize(serializer.getVersion(), bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+
         byte[] oldBytes =
                 IOUtils.readFully(
                         ManifestCommittableSerializerCompatibilityTest.class
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
similarity index 90%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
index 4d6d842f75..cfe9086fa0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
@@ -59,7 +59,7 @@ import static org.apache.paimon.data.BinaryRow.singleColumn;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link DataSplit}. */
-public class SplitTest {
+public class DataSplitCompatibleTest {
 
     @Test
     public void testSplitMergedRowCount() {
@@ -273,7 +273,7 @@ public class SplitTest {
 
         byte[] v2Bytes =
                 IOUtils.readFully(
-                        SplitTest.class
+                        DataSplitCompatibleTest.class
                                 .getClassLoader()
                                 
.getResourceAsStream("compatibility/datasplit-v1"),
                         true);
@@ -338,7 +338,7 @@ public class SplitTest {
 
         byte[] v2Bytes =
                 IOUtils.readFully(
-                        SplitTest.class
+                        DataSplitCompatibleTest.class
                                 .getClassLoader()
                                 
.getResourceAsStream("compatibility/datasplit-v2"),
                         true);
@@ -407,7 +407,7 @@ public class SplitTest {
 
         byte[] v2Bytes =
                 IOUtils.readFully(
-                        SplitTest.class
+                        DataSplitCompatibleTest.class
                                 .getClassLoader()
                                 
.getResourceAsStream("compatibility/datasplit-v3"),
                         true);
@@ -476,7 +476,7 @@ public class SplitTest {
 
         byte[] v4Bytes =
                 IOUtils.readFully(
-                        SplitTest.class
+                        DataSplitCompatibleTest.class
                                 .getClassLoader()
                                 
.getResourceAsStream("compatibility/datasplit-v4"),
                         true);
@@ -545,7 +545,7 @@ public class SplitTest {
 
         byte[] v5Bytes =
                 IOUtils.readFully(
-                        SplitTest.class
+                        DataSplitCompatibleTest.class
                                 .getClassLoader()
                                 
.getResourceAsStream("compatibility/datasplit-v5"),
                         true);
@@ -615,7 +615,7 @@ public class SplitTest {
 
         byte[] v6Bytes =
                 IOUtils.readFully(
-                        SplitTest.class
+                        DataSplitCompatibleTest.class
                                 .getClassLoader()
                                 
.getResourceAsStream("compatibility/datasplit-v6"),
                         true);
@@ -659,7 +659,7 @@ public class SplitTest {
                         Arrays.asList("field1", "field2", "field3"),
                         "hdfs:///path/to/warehouse",
                         12L,
-                        Arrays.asList("a", "b", "c", "f"));
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 
33L);
@@ -685,7 +685,7 @@ public class SplitTest {
 
         byte[] v6Bytes =
                 IOUtils.readFully(
-                        SplitTest.class
+                        DataSplitCompatibleTest.class
                                 .getClassLoader()
                                 
.getResourceAsStream("compatibility/datasplit-v7"),
                         true);
@@ -695,6 +695,76 @@ public class SplitTest {
         assertThat(actual).isEqualTo(split);
     }
 
+    @Test
+    public void testSerializerCompatibleV8() throws Exception {
+        // Unlike v7, the v8 layout keeps the DataFileMeta field that follows firstRowId.
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                DataFileMeta.create(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs:///path/to/warehouse",
+                        12L,
+                        Arrays.asList("a", "b", "c", "f"));
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
+        List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile);
+
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+        binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+        binaryRowWriter.complete();
+
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(18)
+                        .withPartition(partition)
+                        .withBucket(20)
+                        .withTotalBuckets(32)
+                        .withDataFiles(dataFiles)
+                        .withDataDeletionFiles(deletionFiles)
+                        .withBucketPath("my path")
+                        .build();
+
+        assertThat(InstantiationUtil.clone(InstantiationUtil.clone(split))).isEqualTo(split);
+
+        byte[] v8Bytes =
+                IOUtils.readFully(
+                        DataSplitCompatibleTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/datasplit-v8"),
+                        true);
+
+        DataSplit actual =
+                InstantiationUtil.deserializeObject(v8Bytes, DataSplit.class.getClassLoader());
+        assertThat(actual).isEqualTo(split);
+    }
+
     private DataFileMeta newDataFile(long rowCount) {
         return newDataFile(rowCount, null, null);
     }
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v7 
b/paimon-core/src/test/resources/compatibility/datasplit-v7
index 3fc65743a6..16b16ca73b 100644
Binary files a/paimon-core/src/test/resources/compatibility/datasplit-v7 and 
b/paimon-core/src/test/resources/compatibility/datasplit-v7 differ
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v7 
b/paimon-core/src/test/resources/compatibility/datasplit-v8
similarity index 91%
copy from paimon-core/src/test/resources/compatibility/datasplit-v7
copy to paimon-core/src/test/resources/compatibility/datasplit-v8
index 3fc65743a6..5fc9725731 100644
Binary files a/paimon-core/src/test/resources/compatibility/datasplit-v7 and 
b/paimon-core/src/test/resources/compatibility/datasplit-v8 differ
diff --git 
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v8
index 922e55c5d7..0f2ca46b92 100644
Binary files 
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 and 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v8 differ
diff --git 
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v9
similarity index 98%
copy from paimon-core/src/test/resources/compatibility/manifest-committable-v8
copy to paimon-core/src/test/resources/compatibility/manifest-committable-v9
index 922e55c5d7..336a2aa3f5 100644
Binary files 
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 and 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v9 differ

Reply via email to