This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 293f5fe4bb [core] Upgrade data split serializer and manifest serializer version (#6165) 293f5fe4bb is described below commit 293f5fe4bb8d290113afe1a41c471212de403c94 Author: YeJunHao <41894543+leaves12...@users.noreply.github.com> AuthorDate: Thu Aug 28 11:26:45 2025 +0800 [core] Upgrade data split serializer and manifest serializer version (#6165) --- .../io/DataFileMetaFirstRowIdLegacySerializer.java | 91 +++++++++++++++++++++ .../paimon/table/sink/CommitMessageSerializer.java | 13 ++- .../org/apache/paimon/table/source/DataSplit.java | 9 +- ...festCommittableSerializerCompatibilityTest.java | 80 +++++++++++++++++- ...SplitTest.java => DataSplitCompatibleTest.java} | 88 ++++++++++++++++++-- .../src/test/resources/compatibility/datasplit-v7 | Bin 1032 -> 992 bytes .../compatibility/{datasplit-v7 => datasplit-v8} | Bin 1032 -> 1032 bytes .../compatibility/manifest-committable-v8 | Bin 3786 -> 3578 bytes ...fest-committable-v8 => manifest-committable-v9} | Bin 3786 -> 3786 bytes 9 files changed, 267 insertions(+), 14 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java new file mode 100644 index 0000000000..59abcc730d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.utils.ObjectSerializer; + +import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; +import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData; +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link DataFileMeta}. */ +public class DataFileMetaFirstRowIdLegacySerializer extends ObjectSerializer<DataFileMeta> { + + private static final long serialVersionUID = 1L; + + public DataFileMetaFirstRowIdLegacySerializer() { + super(DataFileMeta.SCHEMA); + } + + @Override + public InternalRow toRow(DataFileMeta meta) { + return GenericRow.of( + BinaryString.fromString(meta.fileName()), + meta.fileSize(), + meta.rowCount(), + serializeBinaryRow(meta.minKey()), + serializeBinaryRow(meta.maxKey()), + meta.keyStats().toRow(), + meta.valueStats().toRow(), + meta.minSequenceNumber(), + meta.maxSequenceNumber(), + meta.schemaId(), + meta.level(), + toStringArrayData(meta.extraFiles()), + meta.creationTime(), + meta.deleteRowCount().orElse(null), + meta.embeddedIndex(), + meta.fileSource().map(FileSource::toByteValue).orElse(null), + toStringArrayData(meta.valueStatsCols()), + meta.externalPath().map(BinaryString::fromString).orElse(null), + meta.firstRowId(), + null); + } + + @Override + public DataFileMeta fromRow(InternalRow row) { + return DataFileMeta.create( + row.getString(0).toString(), + row.getLong(1), + row.getLong(2), + deserializeBinaryRow(row.getBinary(3)), + deserializeBinaryRow(row.getBinary(4)), + SimpleStats.fromRow(row.getRow(5, 3)), + SimpleStats.fromRow(row.getRow(6, 3)), + row.getLong(7), + row.getLong(8), + row.getLong(9), + row.getInt(10), + fromStringArrayData(row.getArray(11)), + row.getTimestamp(12, 3), + row.isNullAt(13) ? null : row.getLong(13), + row.isNullAt(14) ? null : row.getBinary(14), + row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), + row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)), + row.isNullAt(17) ? null : row.getString(17).toString(), + row.isNullAt(18) ? null : row.getLong(18), + null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 5c5fc7c0c0..1b652b9644 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -29,6 +29,7 @@ import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; import org.apache.paimon.io.DataFileMeta10LegacySerializer; import org.apache.paimon.io.DataFileMeta12LegacySerializer; +import org.apache.paimon.io.DataFileMetaFirstRowIdLegacySerializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputDeserializer; @@ -50,11 +51,12 @@ import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; /** {@link VersionedSerializer} for {@link CommitMessage}. */ public class CommitMessageSerializer implements VersionedSerializer<CommitMessage> { - private static final int CURRENT_VERSION = 8; + private static final int CURRENT_VERSION = 9; private final DataFileMetaSerializer dataFileSerializer; private final IndexFileMetaSerializer indexEntrySerializer; + private DataFileMetaFirstRowIdLegacySerializer dataFileMetaFirstRowIdLegacySerializer; private DataFileMeta12LegacySerializer dataFileMeta12LegacySerializer; private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer; private DataFileMeta09Serializer dataFile09Serializer; @@ -146,8 +148,15 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer( int version, DataInputView view) { - if (version >= 8) { + + if (version == 9) { return () -> dataFileSerializer.deserializeList(view); + } else if (version == 8) { + if (dataFileMetaFirstRowIdLegacySerializer == null) { + dataFileMetaFirstRowIdLegacySerializer = + new DataFileMetaFirstRowIdLegacySerializer(); + } + return () -> dataFileMetaFirstRowIdLegacySerializer.deserializeList(view); } else if (version == 6 || version == 7) { if (dataFileMeta12LegacySerializer == null) { dataFileMeta12LegacySerializer = new DataFileMeta12LegacySerializer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 8bdace010d..079477b001 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; import org.apache.paimon.io.DataFileMeta10LegacySerializer; import org.apache.paimon.io.DataFileMeta12LegacySerializer; +import org.apache.paimon.io.DataFileMetaFirstRowIdLegacySerializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataInputViewStreamWrapper; @@ -61,7 +62,7 @@ public class DataSplit implements Split { private static final long serialVersionUID = 7L; private static final long MAGIC = -2394839472490812314L; - private static final int VERSION = 7; + private static final int VERSION = 8; private long snapshotId = 0; private BinaryRow partition; @@ -452,7 +453,11 @@ public class DataSplit implements Split { } else if (version == 5 || version == 6) { DataFileMeta12LegacySerializer serializer = new DataFileMeta12LegacySerializer(); return serializer::deserialize; - } else if (version >= 7) { + } else if (version == 7) { + DataFileMetaFirstRowIdLegacySerializer serializer = + new DataFileMetaFirstRowIdLegacySerializer(); + return serializer::deserialize; + } else if (version == 8) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index b458c29fe8..927da8e97d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -46,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class ManifestCommittableSerializerCompatibilityTest { @Test - public void testCompatibilityToV4CommitV8() throws IOException { + public void testCompatibilityToV4CommitV9() throws IOException { SimpleStats keyStats = new SimpleStats( singleColumn("min_key"), @@ -118,6 +118,84 @@ public class ManifestCommittableSerializerCompatibilityTest { ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); assertThat(deserialized).isEqualTo(manifestCommittable); + byte[] oldBytes = + IOUtils.readFully( + ManifestCommittableSerializerCompatibilityTest.class + .getClassLoader() + .getResourceAsStream("compatibility/manifest-committable-v9"), + true); + deserialized = serializer.deserialize(4, oldBytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + } + + @Test + public void testCompatibilityToV4CommitV8() throws IOException { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + DataFileMeta.create( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3"), + "hdfs://localhost:9000/path/to/file", + 1L, + null); + List<DataFileMeta> dataFiles = Collections.singletonList(dataFile); + + LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>(); + dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L)); + IndexFileMeta indexFile = + new IndexFileMeta( + "my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas, null); + List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile); + + CommitMessageImpl commitMessage = + new CommitMessageImpl( + singleColumn("my_partition"), + 11, + 16, + new DataIncrement(dataFiles, dataFiles, dataFiles), + new CompactIncrement(dataFiles, dataFiles, dataFiles), + new IndexIncrement(indexFiles)); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + manifestCommittable.addProperty("k1", "v1"); + manifestCommittable.addProperty("k2", "v2"); + + ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + byte[] oldBytes = IOUtils.readFully( ManifestCommittableSerializerCompatibilityTest.class diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java similarity index 90% rename from paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java rename to paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java index 4d6d842f75..cfe9086fa0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java @@ -59,7 +59,7 @@ import static org.apache.paimon.data.BinaryRow.singleColumn; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link DataSplit}. */ -public class SplitTest { +public class DataSplitCompatibleTest { @Test public void testSplitMergedRowCount() { @@ -273,7 +273,7 @@ public class SplitTest { byte[] v2Bytes = IOUtils.readFully( - SplitTest.class + DataSplitCompatibleTest.class .getClassLoader() .getResourceAsStream("compatibility/datasplit-v1"), true); @@ -338,7 +338,7 @@ public class SplitTest { byte[] v2Bytes = IOUtils.readFully( - SplitTest.class + DataSplitCompatibleTest.class .getClassLoader() .getResourceAsStream("compatibility/datasplit-v2"), true); @@ -407,7 +407,7 @@ public class SplitTest { byte[] v2Bytes = IOUtils.readFully( - SplitTest.class + DataSplitCompatibleTest.class .getClassLoader() .getResourceAsStream("compatibility/datasplit-v3"), true); @@ -476,7 +476,7 @@ public class SplitTest { byte[] v4Bytes = IOUtils.readFully( - SplitTest.class + DataSplitCompatibleTest.class .getClassLoader() .getResourceAsStream("compatibility/datasplit-v4"), true); @@ -545,7 +545,7 @@ public class SplitTest { byte[] v5Bytes = IOUtils.readFully( - SplitTest.class + DataSplitCompatibleTest.class .getClassLoader() .getResourceAsStream("compatibility/datasplit-v5"), true); @@ -615,7 +615,7 @@ public class SplitTest { byte[] v6Bytes = IOUtils.readFully( - SplitTest.class + DataSplitCompatibleTest.class .getClassLoader() .getResourceAsStream("compatibility/datasplit-v6"), true); @@ -659,7 +659,7 @@ public class SplitTest { Arrays.asList("field1", "field2", "field3"), "hdfs:///path/to/warehouse", 12L, - Arrays.asList("a", "b", "c", "f")); + null); List<DataFileMeta> dataFiles = Collections.singletonList(dataFile); DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L); @@ -685,7 +685,7 @@ public class SplitTest { byte[] v6Bytes = IOUtils.readFully( - SplitTest.class + DataSplitCompatibleTest.class .getClassLoader() .getResourceAsStream("compatibility/datasplit-v7"), true); @@ -695,6 +695,76 @@ public class SplitTest { assertThat(actual).isEqualTo(split); } + @Test + public void testSerializerCompatibleV8() throws Exception { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + + DataFileMeta dataFile = + DataFileMeta.create( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3"), + "hdfs:///path/to/warehouse", + 12L, + Arrays.asList("a", "b", "c", "f")); + List<DataFileMeta> dataFiles = Collections.singletonList(dataFile); + + DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L); + List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile); + + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition); + binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa")); + binaryRowWriter.complete(); + + DataSplit split = + DataSplit.builder() + .withSnapshot(18) + .withPartition(partition) + .withBucket(20) + .withTotalBuckets(32) + .withDataFiles(dataFiles) + .withDataDeletionFiles(deletionFiles) + .withBucketPath("my path") + .build(); + + assertThat(InstantiationUtil.clone(InstantiationUtil.clone(split))).isEqualTo(split); + + byte[] v6Bytes = + IOUtils.readFully( + DataSplitCompatibleTest.class + .getClassLoader() + .getResourceAsStream("compatibility/datasplit-v8"), + true); + + DataSplit actual = + InstantiationUtil.deserializeObject(v6Bytes, DataSplit.class.getClassLoader()); + assertThat(actual).isEqualTo(split); + } + private DataFileMeta newDataFile(long rowCount) { return newDataFile(rowCount, null, null); } diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v7 b/paimon-core/src/test/resources/compatibility/datasplit-v7 index 3fc65743a6..16b16ca73b 100644 Binary files a/paimon-core/src/test/resources/compatibility/datasplit-v7 and b/paimon-core/src/test/resources/compatibility/datasplit-v7 differ diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v7 b/paimon-core/src/test/resources/compatibility/datasplit-v8 similarity index 91% copy from paimon-core/src/test/resources/compatibility/datasplit-v7 copy to paimon-core/src/test/resources/compatibility/datasplit-v8 index 3fc65743a6..5fc9725731 100644 Binary files a/paimon-core/src/test/resources/compatibility/datasplit-v7 and b/paimon-core/src/test/resources/compatibility/datasplit-v8 differ diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 b/paimon-core/src/test/resources/compatibility/manifest-committable-v8 index 922e55c5d7..0f2ca46b92 100644 Binary files a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 and b/paimon-core/src/test/resources/compatibility/manifest-committable-v8 differ diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 b/paimon-core/src/test/resources/compatibility/manifest-committable-v9 similarity index 98% copy from paimon-core/src/test/resources/compatibility/manifest-committable-v8 copy to paimon-core/src/test/resources/compatibility/manifest-committable-v9 index 922e55c5d7..336a2aa3f5 100644 Binary files a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 and b/paimon-core/src/test/resources/compatibility/manifest-committable-v9 differ