This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 98f52dd8a [core] Resolve source state compatible issue between version
0.8 and 0.9 (#3478)
98f52dd8a is described below
commit 98f52dd8a2c6d51e8c0fc370a9fe4c3d1cb4c043
Author: YeJunHao <[email protected]>
AuthorDate: Fri Jun 7 17:48:58 2024 +0800
[core] Resolve source state compatible issue between version 0.8 and 0.9
(#3478)
---
.../org/apache/paimon/table/source/DataSplit.java | 59 +++++++++-
.../org/apache/paimon/table/source/SplitTest.java | 124 +++++++++++++++++++++
.../src/test/resources/compatibility/datasplit-v1 | Bin 0 -> 790 bytes
3 files changed, 181 insertions(+), 2 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 9a7d4848a..5441d55d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -18,13 +18,16 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.SerializationUtils;
import javax.annotation.Nullable;
@@ -46,6 +49,8 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
public class DataSplit implements Split {
private static final long serialVersionUID = 7L;
+ private static final long MAGIC = -2394839472490812314L;
+ private static final int VERSION = 2;
private long snapshotId = 0;
private BinaryRow partition;
@@ -224,6 +229,8 @@ public class DataSplit implements Split {
}
public void serialize(DataOutputView out) throws IOException {
+ out.writeLong(MAGIC);
+ out.writeInt(VERSION);
out.writeLong(snapshotId);
SerializationUtils.serializeBinaryRow(partition, out);
out.writeInt(bucket);
@@ -249,13 +256,46 @@ public class DataSplit implements Split {
out.writeBoolean(rawConvertible);
}
+    /**
+     * Writes this split in the version 0.8 layout: unlike {@link #serialize(DataOutputView)}, no
+     * magic number or version is written first, and data files are encoded with {@link
+     * DataFileMeta08Serializer}.
+     *
+     * <p>This method is only used in tests, for testing compatibility between version 0.8 and
+     * version 0.9. Do not use this method in any other place except tests.
+     *
+     * @param out view that receives the serialized fields
+     * @throws IOException if writing to {@code out} fails
+     */
+    @VisibleForTesting
+    void serialize08(DataOutputView out) throws IOException {
+        out.writeLong(snapshotId);
+        SerializationUtils.serializeBinaryRow(partition, out);
+        out.writeInt(bucket);
+        out.writeUTF(bucketPath);
+
+        DataFileMeta08Serializer dataFileSer = new DataFileMeta08Serializer();
+        out.writeInt(beforeFiles.size());
+        for (DataFileMeta file : beforeFiles) {
+            dataFileSer.serialize(file, out);
+        }
+
+        DeletionFile.serializeList(out, beforeDeletionFiles);
+
+        out.writeInt(dataFiles.size());
+        for (DataFileMeta file : dataFiles) {
+            dataFileSer.serialize(file, out);
+        }
+
+        DeletionFile.serializeList(out, dataDeletionFiles);
+
+        out.writeBoolean(isStreaming);
+
+        out.writeBoolean(rawConvertible);
+    }
+
public static DataSplit deserialize(DataInputView in) throws IOException {
- long snapshotId = in.readLong();
+ long magic = in.readLong();
+ int version = magic == MAGIC ? in.readInt() : 1;
+ // version 1 does not write the magic number, so the first long is the snapshot id.
+ long snapshotId = version == 1 ? magic : in.readLong();
BinaryRow partition = SerializationUtils.deserializeBinaryRow(in);
int bucket = in.readInt();
String bucketPath = in.readUTF();
- DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
+ ObjectSerializer<DataFileMeta> dataFileSer = getFileMetaSerde(version);
int beforeNumber = in.readInt();
List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
for (int i = 0; i < beforeNumber; i++) {
@@ -295,6 +335,21 @@ public class DataSplit implements Split {
return builder.build();
}
+    /**
+     * Picks the {@link DataFileMeta} serializer that matches a serialized split version.
+     *
+     * @param version version of the serialized split: 1 selects {@link DataFileMeta08Serializer},
+     *     2 selects {@link DataFileMetaSerializer}
+     * @return a new serializer instance for that version
+     * @throws UnsupportedOperationException if {@code version} is neither 1 nor 2
+     */
+    private static ObjectSerializer<DataFileMeta> getFileMetaSerde(int version) {
+        switch (version) {
+            case 1:
+                return new DataFileMeta08Serializer();
+            case 2:
+                return new DataFileMetaSerializer();
+            default:
+                throw new UnsupportedOperationException(
+                        "Expecting DataSplit version to be smaller or equal than "
+                                + VERSION
+                                + ", but found "
+                                + version
+                                + ".");
+        }
+    }
+
public static Builder builder() {
return new Builder();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index b5dda8df7..eb28afb71 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -18,19 +18,31 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileTestDataGenerator;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.InstantiationUtil;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.paimon.data.BinaryArray.fromLongArray;
+import static org.apache.paimon.data.BinaryRow.singleColumn;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link DataSplit}. */
@@ -59,4 +71,116 @@ public class SplitTest {
DataSplit newSplit = DataSplit.deserialize(new
DataInputDeserializer(out.toByteArray()));
assertThat(newSplit).isEqualTo(split);
}
+
+    /**
+     * Verifies that bytes written by {@code serialize08} can be read back by {@code
+     * DataSplit.deserialize}. The expected split carries copies of the generated data files built
+     * with {@code null} as the last {@link DataFileMeta} constructor argument.
+     */
+    @Test
+    public void testSerializerCompatible() throws IOException {
+        DataFileTestDataGenerator gen = DataFileTestDataGenerator.builder().build();
+        DataFileTestDataGenerator.Data data = gen.next();
+
+        // Draw the file count once; a random bound inside the loop condition would be re-rolled
+        // on every iteration.
+        int fileCount = ThreadLocalRandom.current().nextInt(10);
+        List<DataFileMeta> files = new ArrayList<>(fileCount);
+        List<DataFileMeta> expectedFiles = new ArrayList<>(fileCount);
+        for (int i = 0; i < fileCount; i++) {
+            DataFileMeta meta = gen.next().meta;
+            files.add(meta);
+            expectedFiles.add(
+                    new DataFileMeta(
+                            meta.fileName(),
+                            meta.fileSize(),
+                            meta.rowCount(),
+                            meta.minKey(),
+                            meta.maxKey(),
+                            meta.keyStats(),
+                            meta.valueStats(),
+                            meta.minSequenceNumber(),
+                            meta.maxSequenceNumber(),
+                            meta.schemaId(),
+                            meta.level(),
+                            meta.extraFiles(),
+                            meta.creationTime(),
+                            meta.deleteRowCount().orElse(null),
+                            meta.embeddedIndex(),
+                            null));
+        }
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(ThreadLocalRandom.current().nextLong(100))
+                        .withPartition(data.partition)
+                        .withBucket(data.bucket)
+                        .withDataFiles(files)
+                        .withBucketPath("my path")
+                        .build();
+
+        DataSplit expectedSplit =
+                DataSplit.builder()
+                        .withSnapshot(split.snapshotId())
+                        .withPartition(data.partition)
+                        .withBucket(data.bucket)
+                        .withDataFiles(expectedFiles)
+                        .withBucketPath("my path")
+                        .build();
+
+        // Write in the 0.8 layout, then read through the current deserializer.
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        split.serialize08(new DataOutputViewStreamWrapper(out));
+
+        DataSplit newSplit = DataSplit.deserialize(new DataInputDeserializer(out.toByteArray()));
+        assertThat(newSplit).isEqualTo(expectedSplit);
+    }
+
+    /**
+     * Verifies that the stored fixture {@code compatibility/datasplit-v1}, read through {@code
+     * InstantiationUtil.deserializeObject}, is restored to the split built below.
+     */
+    @Test
+    public void testSerializer2() throws Exception {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        null);
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+        binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+        binaryRowWriter.complete();
+
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(18)
+                        .withPartition(partition)
+                        .withBucket(20)
+                        .withDataFiles(dataFiles)
+                        .withBucketPath("my path")
+                        .build();
+
+        String fixture = "compatibility/datasplit-v1";
+        ClassLoader classLoader = SplitTest.class.getClassLoader();
+        // Report a missing fixture explicitly rather than handing a null stream to readFully.
+        assertThat(classLoader.getResource(fixture)).as("test resource %s", fixture).isNotNull();
+
+        // The bytes come from the v1 fixture, hence the name.
+        byte[] v1Bytes = IOUtils.readFully(classLoader.getResourceAsStream(fixture), true);
+
+        DataSplit actual =
+                InstantiationUtil.deserializeObject(v1Bytes, DataSplit.class.getClassLoader());
+        assertThat(actual).isEqualTo(split);
+    }
}
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v1
b/paimon-core/src/test/resources/compatibility/datasplit-v1
new file mode 100644
index 000000000..bfe78725f
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/datasplit-v1 differ