This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 98f52dd8a [core] Resolve source state compatibility issue between version 
0.8 and 0.9 (#3478)
98f52dd8a is described below

commit 98f52dd8a2c6d51e8c0fc370a9fe4c3d1cb4c043
Author: YeJunHao <[email protected]>
AuthorDate: Fri Jun 7 17:48:58 2024 +0800

    [core] Resolve source state compatibility issue between version 0.8 and 0.9 
(#3478)
---
 .../org/apache/paimon/table/source/DataSplit.java  |  59 +++++++++-
 .../org/apache/paimon/table/source/SplitTest.java  | 124 +++++++++++++++++++++
 .../src/test/resources/compatibility/datasplit-v1  | Bin 0 -> 790 bytes
 3 files changed, 181 insertions(+), 2 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 9a7d4848a..5441d55d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -18,13 +18,16 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.utils.ObjectSerializer;
 import org.apache.paimon.utils.SerializationUtils;
 
 import javax.annotation.Nullable;
@@ -46,6 +49,8 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class DataSplit implements Split {
 
     private static final long serialVersionUID = 7L;
+    private static final long MAGIC = -2394839472490812314L;
+    private static final int VERSION = 2;
 
     private long snapshotId = 0;
     private BinaryRow partition;
@@ -224,6 +229,8 @@ public class DataSplit implements Split {
     }
 
     public void serialize(DataOutputView out) throws IOException {
+        out.writeLong(MAGIC);
+        out.writeInt(VERSION);
         out.writeLong(snapshotId);
         SerializationUtils.serializeBinaryRow(partition, out);
         out.writeInt(bucket);
@@ -249,13 +256,46 @@ public class DataSplit implements Split {
         out.writeBoolean(rawConvertible);
     }
 
+    @VisibleForTesting
+    // this method is only used in test, used for testing compatibility between version 0.8 and
+    // version 0.9
+    // do not use this method in any other place except test
+    void serialize08(DataOutputView out) throws IOException {
+        out.writeLong(snapshotId);
+        SerializationUtils.serializeBinaryRow(partition, out);
+        out.writeInt(bucket);
+        out.writeUTF(bucketPath);
+
+        DataFileMeta08Serializer dataFileSer = new DataFileMeta08Serializer();
+        out.writeInt(beforeFiles.size());
+        for (DataFileMeta file : beforeFiles) {
+            dataFileSer.serialize(file, out);
+        }
+
+        DeletionFile.serializeList(out, beforeDeletionFiles);
+
+        out.writeInt(dataFiles.size());
+        for (DataFileMeta file : dataFiles) {
+            dataFileSer.serialize(file, out);
+        }
+
+        DeletionFile.serializeList(out, dataDeletionFiles);
+
+        out.writeBoolean(isStreaming);
+
+        out.writeBoolean(rawConvertible);
+    }
+
     public static DataSplit deserialize(DataInputView in) throws IOException {
-        long snapshotId = in.readLong();
+        long magic = in.readLong();
+        int version = magic == MAGIC ? in.readInt() : 1;
+        // version 1 does not write magic number in, so the first long is 
snapshot id.
+        long snapshotId = version == 1 ? magic : in.readLong();
         BinaryRow partition = SerializationUtils.deserializeBinaryRow(in);
         int bucket = in.readInt();
         String bucketPath = in.readUTF();
 
-        DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
+        ObjectSerializer<DataFileMeta> dataFileSer = getFileMetaSerde(version);
         int beforeNumber = in.readInt();
         List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
         for (int i = 0; i < beforeNumber; i++) {
@@ -295,6 +335,21 @@ public class DataSplit implements Split {
         return builder.build();
     }
 
+    private static ObjectSerializer<DataFileMeta> getFileMetaSerde(int 
version) {
+        if (version == 1) {
+            return new DataFileMeta08Serializer();
+        } else if (version == 2) {
+            return new DataFileMetaSerializer();
+        } else {
+            throw new UnsupportedOperationException(
+                    "Expecting DataSplit version to be smaller or equal than "
+                            + VERSION
+                            + ", but found "
+                            + version
+                            + ".");
+        }
+    }
+
     public static Builder builder() {
         return new Builder();
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index b5dda8df7..eb28afb71 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -18,19 +18,31 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileTestDataGenerator;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.InstantiationUtil;
 
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.paimon.data.BinaryArray.fromLongArray;
+import static org.apache.paimon.data.BinaryRow.singleColumn;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link DataSplit}. */
@@ -59,4 +71,116 @@ public class SplitTest {
         DataSplit newSplit = DataSplit.deserialize(new 
DataInputDeserializer(out.toByteArray()));
         assertThat(newSplit).isEqualTo(split);
     }
+
+    @Test
+    public void testSerializerCompatible() throws IOException {
+        DataFileTestDataGenerator gen = 
DataFileTestDataGenerator.builder().build();
+        DataFileTestDataGenerator.Data data = gen.next();
+        List<DataFileMeta> files = new ArrayList<>();
+        List<DataFileMeta> files2 = new ArrayList<>();
+        for (int i = 0; i < ThreadLocalRandom.current().nextInt(10); i++) {
+            DataFileMeta meta = gen.next().meta;
+            files.add(meta);
+            files2.add(
+                    new DataFileMeta(
+                            meta.fileName(),
+                            meta.fileSize(),
+                            meta.rowCount(),
+                            meta.minKey(),
+                            meta.maxKey(),
+                            meta.keyStats(),
+                            meta.valueStats(),
+                            meta.minSequenceNumber(),
+                            meta.maxSequenceNumber(),
+                            meta.schemaId(),
+                            meta.level(),
+                            meta.extraFiles(),
+                            meta.creationTime(),
+                            meta.deleteRowCount().orElse(null),
+                            meta.embeddedIndex(),
+                            null));
+        }
+        DataSplit split =
+                DataSplit.builder()
+                        
.withSnapshot(ThreadLocalRandom.current().nextLong(100))
+                        .withPartition(data.partition)
+                        .withBucket(data.bucket)
+                        .withDataFiles(files)
+                        .withBucketPath("my path")
+                        .build();
+
+        DataSplit split2 =
+                DataSplit.builder()
+                        .withSnapshot(split.snapshotId())
+                        .withPartition(data.partition)
+                        .withBucket(data.bucket)
+                        .withDataFiles(files2)
+                        .withBucketPath("my path")
+                        .build();
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        split.serialize08(new DataOutputViewStreamWrapper(out));
+
+        DataSplit newSplit = DataSplit.deserialize(new 
DataInputDeserializer(out.toByteArray()));
+        assertThat(newSplit).isEqualTo(split2);
+    }
+
+    @Test
+    public void testSerializer2() throws Exception {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        null);
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+        binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+        binaryRowWriter.complete();
+
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(18)
+                        .withPartition(partition)
+                        .withBucket(20)
+                        .withDataFiles(dataFiles)
+                        .withBucketPath("my path")
+                        .build();
+
+        byte[] v2Bytes =
+                IOUtils.readFully(
+                        SplitTest.class
+                                .getClassLoader()
+                                
.getResourceAsStream("compatibility/datasplit-v1"),
+                        true);
+
+        DataSplit actual =
+                InstantiationUtil.deserializeObject(v2Bytes, 
DataSplit.class.getClassLoader());
+        assertThat(actual).isEqualTo(split);
+    }
 }
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v1 
b/paimon-core/src/test/resources/compatibility/datasplit-v1
new file mode 100644
index 000000000..bfe78725f
Binary files /dev/null and 
b/paimon-core/src/test/resources/compatibility/datasplit-v1 differ

Reply via email to