This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-0.8 by this push:
new 269c3c05c [core] Make ManifestCommittableSerializer be compatible with
Paimon 0.7 (#3476)
269c3c05c is described below
commit 269c3c05caeb7667ba1ef55112eebc351f75868c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 7 17:17:09 2024 +0800
[core] Make ManifestCommittableSerializer be compatible with Paimon 0.7
(#3476)
---
.../apache/paimon/data/safe/SafeBinaryArray.java | 208 +++++++++++++++++++++
.../org/apache/paimon/data/safe/SafeBinaryRow.java | 190 +++++++++++++++++++
.../java/org/apache/paimon/memory/BytesUtils.java | 70 +++++++
.../apache/paimon/memory/MemorySegmentUtils.java | 4 +-
.../main/java/org/apache/paimon/utils/IOUtils.java | 8 +
.../java/org/apache/paimon/data/RowDataTest.java | 44 +++++
.../apache/paimon/data/SafeBinaryArrayTest.java | 137 ++++++++++++++
.../manifest/ManifestCommittableSerializer.java | 29 ++-
.../sink/CommitMessageLegacyV2Serializer.java | 187 ++++++++++++++++++
.../paimon/table/sink/CommitMessageSerializer.java | 5 +-
.../table/sink/DataFileMetaSafeSerializer.java | 111 +++++++++++
...festCommittableSerializerCompatibilityTest.java | 66 +++++++
.../compatibility/manifest-committable-v2-0.7 | Bin 0 -> 2245 bytes
13 files changed, 1052 insertions(+), 7 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
new file mode 100644
index 000000000..4170d3914
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.safe;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.BytesUtils;
+
+import static org.apache.paimon.data.BinaryArray.calculateHeaderInBytes;
+import static org.apache.paimon.memory.MemorySegmentUtils.BIT_BYTE_INDEX_MASK;
+import static org.apache.paimon.memory.MemorySegmentUtils.byteIndex;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** An array in the binary format nested in {@link BinaryRow}, safe against core dumps. */
+public final class SafeBinaryArray implements InternalArray {
+
+ private final int size;
+ private final byte[] bytes;
+ private final int offset;
+ private final int elementOffset;
+
+ public SafeBinaryArray(byte[] bytes, int offset) {
+ checkArgument(bytes.length > offset + 4);
+ final int size = BytesUtils.getInt(bytes, offset);
+ assert size >= 0 : "size (" + size + ") should >= 0";
+
+ this.size = size;
+ this.bytes = bytes;
+ this.offset = offset;
+ this.elementOffset = offset + calculateHeaderInBytes(this.size);
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ private int getElementOffset(int ordinal, int elementSize) {
+ return elementOffset + ordinal * elementSize;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ byte current = bytes[offset + 4 + byteIndex(pos)];
+ return (current & (1 << (pos & BIT_BYTE_INDEX_MASK))) != 0;
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return bytes[getElementOffset(pos, 1)] != 0;
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return bytes[getElementOffset(pos, 1)];
+ }
+
+ @Override
+ public short getShort(int pos) {
+ int fieldOffset = getElementOffset(pos, 2);
+ checkArgument(bytes.length >= fieldOffset + 2);
+ return BytesUtils.getShort(bytes, fieldOffset);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ int fieldOffset = getElementOffset(pos, 4);
+ checkArgument(bytes.length >= fieldOffset + 4);
+ return BytesUtils.getInt(bytes, fieldOffset);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ int fieldOffset = getElementOffset(pos, 8);
+ checkArgument(bytes.length >= fieldOffset + 8);
+ return BytesUtils.getLong(bytes, fieldOffset);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return Float.intBitsToFloat(getInt(pos));
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return Double.longBitsToDouble(getLong(pos));
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return BinaryString.fromBytes(getBinary(pos));
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ long longValue = getLong(pos);
+ if (Decimal.isCompact(precision)) {
+ return Decimal.fromUnscaledLong(longValue, precision, scale);
+ }
+
+ final int size = ((int) longValue);
+ int subOffset = (int) (longValue >> 32);
+ byte[] decimalBytes = new byte[size];
+ System.arraycopy(bytes, offset + subOffset, decimalBytes, 0, size);
+ return Decimal.fromUnscaledBytes(decimalBytes, precision, scale);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ long longValue = getLong(pos);
+ if (Timestamp.isCompact(precision)) {
+ return Timestamp.fromEpochMillis(longValue);
+ }
+
+ final int nanoOfMillisecond = (int) longValue;
+ final int subOffset = (int) (longValue >> 32);
+
+ checkArgument(bytes.length >= offset + subOffset + 8);
+ final long millisecond = BytesUtils.getLong(bytes, offset + subOffset);
+ return Timestamp.fromEpochMillis(millisecond, nanoOfMillisecond);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return BytesUtils.readBinary(bytes, offset, getElementOffset(pos, 8), getLong(pos));
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public short[] toShortArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int[] toIntArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long[] toLongArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
new file mode 100644
index 000000000..ce6b3ae62
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.safe;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.BytesUtils;
+import org.apache.paimon.types.RowKind;
+
+import static org.apache.paimon.data.BinaryRow.HEADER_SIZE_IN_BITS;
+import static org.apache.paimon.data.BinaryRow.calculateBitSetWidthInBytes;
+import static org.apache.paimon.memory.MemorySegmentUtils.BIT_BYTE_INDEX_MASK;
+import static org.apache.paimon.memory.MemorySegmentUtils.byteIndex;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link BinaryRow} which is safe to avoid core dump. */
+public final class SafeBinaryRow implements InternalRow {
+
+ private final int arity;
+ private final int nullBitsSizeInBytes;
+ private final byte[] bytes;
+ private final int offset;
+
+ public SafeBinaryRow(int arity, byte[] bytes, int offset) {
+ checkArgument(arity >= 0);
+ this.arity = arity;
+ this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+ this.bytes = bytes;
+ this.offset = offset;
+ }
+
+ private int getFieldOffset(int pos) {
+ return offset + nullBitsSizeInBytes + pos * 8;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return arity;
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ byte kindValue = bytes[offset];
+ return RowKind.fromByteValue(kindValue);
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ bytes[offset] = kind.toByteValue();
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ int index = pos + HEADER_SIZE_IN_BITS;
+ int offset = this.offset + byteIndex(index);
+ byte current = bytes[offset];
+ return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return bytes[getFieldOffset(pos)] != 0;
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return bytes[getFieldOffset(pos)];
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return BytesUtils.getShort(bytes, getFieldOffset(pos));
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return BytesUtils.getInt(bytes, getFieldOffset(pos));
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return BytesUtils.getLong(bytes, getFieldOffset(pos));
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return Float.intBitsToFloat(getInt(pos));
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return Double.longBitsToDouble(getLong(pos));
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return BinaryString.fromBytes(getBinary(pos));
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ long longValue = getLong(pos);
+ if (Decimal.isCompact(precision)) {
+ return Decimal.fromUnscaledLong(longValue, precision, scale);
+ }
+
+ final int size = ((int) longValue);
+ int subOffset = (int) (longValue >> 32);
+ byte[] decimalBytes = new byte[size];
+ System.arraycopy(bytes, offset + subOffset, decimalBytes, 0, size);
+ return Decimal.fromUnscaledBytes(decimalBytes, precision, scale);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ long longValue = getLong(pos);
+ if (Timestamp.isCompact(precision)) {
+ return Timestamp.fromEpochMillis(longValue);
+ }
+
+ final int nanoOfMillisecond = (int) longValue;
+ final int subOffset = (int) (longValue >> 32);
+
+ checkArgument(bytes.length >= offset + subOffset + 8);
+ final long millisecond = BytesUtils.getLong(bytes, offset + subOffset);
+ return Timestamp.fromEpochMillis(millisecond, nanoOfMillisecond);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return BytesUtils.readBinary(bytes, offset, getFieldOffset(pos), getLong(pos));
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ return readArrayData(bytes, offset, getLong(pos));
+ }
+
+ private static InternalArray readArrayData(byte[] bytes, int baseOffset, long offsetAndSize) {
+ int offset = (int) (offsetAndSize >> 32);
+ return new SafeBinaryArray(bytes, offset + baseOffset);
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ return readNestedRow(bytes, numFields, offset, getLong(pos));
+ }
+
+ private static InternalRow readNestedRow(
+ byte[] bytes, int numFields, int baseOffset, long offsetAndSize) {
+ int offset = (int) (offsetAndSize >> 32);
+ return new SafeBinaryRow(numFields, bytes, offset + baseOffset);
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java
b/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java
new file mode 100644
index 000000000..db1717073
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import static org.apache.paimon.data.BinarySection.HIGHEST_FIRST_BIT;
+import static org.apache.paimon.data.BinarySection.HIGHEST_SECOND_TO_EIGHTH_BIT;
+import static org.apache.paimon.memory.MemorySegment.LITTLE_ENDIAN;
+
+/** Utils for byte[]. */
+public class BytesUtils {
+
+ public static int getInt(byte[] bytes, int offset) {
+ return (bytes[offset + 3] << 24)
+ | ((bytes[offset + 2] & 0xff) << 16)
+ | ((bytes[offset + 1] & 0xff) << 8)
+ | (bytes[offset] & 0xff);
+ }
+
+ public static short getShort(byte[] bytes, int offset) {
+ return (short) ((bytes[offset + 1] << 8) | (bytes[offset] & 0xff));
+ }
+
+ public static long getLong(byte[] bytes, int offset) {
+ return ((long) bytes[offset + 7] << 56)
+ | (((long) bytes[offset + 6] & 0xff) << 48)
+ | (((long) bytes[offset + 5] & 0xff) << 40)
+ | (((long) bytes[offset + 4] & 0xff) << 32)
+ | (((long) bytes[offset + 3] & 0xff) << 24)
+ | (((long) bytes[offset + 2] & 0xff) << 16)
+ | (((long) bytes[offset + 1] & 0xff) << 8)
+ | ((long) bytes[offset] & 0xff);
+ }
+
+ public static byte[] readBinary(
+ byte[] bytes, int baseOffset, int fieldOffset, long variablePartOffsetAndLen) {
+ long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+ if (mark == 0) {
+ final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+ final int len = (int) variablePartOffsetAndLen;
+ byte[] ret = new byte[len];
+ System.arraycopy(bytes, baseOffset + subOffset, ret, 0, len);
+ return ret;
+ } else {
+ int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+ byte[] ret = new byte[len];
+ if (LITTLE_ENDIAN) {
+ System.arraycopy(bytes, fieldOffset, ret, 0, len);
+ } else {
+ System.arraycopy(bytes, fieldOffset + 1, ret, 0, len);
+ }
+ return ret;
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
index 47407d196..a021c47ee 100644
---
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
@@ -43,7 +43,7 @@ public class MemorySegmentUtils {
private static final int ADDRESS_BITS_PER_WORD = 3;
- private static final int BIT_BYTE_INDEX_MASK = 7;
+ public static final int BIT_BYTE_INDEX_MASK = 7;
private static final int MAX_BYTES_LENGTH = 1024 * 64;
@@ -455,7 +455,7 @@ public class MemorySegmentUtils {
* @param bitIndex the bit index.
* @return the byte index.
*/
- private static int byteIndex(int bitIndex) {
+ public static int byteIndex(int bitIndex) {
return bitIndex >>> ADDRESS_BITS_PER_WORD;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
index f8a78eb21..cb71f9c87 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -90,6 +91,13 @@ public final class IOUtils {
// Stream input skipping
// ------------------------------------------------------------------------
+ /** Reads all bytes from the input stream into a byte array. */
+ public static byte[] readFully(InputStream in, boolean close) throws IOException {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ copyBytes(in, output, BLOCKSIZE, close);
+ return output.toByteArray();
+ }
+
/**
* Reads len bytes in a loop.
*
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
index 757b24d88..2998eade6 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.data;
+import org.apache.paimon.data.safe.SafeBinaryRow;
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
@@ -91,6 +92,49 @@ public class RowDataTest {
testSetters(nestedRow);
}
+ @Test
+ public void testSafeBinaryRow() {
+ BinaryRow binaryRow = getBinaryRow();
+ SafeBinaryRow row = new SafeBinaryRow(binaryRow.getFieldCount(),
binaryRow.toBytes(), 0);
+ assertThat(row.getFieldCount()).isEqualTo(18);
+
+ // test header
+ assertThat(row.getRowKind()).isEqualTo(RowKind.INSERT);
+ row.setRowKind(RowKind.DELETE);
+ assertThat(row.getRowKind()).isEqualTo(RowKind.DELETE);
+
+ // test get
+ assertThat(row.isNullAt(0)).isFalse();
+ assertThat(row.getBoolean(0)).isTrue();
+ assertThat(row.getByte(1)).isEqualTo((byte) 1);
+ assertThat(row.getShort(2)).isEqualTo((short) 2);
+ assertThat(row.getInt(3)).isEqualTo(3);
+ assertThat(row.getLong(4)).isEqualTo(4L);
+ assertThat(row.getFloat(5)).isEqualTo(5f);
+ assertThat(row.getDouble(6)).isEqualTo(6d);
+ assertThat(row.getString(8)).isEqualTo(str);
+ assertThat(row.getString(9)).isEqualTo(str);
+ assertThat(row.getDecimal(10, 5, 0)).isEqualTo(decimal1);
+ assertThat(row.getDecimal(11, 20, 0)).isEqualTo(decimal2);
+ assertThat(row.getArray(12).size()).isEqualTo(2);
+ assertThat(row.getArray(12).isNullAt(0)).isFalse();
+ assertThat(row.getArray(12).getInt(0)).isEqualTo(15);
+ assertThat(row.getArray(12).isNullAt(1)).isFalse();
+ assertThat(row.getArray(12).getInt(1)).isEqualTo(16);
+ // TODO support map
+ // assertThat(row.getMap(13)).isEqualTo(map);
+ assertThat(row.getRow(14, 2).getInt(0)).isEqualTo(15);
+ assertThat(row.getRow(14, 2).getInt(1)).isEqualTo(16);
+ assertThat(row.getBinary(15)).isEqualTo(bytes);
+ assertThat(row.getTimestamp(16, 3)).isEqualTo(timestamp1);
+ assertThat(row.getTimestamp(17, 9)).isEqualTo(timestamp2);
+
+ // test isNull
+ binaryRow.setNullAt(10);
+ row = new SafeBinaryRow(binaryRow.getFieldCount(),
binaryRow.toBytes(), 0);
+ assertThat(row.isNullAt(10)).isTrue();
+ }
+
private BinaryRow getBinaryRow() {
BinaryRow row = new BinaryRow(18);
BinaryRowWriter writer = new BinaryRowWriter(row);
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java
new file mode 100644
index 000000000..08490f18c
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.data.safe.SafeBinaryArray;
+import org.apache.paimon.data.serializer.InternalArraySerializer;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SafeBinaryArrayTest {
+
+ @Test
+ public void test() {
+ BinaryArray expected = toBinaryArray(DataTypes.BOOLEAN(), true, false,
null, true);
+ BinaryArray converted =
+ toBinaryArray(DataTypes.BOOLEAN(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected = toBinaryArray(DataTypes.TINYINT(), (byte) 15, (byte) 12,
null, (byte) 1);
+ converted = toBinaryArray(DataTypes.TINYINT(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected = toBinaryArray(DataTypes.SMALLINT(), (short) 15, (short) 12,
null, (short) 1);
+ converted = toBinaryArray(DataTypes.SMALLINT(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected = toBinaryArray(DataTypes.INT(), 15, 12, null, 1);
+ converted = toBinaryArray(DataTypes.INT(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected = toBinaryArray(DataTypes.BIGINT(), 15L, 12L, null, 1L);
+ converted = toBinaryArray(DataTypes.BIGINT(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected = toBinaryArray(DataTypes.FLOAT(), 15f, 12f, null, 1f);
+ converted = toBinaryArray(DataTypes.FLOAT(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected = toBinaryArray(DataTypes.DOUBLE(), 15d, 12d, null, 1d);
+ converted = toBinaryArray(DataTypes.DOUBLE(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected =
+ toBinaryArray(
+ DataTypes.STRING(),
+ BinaryString.fromString("111"),
+ BinaryString.fromString("112231asdfasdf"),
+ null,
+ BinaryString.fromString("14611adfadsfaf"));
+ converted = toBinaryArray(DataTypes.STRING(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected =
+ toBinaryArray(
+ DataTypes.DECIMAL(25, 2),
+ Decimal.fromBigDecimal(BigDecimal.valueOf(12), 25, 2),
+ Decimal.fromBigDecimal(BigDecimal.valueOf(123), 25, 2),
+ null,
+ Decimal.fromBigDecimal(BigDecimal.valueOf(1243), 25,
2));
+ converted =
+ toBinaryArray(DataTypes.DECIMAL(25, 2), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected =
+ toBinaryArray(
+ DataTypes.DECIMAL(15, 2),
+ Decimal.fromUnscaledLong(12, 15, 2),
+ Decimal.fromUnscaledLong(123, 15, 2),
+ null,
+ Decimal.fromUnscaledLong(1243, 15, 2));
+ converted =
+ toBinaryArray(DataTypes.DECIMAL(15, 2), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected =
+ toBinaryArray(
+ DataTypes.TIMESTAMP(3),
+ Timestamp.fromEpochMillis(1244444),
+ Timestamp.fromEpochMillis(12444244),
+ null,
+ Timestamp.fromEpochMillis(12445444));
+ converted =
+ toBinaryArray(DataTypes.TIMESTAMP(3), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected =
+ toBinaryArray(
+ DataTypes.TIMESTAMP(6),
+ Timestamp.fromMicros(1244444),
+ Timestamp.fromMicros(12444244),
+ null,
+ Timestamp.fromMicros(12445444));
+ converted =
+ toBinaryArray(DataTypes.TIMESTAMP(6), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+
+ expected =
+ toBinaryArray(
+ DataTypes.BYTES(),
+ BinaryString.fromString("111").toBytes(),
+ BinaryString.fromString("112231asdfasdf").toBytes(),
+ null,
+ BinaryString.fromString("14611asdfadsaf").toBytes());
+ converted = toBinaryArray(DataTypes.BYTES(), new
SafeBinaryArray(expected.toBytes(), 0));
+ assertThat(converted).isEqualTo(expected);
+ }
+
+ private BinaryArray toBinaryArray(DataType eleType, Object... values) {
+ return toBinaryArray(eleType, new GenericArray(values));
+ }
+
+ private BinaryArray toBinaryArray(DataType eleType, InternalArray array) {
+ return new InternalArraySerializer(eleType).toBinaryArray(array);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
index 3ae51b7aa..2c841a2c3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageLegacyV2Serializer;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import java.io.ByteArrayOutputStream;
@@ -37,6 +38,8 @@ public class ManifestCommittableSerializer implements
VersionedSerializer<Manife
private final CommitMessageSerializer commitMessageSerializer;
+ private CommitMessageLegacyV2Serializer legacyV2CommitMessageSerializer;
+
public ManifestCommittableSerializer() {
this.commitMessageSerializer = new CommitMessageSerializer();
}
@@ -90,8 +93,30 @@ public class ManifestCommittableSerializer implements
VersionedSerializer<Manife
Long watermark = view.readBoolean() ? null : view.readLong();
Map<Integer, Long> offsets = deserializeOffsets(view);
int fileCommittableSerializerVersion = view.readInt();
- List<CommitMessage> fileCommittables =
- commitMessageSerializer.deserializeList(fileCommittableSerializerVersion, view);
+ List<CommitMessage> fileCommittables;
+ try {
+ fileCommittables =
+ commitMessageSerializer.deserializeList(fileCommittableSerializerVersion, view);
+ } catch (Exception e) {
+ if (fileCommittableSerializerVersion != 2) {
+ throw e;
+ }
+
+ // rebuild view
+ view = new DataInputDeserializer(serialized);
+ view.readLong();
+ if (!view.readBoolean()) {
+ view.readLong();
+ }
+ deserializeOffsets(view);
+ view.readInt();
+
+ if (legacyV2CommitMessageSerializer == null) {
+ legacyV2CommitMessageSerializer = new CommitMessageLegacyV2Serializer();
+ }
+ fileCommittables = legacyV2CommitMessageSerializer.deserializeList(view);
+ }
+
return new ManifestCommittable(identifier, watermark, offsets, fileCommittables);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
new file mode 100644
index 000000000..57cfe04fa
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.stats.BinaryTableStats;
+import org.apache.paimon.stats.FieldStatsArraySerializer;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
+import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** A legacy version serializer for {@link CommitMessage}. */
+public class CommitMessageLegacyV2Serializer {
+
+ private DataFileMetaLegacyV2Serializer dataFileSerializer;
+ private IndexFileMetaLegacyV2Serializer indexEntrySerializer;
+
+ public List<CommitMessage> deserializeList(DataInputView view) throws IOException {
+ int length = view.readInt();
+ List<CommitMessage> list = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ list.add(deserialize(view));
+ }
+ return list;
+ }
+
+ public CommitMessage deserialize(DataInputView view) throws IOException {
+ if (dataFileSerializer == null) {
+ dataFileSerializer = new DataFileMetaLegacyV2Serializer();
+ indexEntrySerializer = new IndexFileMetaLegacyV2Serializer();
+ }
+ return new CommitMessageImpl(
+ deserializeBinaryRow(view),
+ view.readInt(),
+ new DataIncrement(
+ dataFileSerializer.deserializeList(view),
+ Collections.emptyList(),
+ dataFileSerializer.deserializeList(view)),
+ new CompactIncrement(
+ dataFileSerializer.deserializeList(view),
+ dataFileSerializer.deserializeList(view),
+ dataFileSerializer.deserializeList(view)),
+ new IndexIncrement(indexEntrySerializer.deserializeList(view)));
+ }
+
+ private static RowType legacyDataFileSchema() {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
+ fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
+ fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
+ fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
+ fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
+ fields.add(new DataField(5, "_KEY_STATS", FieldStatsArraySerializer.schema()));
+ fields.add(new DataField(6, "_VALUE_STATS", FieldStatsArraySerializer.schema()));
+ fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
+ fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
+ fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
+ fields.add(new DataField(10, "_LEVEL", new IntType(false)));
+ fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
+ fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
+ return new RowType(fields);
+ }
+
+ private static RowType legacyIndexFileSchema() {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "_INDEX_TYPE", newStringType(false)));
+ fields.add(new DataField(1, "_FILE_NAME", newStringType(false)));
+ fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false)));
+ fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false)));
+ return new RowType(fields);
+ }
+
+ /** A legacy version serializer for {@link DataFileMeta}. */
+ private static class DataFileMetaLegacyV2Serializer extends ObjectSerializer<DataFileMeta> {
+
+ private static final long serialVersionUID = 1L;
+
+ public DataFileMetaLegacyV2Serializer() {
+ super(legacyDataFileSchema());
+ }
+
+ @Override
+ public InternalRow toRow(DataFileMeta meta) {
+ return GenericRow.of(
+ BinaryString.fromString(meta.fileName()),
+ meta.fileSize(),
+ meta.rowCount(),
+ serializeBinaryRow(meta.minKey()),
+ serializeBinaryRow(meta.maxKey()),
+ meta.keyStats().toRow(),
+ meta.valueStats().toRow(),
+ meta.minSequenceNumber(),
+ meta.maxSequenceNumber(),
+ meta.schemaId(),
+ meta.level(),
+ toStringArrayData(meta.extraFiles()),
+ meta.creationTime());
+ }
+
+ @Override
+ public DataFileMeta fromRow(InternalRow row) {
+ return new DataFileMeta(
+ row.getString(0).toString(),
+ row.getLong(1),
+ row.getLong(2),
+ deserializeBinaryRow(row.getBinary(3)),
+ deserializeBinaryRow(row.getBinary(4)),
+ BinaryTableStats.fromRow(row.getRow(5, 3)),
+ BinaryTableStats.fromRow(row.getRow(6, 3)),
+ row.getLong(7),
+ row.getLong(8),
+ row.getLong(9),
+ row.getInt(10),
+ fromStringArrayData(row.getArray(11)),
+ row.getTimestamp(12, 3),
+ null,
+ null);
+ }
+ }
+
+ /** A legacy version serializer for {@link IndexFileMeta}. */
+ private static class IndexFileMetaLegacyV2Serializer extends ObjectSerializer<IndexFileMeta> {
+
+ public IndexFileMetaLegacyV2Serializer() {
+ super(legacyIndexFileSchema());
+ }
+
+ @Override
+ public InternalRow toRow(IndexFileMeta record) {
+ return GenericRow.of(
+ BinaryString.fromString(record.indexType()),
+ BinaryString.fromString(record.fileName()),
+ record.fileSize(),
+ record.rowCount());
+ }
+
+ @Override
+ public IndexFileMeta fromRow(InternalRow row) {
+ return new IndexFileMeta(
+ row.getString(0).toString(),
+ row.getString(1).toString(),
+ row.getLong(2),
+ row.getLong(3),
+ null);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 24c9fc892..773cfaaa8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataInputView;
@@ -42,11 +41,11 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
private static final int CURRENT_VERSION = 2;
- private final DataFileMetaSerializer dataFileSerializer;
+ private final DataFileMetaSafeSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;
public CommitMessageSerializer() {
- this.dataFileSerializer = new DataFileMetaSerializer();
+ this.dataFileSerializer = new DataFileMetaSafeSerializer();
this.indexEntrySerializer = new IndexFileMetaSerializer();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/DataFileMetaSafeSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/DataFileMetaSafeSerializer.java
new file mode 100644
index 000000000..1934e8add
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/DataFileMetaSafeSerializer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.safe.SafeBinaryRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.stats.BinaryTableStats;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
+import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Serializer for {@link DataFileMeta} with safe deserializer. */
+public class DataFileMetaSafeSerializer implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final InternalRowSerializer rowSerializer;
+
+ public DataFileMetaSafeSerializer() {
+ this.rowSerializer = InternalSerializers.create(DataFileMeta.schema());
+ }
+
+ public final void serializeList(List<DataFileMeta> records, DataOutputView target)
+ throws IOException {
+ target.writeInt(records.size());
+ for (DataFileMeta t : records) {
+ serialize(t, target);
+ }
+ }
+
+ private void serialize(DataFileMeta meta, DataOutputView target) throws IOException {
+ GenericRow row =
+ GenericRow.of(
+ BinaryString.fromString(meta.fileName()),
+ meta.fileSize(),
+ meta.rowCount(),
+ serializeBinaryRow(meta.minKey()),
+ serializeBinaryRow(meta.maxKey()),
+ meta.keyStats().toRow(),
+ meta.valueStats().toRow(),
+ meta.minSequenceNumber(),
+ meta.maxSequenceNumber(),
+ meta.schemaId(),
+ meta.level(),
+ toStringArrayData(meta.extraFiles()),
+ meta.creationTime(),
+ meta.deleteRowCount().orElse(null),
+ meta.embeddedIndex());
+ rowSerializer.serialize(row, target);
+ }
+
+ public final List<DataFileMeta> deserializeList(DataInputView source) throws IOException {
+ int size = source.readInt();
+ List<DataFileMeta> records = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ records.add(deserialize(source));
+ }
+ return records;
+ }
+
+ private DataFileMeta deserialize(DataInputView in) throws IOException {
+ byte[] bytes = new byte[in.readInt()];
+ in.readFully(bytes);
+ SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0);
+ return new DataFileMeta(
+ row.getString(0).toString(),
+ row.getLong(1),
+ row.getLong(2),
+ deserializeBinaryRow(row.getBinary(3)),
+ deserializeBinaryRow(row.getBinary(4)),
+ BinaryTableStats.fromRow(row.getRow(5, 3)),
+ BinaryTableStats.fromRow(row.getRow(6, 3)),
+ row.getLong(7),
+ row.getLong(8),
+ row.getLong(9),
+ row.getInt(10),
+ fromStringArrayData(row.getArray(11)),
+ row.getTimestamp(12, 3),
+ row.isNullAt(13) ? null : row.getLong(13),
+ row.isNullAt(14) ? null : row.getBinary(14));
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index bfd90b5c3..fcb325ce6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.Test;
@@ -102,4 +103,69 @@ public class ManifestCommittableSerializerCompatibilityTest {
ManifestCommittable deserialized = serializer.deserialize(2, bytes);
assertThat(deserialized).isEqualTo(manifestCommittable);
}
+
+ @Test
+ public void testCompatibilityToVersion2PaimonV07() throws IOException {
+ BinaryTableStats keyStats =
+ new BinaryTableStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ BinaryTableStats valueStats =
+ new BinaryTableStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+ Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ null,
+ null);
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ IndexFileMeta indexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, null);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ new DataIncrement(dataFiles, Collections.emptyList(), dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+
+ ManifestCommittableSerializer serializer = new ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+ ManifestCommittable deserialized = serializer.deserialize(2, bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+
+ byte[] v2Bytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+ .getResourceAsStream("compatibility/manifest-committable-v2-0.7"),
+ true);
+ deserialized = serializer.deserialize(2, v2Bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
}
diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7 b/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7
new file mode 100644
index 000000000..5e1fb2f44
Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7 differ