This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fa77e54da [minor] Fix merge conflicts in DataSplit
fa77e54da is described below
commit fa77e54da1fe35c163827c1adce51ea7cf3473ba
Author: Jingsong <[email protected]>
AuthorDate: Fri Jun 7 17:55:12 2024 +0800
[minor] Fix merge conflicts in DataSplit
---
.../apache/paimon/io/DataFileMeta08Serializer.java | 4 +-
.../org/apache/paimon/table/source/DataSplit.java | 49 +++++--------------
.../org/apache/paimon/table/source/SplitTest.java | 55 +---------------------
3 files changed, 14 insertions(+), 94 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
index dc6d86c7f..9c12f3834 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -83,7 +83,7 @@ public class DataFileMeta08Serializer implements Serializable
{
}
}
- private void serialize(DataFileMeta meta, DataOutputView target) throws
IOException {
+ public void serialize(DataFileMeta meta, DataOutputView target) throws
IOException {
GenericRow row =
GenericRow.of(
BinaryString.fromString(meta.fileName()),
@@ -113,7 +113,7 @@ public class DataFileMeta08Serializer implements
Serializable {
return records;
}
- private DataFileMeta deserialize(DataInputView in) throws IOException {
+ public DataFileMeta deserialize(DataInputView in) throws IOException {
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes,
0);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 5441d55d7..3becc291a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -18,7 +18,6 @@
package org.apache.paimon.table.source;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
@@ -27,7 +26,7 @@ import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
-import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.FunctionWithIOException;
import org.apache.paimon.utils.SerializationUtils;
import javax.annotation.Nullable;
@@ -256,36 +255,6 @@ public class DataSplit implements Split {
out.writeBoolean(rawConvertible);
}
- @VisibleForTesting
- // this method is only used in test, used for testing compatible between
version 0.8 and version
- // 0.9
- // do not use this method in any other place except test
- void serialize08(DataOutputView out) throws IOException {
- out.writeLong(snapshotId);
- SerializationUtils.serializeBinaryRow(partition, out);
- out.writeInt(bucket);
- out.writeUTF(bucketPath);
-
- DataFileMeta08Serializer dataFileSer = new DataFileMeta08Serializer();
- out.writeInt(beforeFiles.size());
- for (DataFileMeta file : beforeFiles) {
- dataFileSer.serialize(file, out);
- }
-
- DeletionFile.serializeList(out, beforeDeletionFiles);
-
- out.writeInt(dataFiles.size());
- for (DataFileMeta file : dataFiles) {
- dataFileSer.serialize(file, out);
- }
-
- DeletionFile.serializeList(out, dataDeletionFiles);
-
- out.writeBoolean(isStreaming);
-
- out.writeBoolean(rawConvertible);
- }
-
public static DataSplit deserialize(DataInputView in) throws IOException {
long magic = in.readLong();
int version = magic == MAGIC ? in.readInt() : 1;
@@ -295,11 +264,12 @@ public class DataSplit implements Split {
int bucket = in.readInt();
String bucketPath = in.readUTF();
- ObjectSerializer<DataFileMeta> dataFileSer = getFileMetaSerde(version);
+ FunctionWithIOException<DataInputView, DataFileMeta> dataFileSer =
+ getFileMetaSerde(version);
int beforeNumber = in.readInt();
List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
for (int i = 0; i < beforeNumber; i++) {
- beforeFiles.add(dataFileSer.deserialize(in));
+ beforeFiles.add(dataFileSer.apply(in));
}
List<DeletionFile> beforeDeletionFiles =
DeletionFile.deserializeList(in);
@@ -307,7 +277,7 @@ public class DataSplit implements Split {
int fileNumber = in.readInt();
List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
for (int i = 0; i < fileNumber; i++) {
- dataFiles.add(dataFileSer.deserialize(in));
+ dataFiles.add(dataFileSer.apply(in));
}
List<DeletionFile> dataDeletionFiles =
DeletionFile.deserializeList(in);
@@ -335,11 +305,14 @@ public class DataSplit implements Split {
return builder.build();
}
- private static ObjectSerializer<DataFileMeta> getFileMetaSerde(int
version) {
+ private static FunctionWithIOException<DataInputView, DataFileMeta>
getFileMetaSerde(
+ int version) {
if (version == 1) {
- return new DataFileMeta08Serializer();
+ DataFileMeta08Serializer serializer = new
DataFileMeta08Serializer();
+ return serializer::deserialize;
} else if (version == 2) {
- return new DataFileMetaSerializer();
+ DataFileMetaSerializer serializer = new DataFileMetaSerializer();
+ return serializer::deserialize;
} else {
throw new UnsupportedOperationException(
"Expecting DataSplit version to be smaller or equal than "
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index eb28afb71..4bc9b6f8b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -73,60 +73,7 @@ public class SplitTest {
}
@Test
- public void testSerializerCompatible() throws IOException {
- DataFileTestDataGenerator gen =
DataFileTestDataGenerator.builder().build();
- DataFileTestDataGenerator.Data data = gen.next();
- List<DataFileMeta> files = new ArrayList<>();
- List<DataFileMeta> files2 = new ArrayList<>();
- for (int i = 0; i < ThreadLocalRandom.current().nextInt(10); i++) {
- DataFileMeta meta = gen.next().meta;
- files.add(meta);
- files2.add(
- new DataFileMeta(
- meta.fileName(),
- meta.fileSize(),
- meta.rowCount(),
- meta.minKey(),
- meta.maxKey(),
- meta.keyStats(),
- meta.valueStats(),
- meta.minSequenceNumber(),
- meta.maxSequenceNumber(),
- meta.schemaId(),
- meta.level(),
- meta.extraFiles(),
- meta.creationTime(),
- meta.deleteRowCount().orElse(null),
- meta.embeddedIndex(),
- null));
- }
- DataSplit split =
- DataSplit.builder()
-
.withSnapshot(ThreadLocalRandom.current().nextLong(100))
- .withPartition(data.partition)
- .withBucket(data.bucket)
- .withDataFiles(files)
- .withBucketPath("my path")
- .build();
-
- DataSplit split2 =
- DataSplit.builder()
- .withSnapshot(split.snapshotId())
- .withPartition(data.partition)
- .withBucket(data.bucket)
- .withDataFiles(files2)
- .withBucketPath("my path")
- .build();
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- split.serialize08(new DataOutputViewStreamWrapper(out));
-
- DataSplit newSplit = DataSplit.deserialize(new
DataInputDeserializer(out.toByteArray()));
- assertThat(newSplit).isEqualTo(split2);
- }
-
- @Test
- public void testSerializer2() throws Exception {
+ public void testSerializerCompatible() throws Exception {
SimpleStats keyStats =
new SimpleStats(
singleColumn("min_key"),