This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d328d6f2 [core] Make ManifestCommittableSerializer be compatible with 
Paimon 0.7 (#3476)
1d328d6f2 is described below

commit 1d328d6f28c7db1fae28210b03d88f59bb4c32e4
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 7 17:17:09 2024 +0800

    [core] Make ManifestCommittableSerializer be compatible with Paimon 0.7 
(#3476)
---
 .../apache/paimon/data/safe/SafeBinaryArray.java   | 208 +++++++++++++++++++++
 .../org/apache/paimon/data/safe/SafeBinaryRow.java | 190 +++++++++++++++++++
 .../java/org/apache/paimon/memory/BytesUtils.java  |  70 +++++++
 .../apache/paimon/memory/MemorySegmentUtils.java   |   4 +-
 .../java/org/apache/paimon/data/RowDataTest.java   |  44 +++++
 .../apache/paimon/data/SafeBinaryArrayTest.java    | 137 ++++++++++++++
 .../apache/paimon/io/DataFileMeta08Serializer.java |  75 +++++---
 .../manifest/ManifestCommittableSerializer.java    |  29 ++-
 .../sink/CommitMessageLegacyV2Serializer.java      | 188 +++++++++++++++++++
 .../paimon/table/sink/CommitMessageSerializer.java |  42 +++--
 ...festCommittableSerializerCompatibilityTest.java |  66 +++++++
 .../compatibility/manifest-committable-v2-0.7      | Bin 0 -> 2245 bytes
 12 files changed, 1010 insertions(+), 43 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java 
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
new file mode 100644
index 000000000..4170d3914
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.safe;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.BytesUtils;
+
+import static org.apache.paimon.data.BinaryArray.calculateHeaderInBytes;
+import static org.apache.paimon.memory.MemorySegmentUtils.BIT_BYTE_INDEX_MASK;
+import static org.apache.paimon.memory.MemorySegmentUtils.byteIndex;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link BinaryRow} which is safe avoid core dump. */
+public final class SafeBinaryArray implements InternalArray {
+
+    private final int size;
+    private final byte[] bytes;
+    private final int offset;
+    private final int elementOffset;
+
+    public SafeBinaryArray(byte[] bytes, int offset) {
+        checkArgument(bytes.length > offset + 4);
+        final int size = BytesUtils.getInt(bytes, offset);
+        assert size >= 0 : "size (" + size + ") should >= 0";
+
+        this.size = size;
+        this.bytes = bytes;
+        this.offset = offset;
+        this.elementOffset = offset + calculateHeaderInBytes(this.size);
+    }
+
+    @Override
+    public int size() {
+        return size;
+    }
+
+    private int getElementOffset(int ordinal, int elementSize) {
+        return elementOffset + ordinal * elementSize;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        byte current = bytes[offset + 4 + byteIndex(pos)];
+        return (current & (1 << (pos & BIT_BYTE_INDEX_MASK))) != 0;
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return bytes[getElementOffset(pos, 1)] != 0;
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return bytes[getElementOffset(pos, 1)];
+    }
+
+    @Override
+    public short getShort(int pos) {
+        int fieldOffset = getElementOffset(pos, 2);
+        checkArgument(bytes.length >= fieldOffset + 2);
+        return BytesUtils.getShort(bytes, fieldOffset);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        int fieldOffset = getElementOffset(pos, 4);
+        checkArgument(bytes.length >= fieldOffset + 4);
+        return BytesUtils.getInt(bytes, fieldOffset);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        int fieldOffset = getElementOffset(pos, 8);
+        checkArgument(bytes.length >= fieldOffset + 8);
+        return BytesUtils.getLong(bytes, fieldOffset);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return Float.intBitsToFloat(getInt(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return Double.longBitsToDouble(getLong(pos));
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return BinaryString.fromBytes(getBinary(pos));
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        long longValue = getLong(pos);
+        if (Decimal.isCompact(precision)) {
+            return Decimal.fromUnscaledLong(longValue, precision, scale);
+        }
+
+        final int size = ((int) longValue);
+        int subOffset = (int) (longValue >> 32);
+        byte[] decimalBytes = new byte[size];
+        System.arraycopy(bytes, offset + subOffset, decimalBytes, 0, size);
+        return Decimal.fromUnscaledBytes(decimalBytes, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        long longValue = getLong(pos);
+        if (Timestamp.isCompact(precision)) {
+            return Timestamp.fromEpochMillis(longValue);
+        }
+
+        final int nanoOfMillisecond = (int) longValue;
+        final int subOffset = (int) (longValue >> 32);
+
+        checkArgument(bytes.length >= offset + subOffset + 8);
+        final long millisecond = BytesUtils.getLong(bytes, offset + subOffset);
+        return Timestamp.fromEpochMillis(millisecond, nanoOfMillisecond);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return BytesUtils.readBinary(bytes, offset, getElementOffset(pos, 8), 
getLong(pos));
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean[] toBooleanArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public short[] toShortArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int[] toIntArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long[] toLongArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public float[] toFloatArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double[] toDoubleArray() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
new file mode 100644
index 000000000..ce6b3ae62
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.safe;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.BytesUtils;
+import org.apache.paimon.types.RowKind;
+
+import static org.apache.paimon.data.BinaryRow.HEADER_SIZE_IN_BITS;
+import static org.apache.paimon.data.BinaryRow.calculateBitSetWidthInBytes;
+import static org.apache.paimon.memory.MemorySegmentUtils.BIT_BYTE_INDEX_MASK;
+import static org.apache.paimon.memory.MemorySegmentUtils.byteIndex;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link BinaryRow} which is safe avoid core dump. */
+public final class SafeBinaryRow implements InternalRow {
+
+    private final int arity;
+    private final int nullBitsSizeInBytes;
+    private final byte[] bytes;
+    private final int offset;
+
+    public SafeBinaryRow(int arity, byte[] bytes, int offset) {
+        checkArgument(arity >= 0);
+        this.arity = arity;
+        this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+        this.bytes = bytes;
+        this.offset = offset;
+    }
+
+    private int getFieldOffset(int pos) {
+        return offset + nullBitsSizeInBytes + pos * 8;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return arity;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        byte kindValue = bytes[offset];
+        return RowKind.fromByteValue(kindValue);
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        bytes[offset] = kind.toByteValue();
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        int index = pos + HEADER_SIZE_IN_BITS;
+        int offset = this.offset + byteIndex(index);
+        byte current = bytes[offset];
+        return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return bytes[getFieldOffset(pos)] != 0;
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return bytes[getFieldOffset(pos)];
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return BytesUtils.getShort(bytes, getFieldOffset(pos));
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return BytesUtils.getInt(bytes, getFieldOffset(pos));
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return BytesUtils.getLong(bytes, getFieldOffset(pos));
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return Float.intBitsToFloat(getInt(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return Double.longBitsToDouble(getLong(pos));
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return BinaryString.fromBytes(getBinary(pos));
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        long longValue = getLong(pos);
+        if (Decimal.isCompact(precision)) {
+            return Decimal.fromUnscaledLong(longValue, precision, scale);
+        }
+
+        final int size = ((int) longValue);
+        int subOffset = (int) (longValue >> 32);
+        byte[] decimalBytes = new byte[size];
+        System.arraycopy(bytes, offset + subOffset, decimalBytes, 0, size);
+        return Decimal.fromUnscaledBytes(decimalBytes, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        long longValue = getLong(pos);
+        if (Timestamp.isCompact(precision)) {
+            return Timestamp.fromEpochMillis(longValue);
+        }
+
+        final int nanoOfMillisecond = (int) longValue;
+        final int subOffset = (int) (longValue >> 32);
+
+        checkArgument(bytes.length >= offset + subOffset + 8);
+        final long millisecond = BytesUtils.getLong(bytes, offset + subOffset);
+        return Timestamp.fromEpochMillis(millisecond, nanoOfMillisecond);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return BytesUtils.readBinary(bytes, offset, getFieldOffset(pos), 
getLong(pos));
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return readArrayData(bytes, offset, getLong(pos));
+    }
+
+    private static InternalArray readArrayData(byte[] bytes, int baseOffset, 
long offsetAndSize) {
+        int offset = (int) (offsetAndSize >> 32);
+        return new SafeBinaryArray(bytes, offset + baseOffset);
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return readNestedRow(bytes, numFields, offset, getLong(pos));
+    }
+
+    private static InternalRow readNestedRow(
+            byte[] bytes, int numFields, int baseOffset, long offsetAndSize) {
+        int offset = (int) (offsetAndSize >> 32);
+        return new SafeBinaryRow(numFields, bytes, offset + baseOffset);
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java
new file mode 100644
index 000000000..db1717073
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import static org.apache.paimon.data.BinarySection.HIGHEST_FIRST_BIT;
+import static 
org.apache.paimon.data.BinarySection.HIGHEST_SECOND_TO_EIGHTH_BIT;
+import static org.apache.paimon.memory.MemorySegment.LITTLE_ENDIAN;
+
+/** Utils for byte[]. */
+public class BytesUtils {
+
+    public static int getInt(byte[] bytes, int offset) {
+        return (bytes[offset + 3] << 24)
+                | ((bytes[offset + 2] & 0xff) << 16)
+                | ((bytes[offset + 1] & 0xff) << 8)
+                | (bytes[offset] & 0xff);
+    }
+
+    public static short getShort(byte[] bytes, int offset) {
+        return (short) ((bytes[offset + 1] << 8) | (bytes[offset] & 0xff));
+    }
+
+    public static long getLong(byte[] bytes, int offset) {
+        return ((long) bytes[offset + 7] << 56)
+                | (((long) bytes[offset + 6] & 0xff) << 48)
+                | (((long) bytes[offset + 5] & 0xff) << 40)
+                | (((long) bytes[offset + 4] & 0xff) << 32)
+                | (((long) bytes[offset + 3] & 0xff) << 24)
+                | (((long) bytes[offset + 2] & 0xff) << 16)
+                | (((long) bytes[offset + 1] & 0xff) << 8)
+                | ((long) bytes[offset] & 0xff);
+    }
+
+    public static byte[] readBinary(
+            byte[] bytes, int baseOffset, int fieldOffset, long 
variablePartOffsetAndLen) {
+        long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+        if (mark == 0) {
+            final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+            final int len = (int) variablePartOffsetAndLen;
+            byte[] ret = new byte[len];
+            System.arraycopy(bytes, baseOffset + subOffset, ret, 0, len);
+            return ret;
+        } else {
+            int len = (int) ((variablePartOffsetAndLen & 
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+            byte[] ret = new byte[len];
+            if (LITTLE_ENDIAN) {
+                System.arraycopy(bytes, fieldOffset, ret, 0, len);
+            } else {
+                System.arraycopy(bytes, fieldOffset + 1, ret, 0, len);
+            }
+            return ret;
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
index 47407d196..a021c47ee 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
@@ -43,7 +43,7 @@ public class MemorySegmentUtils {
 
     private static final int ADDRESS_BITS_PER_WORD = 3;
 
-    private static final int BIT_BYTE_INDEX_MASK = 7;
+    public static final int BIT_BYTE_INDEX_MASK = 7;
 
     private static final int MAX_BYTES_LENGTH = 1024 * 64;
 
@@ -455,7 +455,7 @@ public class MemorySegmentUtils {
      * @param bitIndex the bit index.
      * @return the byte index.
      */
-    private static int byteIndex(int bitIndex) {
+    public static int byteIndex(int bitIndex) {
         return bitIndex >>> ADDRESS_BITS_PER_WORD;
     }
 
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java 
b/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
index 757b24d88..2998eade6 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.data;
 
+import org.apache.paimon.data.safe.SafeBinaryRow;
 import org.apache.paimon.data.serializer.InternalArraySerializer;
 import org.apache.paimon.data.serializer.InternalMapSerializer;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
@@ -91,6 +92,49 @@ public class RowDataTest {
         testSetters(nestedRow);
     }
 
+    @Test
+    public void testSafeBinaryRow() {
+        BinaryRow binaryRow = getBinaryRow();
+        SafeBinaryRow row = new SafeBinaryRow(binaryRow.getFieldCount(), 
binaryRow.toBytes(), 0);
+        assertThat(row.getFieldCount()).isEqualTo(18);
+
+        // test header
+        assertThat(row.getRowKind()).isEqualTo(RowKind.INSERT);
+        row.setRowKind(RowKind.DELETE);
+        assertThat(row.getRowKind()).isEqualTo(RowKind.DELETE);
+
+        // test get
+        assertThat(row.isNullAt(0)).isFalse();
+        assertThat(row.getBoolean(0)).isTrue();
+        assertThat(row.getByte(1)).isEqualTo((byte) 1);
+        assertThat(row.getShort(2)).isEqualTo((short) 2);
+        assertThat(row.getInt(3)).isEqualTo(3);
+        assertThat(row.getLong(4)).isEqualTo(4L);
+        assertThat(row.getFloat(5)).isEqualTo(5f);
+        assertThat(row.getDouble(6)).isEqualTo(6d);
+        assertThat(row.getString(8)).isEqualTo(str);
+        assertThat(row.getString(9)).isEqualTo(str);
+        assertThat(row.getDecimal(10, 5, 0)).isEqualTo(decimal1);
+        assertThat(row.getDecimal(11, 20, 0)).isEqualTo(decimal2);
+        assertThat(row.getArray(12).size()).isEqualTo(2);
+        assertThat(row.getArray(12).isNullAt(0)).isFalse();
+        assertThat(row.getArray(12).getInt(0)).isEqualTo(15);
+        assertThat(row.getArray(12).isNullAt(1)).isFalse();
+        assertThat(row.getArray(12).getInt(1)).isEqualTo(16);
+        // TODO support map
+        // assertThat(row.getMap(13)).isEqualTo(map);
+        assertThat(row.getRow(14, 2).getInt(0)).isEqualTo(15);
+        assertThat(row.getRow(14, 2).getInt(1)).isEqualTo(16);
+        assertThat(row.getBinary(15)).isEqualTo(bytes);
+        assertThat(row.getTimestamp(16, 3)).isEqualTo(timestamp1);
+        assertThat(row.getTimestamp(17, 9)).isEqualTo(timestamp2);
+
+        // test isNull
+        binaryRow.setNullAt(10);
+        row = new SafeBinaryRow(binaryRow.getFieldCount(), 
binaryRow.toBytes(), 0);
+        assertThat(row.isNullAt(10)).isTrue();
+    }
+
     private BinaryRow getBinaryRow() {
         BinaryRow row = new BinaryRow(18);
         BinaryRowWriter writer = new BinaryRowWriter(row);
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java 
b/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java
new file mode 100644
index 000000000..08490f18c
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.data.safe.SafeBinaryArray;
+import org.apache.paimon.data.serializer.InternalArraySerializer;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SafeBinaryArrayTest {
+
+    @Test
+    public void test() {
+        BinaryArray expected = toBinaryArray(DataTypes.BOOLEAN(), true, false, 
null, true);
+        BinaryArray converted =
+                toBinaryArray(DataTypes.BOOLEAN(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected = toBinaryArray(DataTypes.TINYINT(), (byte) 15, (byte) 12, 
null, (byte) 1);
+        converted = toBinaryArray(DataTypes.TINYINT(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected = toBinaryArray(DataTypes.SMALLINT(), (short) 15, (short) 12, 
null, (short) 1);
+        converted = toBinaryArray(DataTypes.SMALLINT(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected = toBinaryArray(DataTypes.INT(), 15, 12, null, 1);
+        converted = toBinaryArray(DataTypes.INT(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected = toBinaryArray(DataTypes.BIGINT(), 15L, 12L, null, 1L);
+        converted = toBinaryArray(DataTypes.BIGINT(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected = toBinaryArray(DataTypes.FLOAT(), 15f, 12f, null, 1f);
+        converted = toBinaryArray(DataTypes.FLOAT(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected = toBinaryArray(DataTypes.DOUBLE(), 15d, 12d, null, 1d);
+        converted = toBinaryArray(DataTypes.DOUBLE(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected =
+                toBinaryArray(
+                        DataTypes.STRING(),
+                        BinaryString.fromString("111"),
+                        BinaryString.fromString("112231asdfasdf"),
+                        null,
+                        BinaryString.fromString("14611adfadsfaf"));
+        converted = toBinaryArray(DataTypes.STRING(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected =
+                toBinaryArray(
+                        DataTypes.DECIMAL(25, 2),
+                        Decimal.fromBigDecimal(BigDecimal.valueOf(12), 25, 2),
+                        Decimal.fromBigDecimal(BigDecimal.valueOf(123), 25, 2),
+                        null,
+                        Decimal.fromBigDecimal(BigDecimal.valueOf(1243), 25, 
2));
+        converted =
+                toBinaryArray(DataTypes.DECIMAL(25, 2), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected =
+                toBinaryArray(
+                        DataTypes.DECIMAL(15, 2),
+                        Decimal.fromUnscaledLong(12, 15, 2),
+                        Decimal.fromUnscaledLong(123, 15, 2),
+                        null,
+                        Decimal.fromUnscaledLong(1243, 15, 2));
+        converted =
+                toBinaryArray(DataTypes.DECIMAL(15, 2), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected =
+                toBinaryArray(
+                        DataTypes.TIMESTAMP(3),
+                        Timestamp.fromEpochMillis(1244444),
+                        Timestamp.fromEpochMillis(12444244),
+                        null,
+                        Timestamp.fromEpochMillis(12445444));
+        converted =
+                toBinaryArray(DataTypes.TIMESTAMP(3), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected =
+                toBinaryArray(
+                        DataTypes.TIMESTAMP(6),
+                        Timestamp.fromMicros(1244444),
+                        Timestamp.fromMicros(12444244),
+                        null,
+                        Timestamp.fromMicros(12445444));
+        converted =
+                toBinaryArray(DataTypes.TIMESTAMP(6), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+
+        expected =
+                toBinaryArray(
+                        DataTypes.BYTES(),
+                        BinaryString.fromString("111").toBytes(),
+                        BinaryString.fromString("112231asdfasdf").toBytes(),
+                        null,
+                        BinaryString.fromString("14611asdfadsaf").toBytes());
+        converted = toBinaryArray(DataTypes.BYTES(), new 
SafeBinaryArray(expected.toBytes(), 0));
+        assertThat(converted).isEqualTo(expected);
+    }
+
+    private BinaryArray toBinaryArray(DataType eleType, Object... values) {
+        return toBinaryArray(eleType, new GenericArray(values));
+    }
+
+    private BinaryArray toBinaryArray(DataType eleType, InternalArray array) {
+        return new InternalArraySerializer(eleType).toBinaryArray(array);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
index 15a8df420..dc6d86c7f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -20,7 +20,9 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.safe.SafeBinaryRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.stats.SimpleStatsConverter;
 import org.apache.paimon.types.ArrayType;
@@ -29,8 +31,9 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.ObjectSerializer;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,13 +44,15 @@ import static 
org.apache.paimon.utils.SerializationUtils.newBytesType;
 import static org.apache.paimon.utils.SerializationUtils.newStringType;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 
-/** Serializer for {@link DataFileMeta}. */
-public class DataFileMeta08Serializer extends ObjectSerializer<DataFileMeta> {
+/** Serializer for {@link DataFileMeta} with safe deserializer. */
+public class DataFileMeta08Serializer implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    protected final InternalRowSerializer rowSerializer;
+
     public DataFileMeta08Serializer() {
-        super(schemaFor08());
+        this.rowSerializer = InternalSerializers.create(schemaFor08());
     }
 
     private static RowType schemaFor08() {
@@ -70,28 +75,48 @@ public class DataFileMeta08Serializer extends 
ObjectSerializer<DataFileMeta> {
         return new RowType(fields);
     }
 
-    @Override
-    public InternalRow toRow(DataFileMeta meta) {
-        return GenericRow.of(
-                BinaryString.fromString(meta.fileName()),
-                meta.fileSize(),
-                meta.rowCount(),
-                serializeBinaryRow(meta.minKey()),
-                serializeBinaryRow(meta.maxKey()),
-                meta.keyStats().toRow(),
-                meta.valueStats().toRow(),
-                meta.minSequenceNumber(),
-                meta.maxSequenceNumber(),
-                meta.schemaId(),
-                meta.level(),
-                toStringArrayData(meta.extraFiles()),
-                meta.creationTime(),
-                meta.deleteRowCount().orElse(null),
-                meta.embeddedIndex());
+    public final void serializeList(List<DataFileMeta> records, DataOutputView 
target)
+            throws IOException {
+        target.writeInt(records.size());
+        for (DataFileMeta t : records) {
+            serialize(t, target);
+        }
+    }
+
+    private void serialize(DataFileMeta meta, DataOutputView target) throws 
IOException {
+        GenericRow row =
+                GenericRow.of(
+                        BinaryString.fromString(meta.fileName()),
+                        meta.fileSize(),
+                        meta.rowCount(),
+                        serializeBinaryRow(meta.minKey()),
+                        serializeBinaryRow(meta.maxKey()),
+                        meta.keyStats().toRow(),
+                        meta.valueStats().toRow(),
+                        meta.minSequenceNumber(),
+                        meta.maxSequenceNumber(),
+                        meta.schemaId(),
+                        meta.level(),
+                        toStringArrayData(meta.extraFiles()),
+                        meta.creationTime(),
+                        meta.deleteRowCount().orElse(null),
+                        meta.embeddedIndex());
+        rowSerializer.serialize(row, target);
+    }
+
+    public final List<DataFileMeta> deserializeList(DataInputView source) 
throws IOException {
+        int size = source.readInt();
+        List<DataFileMeta> records = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            records.add(deserialize(source));
+        }
+        return records;
     }
 
-    @Override
-    public DataFileMeta fromRow(InternalRow row) {
+    private DataFileMeta deserialize(DataInputView in) throws IOException {
+        byte[] bytes = new byte[in.readInt()];
+        in.readFully(bytes);
+        SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 
0);
         return new DataFileMeta(
                 row.getString(0).toString(),
                 row.getLong(1),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
index c73c12ffa..c0ae59017 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.serializer.VersionedSerializer;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageLegacyV2Serializer;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 
 import java.io.ByteArrayOutputStream;
@@ -37,6 +38,8 @@ public class ManifestCommittableSerializer implements 
VersionedSerializer<Manife
 
     private final CommitMessageSerializer commitMessageSerializer;
 
+    private CommitMessageLegacyV2Serializer legacyV2CommitMessageSerializer;
+
     public ManifestCommittableSerializer() {
         this.commitMessageSerializer = new CommitMessageSerializer();
     }
@@ -89,8 +92,30 @@ public class ManifestCommittableSerializer implements 
VersionedSerializer<Manife
         Long watermark = view.readBoolean() ? null : view.readLong();
         Map<Integer, Long> offsets = deserializeOffsets(view);
         int fileCommittableSerializerVersion = view.readInt();
-        List<CommitMessage> fileCommittables =
-                
commitMessageSerializer.deserializeList(fileCommittableSerializerVersion, view);
+        List<CommitMessage> fileCommittables;
+        try {
+            fileCommittables =
+                    
commitMessageSerializer.deserializeList(fileCommittableSerializerVersion, view);
+        } catch (Exception e) {
+            if (fileCommittableSerializerVersion != 2) {
+                throw e;
+            }
+
+            // rebuild view
+            view = new DataInputDeserializer(serialized);
+            view.readLong();
+            if (!view.readBoolean()) {
+                view.readLong();
+            }
+            deserializeOffsets(view);
+            view.readInt();
+
+            if (legacyV2CommitMessageSerializer == null) {
+                legacyV2CommitMessageSerializer = new 
CommitMessageLegacyV2Serializer();
+            }
+            fileCommittables = 
legacyV2CommitMessageSerializer.deserializeList(view);
+        }
+
         return new ManifestCommittable(identifier, watermark, offsets, 
fileCommittables);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
new file mode 100644
index 000000000..1c6564b2e
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
+import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** A legacy version serializer for {@link CommitMessage}. */
+public class CommitMessageLegacyV2Serializer {
+
+    private DataFileMetaLegacyV2Serializer dataFileSerializer;
+    private IndexFileMetaLegacyV2Serializer indexEntrySerializer;
+
+    public List<CommitMessage> deserializeList(DataInputView view) throws 
IOException {
+        int length = view.readInt();
+        List<CommitMessage> list = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            list.add(deserialize(view));
+        }
+        return list;
+    }
+
+    public CommitMessage deserialize(DataInputView view) throws IOException {
+        if (dataFileSerializer == null) {
+            dataFileSerializer = new DataFileMetaLegacyV2Serializer();
+            indexEntrySerializer = new IndexFileMetaLegacyV2Serializer();
+        }
+        return new CommitMessageImpl(
+                deserializeBinaryRow(view),
+                view.readInt(),
+                new DataIncrement(
+                        dataFileSerializer.deserializeList(view),
+                        Collections.emptyList(),
+                        dataFileSerializer.deserializeList(view)),
+                new CompactIncrement(
+                        dataFileSerializer.deserializeList(view),
+                        dataFileSerializer.deserializeList(view),
+                        dataFileSerializer.deserializeList(view)),
+                new 
IndexIncrement(indexEntrySerializer.deserializeList(view)));
+    }
+
+    private static RowType legacyDataFileSchema() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
+        fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
+        fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
+        fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
+        fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
+        fields.add(new DataField(5, "_KEY_STATS", 
SimpleStatsConverter.schema()));
+        fields.add(new DataField(6, "_VALUE_STATS", 
SimpleStatsConverter.schema()));
+        fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new 
BigIntType(false)));
+        fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new 
BigIntType(false)));
+        fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
+        fields.add(new DataField(10, "_LEVEL", new IntType(false)));
+        fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, 
newStringType(false))));
+        fields.add(new DataField(12, "_CREATION_TIME", 
DataTypes.TIMESTAMP_MILLIS()));
+        return new RowType(fields);
+    }
+
+    private static RowType legacyIndexFileSchema() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "_INDEX_TYPE", newStringType(false)));
+        fields.add(new DataField(1, "_FILE_NAME", newStringType(false)));
+        fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false)));
+        fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false)));
+        return new RowType(fields);
+    }
+
+    /** A legacy version serializer for {@link DataFileMeta}. */
+    private static class DataFileMetaLegacyV2Serializer extends 
ObjectSerializer<DataFileMeta> {
+
+        private static final long serialVersionUID = 1L;
+
+        public DataFileMetaLegacyV2Serializer() {
+            super(legacyDataFileSchema());
+        }
+
+        @Override
+        public InternalRow toRow(DataFileMeta meta) {
+            return GenericRow.of(
+                    BinaryString.fromString(meta.fileName()),
+                    meta.fileSize(),
+                    meta.rowCount(),
+                    serializeBinaryRow(meta.minKey()),
+                    serializeBinaryRow(meta.maxKey()),
+                    meta.keyStats().toRow(),
+                    meta.valueStats().toRow(),
+                    meta.minSequenceNumber(),
+                    meta.maxSequenceNumber(),
+                    meta.schemaId(),
+                    meta.level(),
+                    toStringArrayData(meta.extraFiles()),
+                    meta.creationTime());
+        }
+
+        @Override
+        public DataFileMeta fromRow(InternalRow row) {
+            return new DataFileMeta(
+                    row.getString(0).toString(),
+                    row.getLong(1),
+                    row.getLong(2),
+                    deserializeBinaryRow(row.getBinary(3)),
+                    deserializeBinaryRow(row.getBinary(4)),
+                    SimpleStats.fromRow(row.getRow(5, 3)),
+                    SimpleStats.fromRow(row.getRow(6, 3)),
+                    row.getLong(7),
+                    row.getLong(8),
+                    row.getLong(9),
+                    row.getInt(10),
+                    fromStringArrayData(row.getArray(11)),
+                    row.getTimestamp(12, 3),
+                    null,
+                    null,
+                    null);
+        }
+    }
+
+    /** A legacy version serializer for {@link IndexFileMeta}. */
+    private static class IndexFileMetaLegacyV2Serializer extends 
ObjectSerializer<IndexFileMeta> {
+
+        public IndexFileMetaLegacyV2Serializer() {
+            super(legacyIndexFileSchema());
+        }
+
+        @Override
+        public InternalRow toRow(IndexFileMeta record) {
+            return GenericRow.of(
+                    BinaryString.fromString(record.indexType()),
+                    BinaryString.fromString(record.fileName()),
+                    record.fileSize(),
+                    record.rowCount());
+        }
+
+        @Override
+        public IndexFileMeta fromRow(InternalRow row) {
+            return new IndexFileMeta(
+                    row.getString(0).toString(),
+                    row.getString(1).toString(),
+                    row.getLong(2),
+                    row.getLong(3),
+                    null);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 53a1f9455..cc8abfa48 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.data.serializer.VersionedSerializer;
 import org.apache.paimon.index.IndexFileMetaSerializer;
 import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataIncrement;
@@ -30,7 +29,6 @@ import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.io.IndexIncrement;
-import org.apache.paimon.utils.ObjectSerializer;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -106,14 +104,23 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
     }
 
     private CommitMessage deserialize(int version, DataInputView view) throws 
IOException {
-        ObjectSerializer<DataFileMeta> dataFileSerializer;
         if (version == CURRENT_VERSION) {
-            dataFileSerializer = this.dataFileSerializer;
+            return new CommitMessageImpl(
+                    deserializeBinaryRow(view),
+                    view.readInt(),
+                    new DataIncrement(
+                            dataFileSerializer.deserializeList(view),
+                            dataFileSerializer.deserializeList(view),
+                            dataFileSerializer.deserializeList(view)),
+                    new CompactIncrement(
+                            dataFileSerializer.deserializeList(view),
+                            dataFileSerializer.deserializeList(view),
+                            dataFileSerializer.deserializeList(view)),
+                    new IndexIncrement(
+                            indexEntrySerializer.deserializeList(view),
+                            indexEntrySerializer.deserializeList(view)));
         } else if (version <= 2) {
-            if (dataFile08Serializer == null) {
-                dataFile08Serializer = new DataFileMeta08Serializer();
-            }
-            dataFileSerializer = dataFile08Serializer;
+            return deserialize08(version, view);
         } else {
             throw new UnsupportedOperationException(
                     "Expecting CommitMessageSerializer version to be smaller 
or equal than "
@@ -122,17 +129,24 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
                             + version
                             + ".");
         }
+    }
+
+    private CommitMessage deserialize08(int version, DataInputView view) 
throws IOException {
+        if (dataFile08Serializer == null) {
+            dataFile08Serializer = new DataFileMeta08Serializer();
+        }
+
         return new CommitMessageImpl(
                 deserializeBinaryRow(view),
                 view.readInt(),
                 new DataIncrement(
-                        dataFileSerializer.deserializeList(view),
-                        dataFileSerializer.deserializeList(view),
-                        dataFileSerializer.deserializeList(view)),
+                        dataFile08Serializer.deserializeList(view),
+                        dataFile08Serializer.deserializeList(view),
+                        dataFile08Serializer.deserializeList(view)),
                 new CompactIncrement(
-                        dataFileSerializer.deserializeList(view),
-                        dataFileSerializer.deserializeList(view),
-                        dataFileSerializer.deserializeList(view)),
+                        dataFile08Serializer.deserializeList(view),
+                        dataFile08Serializer.deserializeList(view),
+                        dataFile08Serializer.deserializeList(view)),
                 new IndexIncrement(
                         indexEntrySerializer.deserializeList(view),
                         version <= 2
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 13b9d1184..744904c71 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -173,4 +173,70 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
         deserialized = serializer.deserialize(2, v2Bytes);
         assertThat(deserialized).isEqualTo(manifestCommittable);
     }
+
+    @Test
+    public void testCompatibilityToVersion2PaimonV07() throws IOException {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        null,
+                        null,
+                        null);
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        IndexFileMeta indexFile =
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, null);
+        List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        new DataIncrement(dataFiles, Collections.emptyList(), 
dataFiles),
+                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
+                        new IndexIncrement(indexFiles));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+
+        ManifestCommittableSerializer serializer = new 
ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+        ManifestCommittable deserialized = serializer.deserialize(2, bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+
+        byte[] v2Bytes =
+                IOUtils.readFully(
+                        ManifestCommittableSerializerCompatibilityTest.class
+                                .getClassLoader()
+                                
.getResourceAsStream("compatibility/manifest-committable-v2-0.7"),
+                        true);
+        deserialized = serializer.deserialize(2, v2Bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
 }
diff --git 
a/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7
new file mode 100644
index 000000000..5e1fb2f44
Binary files /dev/null and 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7 
differ

Reply via email to