This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1d328d6f2 [core] Make ManifestCommittableSerializer be compatible with
Paimon 0.7 (#3476)
1d328d6f2 is described below
commit 1d328d6f28c7db1fae28210b03d88f59bb4c32e4
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 7 17:17:09 2024 +0800
[core] Make ManifestCommittableSerializer be compatible with Paimon 0.7
(#3476)
---
.../apache/paimon/data/safe/SafeBinaryArray.java | 208 +++++++++++++++++++++
.../org/apache/paimon/data/safe/SafeBinaryRow.java | 190 +++++++++++++++++++
.../java/org/apache/paimon/memory/BytesUtils.java | 70 +++++++
.../apache/paimon/memory/MemorySegmentUtils.java | 4 +-
.../java/org/apache/paimon/data/RowDataTest.java | 44 +++++
.../apache/paimon/data/SafeBinaryArrayTest.java | 137 ++++++++++++++
.../apache/paimon/io/DataFileMeta08Serializer.java | 75 +++++---
.../manifest/ManifestCommittableSerializer.java | 29 ++-
.../sink/CommitMessageLegacyV2Serializer.java | 188 +++++++++++++++++++
.../paimon/table/sink/CommitMessageSerializer.java | 42 +++--
...festCommittableSerializerCompatibilityTest.java | 66 +++++++
.../compatibility/manifest-committable-v2-0.7 | Bin 0 -> 2245 bytes
12 files changed, 1010 insertions(+), 43 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
new file mode 100644
index 000000000..4170d3914
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.safe;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.BytesUtils;
+
+import static org.apache.paimon.data.BinaryArray.calculateHeaderInBytes;
+import static org.apache.paimon.memory.MemorySegmentUtils.BIT_BYTE_INDEX_MASK;
+import static org.apache.paimon.memory.MemorySegmentUtils.byteIndex;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A read-only {@link InternalArray} that decodes the binary array layout directly from a heap
+ * {@code byte[]}, for example an array field nested inside a {@link BinaryRow}.
+ *
+ * <p>Layout as read by this class: a 4-byte element count at {@code offset}, a null bit set right
+ * after it, then the fixed-length element slots starting at {@code offset +
+ * calculateHeaderInBytes(size)}. Variable-length values (strings, binaries, non-compact decimals and
+ * timestamps) are referenced from their 8-byte slot by an offset relative to {@code offset}.
+ *
+ * <p>All reads use plain Java array indexing plus explicit bounds checks, so malformed input (for
+ * example bytes written with a different schema) fails with an exception instead of a core dump.
+ * Nested arrays, maps, rows, {@code equals}/{@code hashCode} and the bulk {@code toXxxArray}
+ * conversions are not supported and throw {@link UnsupportedOperationException}.
+ */
+public final class SafeBinaryArray implements InternalArray {
+
+    private final int size;
+    private final byte[] bytes;
+    private final int offset;
+    private final int elementOffset;
+
+    /**
+     * Creates a view over the array serialized in {@code bytes} starting at {@code offset}.
+     *
+     * @param bytes the buffer holding the serialized array
+     * @param offset position of the array's 4-byte element count within {@code bytes}
+     * @throws IllegalArgumentException if the element count does not fit in {@code bytes} or is
+     *     negative
+     */
+    public SafeBinaryArray(byte[] bytes, int offset) {
+        // Only the 4-byte element count has to be present here; requiring more would reject a
+        // minimal array that ends exactly at the end of the buffer.
+        checkArgument(offset >= 0 && bytes.length >= (long) offset + 4);
+        final int size = BytesUtils.getInt(bytes, offset);
+        // Explicit check instead of an assert: a negative count read from corrupt bytes must be
+        // rejected (with a catchable exception) even when assertions are disabled.
+        if (size < 0) {
+            throw new IllegalArgumentException("size (" + size + ") should >= 0");
+        }
+
+        this.size = size;
+        this.bytes = bytes;
+        this.offset = offset;
+        this.elementOffset = offset + calculateHeaderInBytes(size);
+    }
+
+    @Override
+    public int size() {
+        return size;
+    }
+
+    /** Returns the position in {@link #bytes} of the fixed-length slot of element {@code ordinal}. */
+    private int getElementOffset(int ordinal, int elementSize) {
+        return elementOffset + ordinal * elementSize;
+    }
+
+    /** Ensures that {@code length} bytes starting at {@code start} lie within {@link #bytes}. */
+    private void checkBounds(long start, long length) {
+        checkArgument(start >= 0 && length >= 0 && start + length <= bytes.length);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        // The null bit set begins right after the 4-byte element count.
+        byte current = bytes[offset + 4 + byteIndex(pos)];
+        return (current & (1 << (pos & BIT_BYTE_INDEX_MASK))) != 0;
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return bytes[getElementOffset(pos, 1)] != 0;
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return bytes[getElementOffset(pos, 1)];
+    }
+
+    @Override
+    public short getShort(int pos) {
+        int fieldOffset = getElementOffset(pos, 2);
+        checkBounds(fieldOffset, 2);
+        return BytesUtils.getShort(bytes, fieldOffset);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        int fieldOffset = getElementOffset(pos, 4);
+        checkBounds(fieldOffset, 4);
+        return BytesUtils.getInt(bytes, fieldOffset);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        int fieldOffset = getElementOffset(pos, 8);
+        checkBounds(fieldOffset, 8);
+        return BytesUtils.getLong(bytes, fieldOffset);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return Float.intBitsToFloat(getInt(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return Double.longBitsToDouble(getLong(pos));
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return BinaryString.fromBytes(getBinary(pos));
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        long longValue = getLong(pos);
+        if (Decimal.isCompact(precision)) {
+            return Decimal.fromUnscaledLong(longValue, precision, scale);
+        }
+
+        // Non-compact: the slot holds (relative offset << 32 | length) of the unscaled bytes.
+        final int length = (int) longValue;
+        final int subOffset = (int) (longValue >> 32);
+        final long start = (long) offset + subOffset;
+        // Validate before allocating so a corrupt length cannot trigger a huge allocation.
+        checkBounds(start, length);
+        byte[] decimalBytes = new byte[length];
+        System.arraycopy(bytes, (int) start, decimalBytes, 0, length);
+        return Decimal.fromUnscaledBytes(decimalBytes, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        long longValue = getLong(pos);
+        if (Timestamp.isCompact(precision)) {
+            return Timestamp.fromEpochMillis(longValue);
+        }
+
+        // Non-compact: the slot holds (relative offset << 32 | nano-of-millisecond); the
+        // milliseconds live as a long at that offset.
+        final int nanoOfMillisecond = (int) longValue;
+        final int subOffset = (int) (longValue >> 32);
+        final long start = (long) offset + subOffset;
+
+        checkBounds(start, 8);
+        final long millisecond = BytesUtils.getLong(bytes, (int) start);
+        return Timestamp.fromEpochMillis(millisecond, nanoOfMillisecond);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return BytesUtils.readBinary(bytes, offset, getElementOffset(pos, 8), getLong(pos));
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean[] toBooleanArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public short[] toShortArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int[] toIntArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long[] toLongArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public float[] toFloatArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double[] toDoubleArray() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
new file mode 100644
index 000000000..ce6b3ae62
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.safe;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.BytesUtils;
+import org.apache.paimon.types.RowKind;
+
+import static org.apache.paimon.data.BinaryRow.HEADER_SIZE_IN_BITS;
+import static org.apache.paimon.data.BinaryRow.calculateBitSetWidthInBytes;
+import static org.apache.paimon.memory.MemorySegmentUtils.BIT_BYTE_INDEX_MASK;
+import static org.apache.paimon.memory.MemorySegmentUtils.byteIndex;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A read-only {@link InternalRow} that decodes the {@link BinaryRow} layout directly from a heap
+ * {@code byte[]}.
+ *
+ * <p>Layout as read by this class: the row kind byte at {@code offset}, a null bit set whose first
+ * {@code HEADER_SIZE_IN_BITS} bits are reserved for the header, then one 8-byte slot per field.
+ * Variable-length values are referenced from their slot by an offset relative to {@code offset}.
+ *
+ * <p>All reads use plain Java array indexing, so malformed input (for example bytes written with a
+ * different schema) fails with an exception instead of a core dump. Maps and {@code
+ * equals}/{@code hashCode} are not supported; {@link #setRowKind} writes into the backing array.
+ */
+public final class SafeBinaryRow implements InternalRow {
+
+    private final int arity;
+    private final int nullBitsSizeInBytes;
+    private final byte[] bytes;
+    private final int offset;
+
+    /**
+     * Creates a view over the row serialized in {@code bytes} starting at {@code offset}.
+     *
+     * @param arity number of fields of the row
+     * @param bytes the buffer holding the serialized row
+     * @param offset position of the row header within {@code bytes}
+     * @throws IllegalArgumentException if {@code arity} is negative
+     */
+    public SafeBinaryRow(int arity, byte[] bytes, int offset) {
+        checkArgument(arity >= 0);
+        this.arity = arity;
+        this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+        this.bytes = bytes;
+        this.offset = offset;
+    }
+
+    /** Returns the position in {@link #bytes} of the 8-byte slot of field {@code pos}. */
+    private int getFieldOffset(int pos) {
+        return offset + nullBitsSizeInBytes + pos * 8;
+    }
+
+    /** Ensures that {@code length} bytes starting at {@code start} lie within {@link #bytes}. */
+    private void checkBounds(long start, long length) {
+        checkArgument(start >= 0 && length >= 0 && start + length <= bytes.length);
+    }
+
+    @Override
+    public int getFieldCount() {
+        return arity;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        byte kindValue = bytes[offset];
+        return RowKind.fromByteValue(kindValue);
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        bytes[offset] = kind.toByteValue();
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        // Null bits for fields come after the header bits.
+        int index = pos + HEADER_SIZE_IN_BITS;
+        byte current = bytes[this.offset + byteIndex(index)];
+        return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return bytes[getFieldOffset(pos)] != 0;
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return bytes[getFieldOffset(pos)];
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return BytesUtils.getShort(bytes, getFieldOffset(pos));
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return BytesUtils.getInt(bytes, getFieldOffset(pos));
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return BytesUtils.getLong(bytes, getFieldOffset(pos));
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return Float.intBitsToFloat(getInt(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return Double.longBitsToDouble(getLong(pos));
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return BinaryString.fromBytes(getBinary(pos));
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        long longValue = getLong(pos);
+        if (Decimal.isCompact(precision)) {
+            return Decimal.fromUnscaledLong(longValue, precision, scale);
+        }
+
+        // Non-compact: the slot holds (relative offset << 32 | length) of the unscaled bytes.
+        final int length = (int) longValue;
+        final int subOffset = (int) (longValue >> 32);
+        final long start = (long) offset + subOffset;
+        // Validate before allocating so a corrupt length cannot trigger a huge allocation.
+        checkBounds(start, length);
+        byte[] decimalBytes = new byte[length];
+        System.arraycopy(bytes, (int) start, decimalBytes, 0, length);
+        return Decimal.fromUnscaledBytes(decimalBytes, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        long longValue = getLong(pos);
+        if (Timestamp.isCompact(precision)) {
+            return Timestamp.fromEpochMillis(longValue);
+        }
+
+        // Non-compact: the slot holds (relative offset << 32 | nano-of-millisecond); the
+        // milliseconds live as a long at that offset.
+        final int nanoOfMillisecond = (int) longValue;
+        final int subOffset = (int) (longValue >> 32);
+        final long start = (long) offset + subOffset;
+
+        checkBounds(start, 8);
+        final long millisecond = BytesUtils.getLong(bytes, (int) start);
+        return Timestamp.fromEpochMillis(millisecond, nanoOfMillisecond);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return BytesUtils.readBinary(bytes, offset, getFieldOffset(pos), getLong(pos));
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return readArrayData(bytes, offset, getLong(pos));
+    }
+
+    /** Wraps the array referenced by {@code offsetAndSize} (offset in the upper 32 bits). */
+    private static InternalArray readArrayData(byte[] bytes, int baseOffset, long offsetAndSize) {
+        int offset = (int) (offsetAndSize >> 32);
+        return new SafeBinaryArray(bytes, offset + baseOffset);
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return readNestedRow(bytes, numFields, offset, getLong(pos));
+    }
+
+    /** Wraps the nested row referenced by {@code offsetAndSize} (offset in the upper 32 bits). */
+    private static InternalRow readNestedRow(
+            byte[] bytes, int numFields, int baseOffset, long offsetAndSize) {
+        int offset = (int) (offsetAndSize >> 32);
+        return new SafeBinaryRow(numFields, bytes, offset + baseOffset);
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java
b/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java
new file mode 100644
index 000000000..db1717073
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import static org.apache.paimon.data.BinarySection.HIGHEST_FIRST_BIT;
+import static
org.apache.paimon.data.BinarySection.HIGHEST_SECOND_TO_EIGHTH_BIT;
+import static org.apache.paimon.memory.MemorySegment.LITTLE_ENDIAN;
+
+/** Utils for reading little-endian primitives and binary sections from a {@code byte[]}. */
+public class BytesUtils {
+
+    /** Reads a little-endian {@code int} from the 4 bytes starting at {@code offset}. */
+    public static int getInt(byte[] bytes, int offset) {
+        return (bytes[offset + 3] << 24)
+                | ((bytes[offset + 2] & 0xff) << 16)
+                | ((bytes[offset + 1] & 0xff) << 8)
+                | (bytes[offset] & 0xff);
+    }
+
+    /** Reads a little-endian {@code short} from the 2 bytes starting at {@code offset}. */
+    public static short getShort(byte[] bytes, int offset) {
+        return (short) ((bytes[offset + 1] << 8) | (bytes[offset] & 0xff));
+    }
+
+    /** Reads a little-endian {@code long} from the 8 bytes starting at {@code offset}. */
+    public static long getLong(byte[] bytes, int offset) {
+        return ((long) bytes[offset + 7] << 56)
+                | (((long) bytes[offset + 6] & 0xff) << 48)
+                | (((long) bytes[offset + 5] & 0xff) << 40)
+                | (((long) bytes[offset + 4] & 0xff) << 32)
+                | (((long) bytes[offset + 3] & 0xff) << 24)
+                | (((long) bytes[offset + 2] & 0xff) << 16)
+                | (((long) bytes[offset + 1] & 0xff) << 8)
+                | ((long) bytes[offset] & 0xff);
+    }
+
+    /**
+     * Copies out a binary value described by an 8-byte field of a binary row or array.
+     *
+     * <p>If the highest bit of {@code variablePartOffsetAndLen} is clear, the value is stored in the
+     * variable-length part: the upper 32 bits are its offset relative to {@code baseOffset} and the
+     * lower 32 bits are its length. Otherwise the value is stored inline in the field itself and its
+     * length is taken from the bits selected by {@code HIGHEST_SECOND_TO_EIGHTH_BIT}.
+     *
+     * @param bytes the backing buffer
+     * @param baseOffset start of the enclosing row or array within {@code bytes}
+     * @param fieldOffset position of the 8-byte field within {@code bytes}
+     * @param variablePartOffsetAndLen the value stored in that field
+     * @return a fresh copy of the binary value
+     * @throws IllegalArgumentException if the encoded section lies outside {@code bytes}
+     */
+    public static byte[] readBinary(
+            byte[] bytes, int baseOffset, int fieldOffset, long variablePartOffsetAndLen) {
+        long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+        if (mark == 0) {
+            final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+            final int len = (int) variablePartOffsetAndLen;
+            final long start = (long) baseOffset + subOffset;
+            // Validate before allocating: corrupt input (e.g. bytes written with another schema)
+            // must fail with a catchable exception, not NegativeArraySizeException/OOM.
+            checkRange(bytes, start, len);
+            byte[] ret = new byte[len];
+            System.arraycopy(bytes, (int) start, ret, 0, len);
+            return ret;
+        } else {
+            int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+            // With little-endian order the inline bytes start at the field itself, otherwise one
+            // byte later (presumably after the byte holding the mark and length).
+            // NOTE(review): getLong above always decodes little-endian, so on a big-endian
+            // platform the mark/length would be decoded inconsistently with this branch - confirm.
+            long start = LITTLE_ENDIAN ? fieldOffset : (long) fieldOffset + 1;
+            checkRange(bytes, start, len);
+            byte[] ret = new byte[len];
+            System.arraycopy(bytes, (int) start, ret, 0, len);
+            return ret;
+        }
+    }
+
+    /** Throws if the {@code len} bytes starting at {@code start} do not lie within {@code bytes}. */
+    private static void checkRange(byte[] bytes, long start, int len) {
+        if (len < 0 || start < 0 || start + len > bytes.length) {
+            throw new IllegalArgumentException(
+                    "Invalid binary section: start="
+                            + start
+                            + ", length="
+                            + len
+                            + ", buffer length="
+                            + bytes.length);
+        }
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
index 47407d196..a021c47ee 100644
---
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java
@@ -43,7 +43,7 @@ public class MemorySegmentUtils {
private static final int ADDRESS_BITS_PER_WORD = 3;
- private static final int BIT_BYTE_INDEX_MASK = 7;
+ public static final int BIT_BYTE_INDEX_MASK = 7;
private static final int MAX_BYTES_LENGTH = 1024 * 64;
@@ -455,7 +455,7 @@ public class MemorySegmentUtils {
* @param bitIndex the bit index.
* @return the byte index.
*/
- private static int byteIndex(int bitIndex) {
+ public static int byteIndex(int bitIndex) {
return bitIndex >>> ADDRESS_BITS_PER_WORD;
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
index 757b24d88..2998eade6 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.data;
+import org.apache.paimon.data.safe.SafeBinaryRow;
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
@@ -91,6 +92,49 @@ public class RowDataTest {
testSetters(nestedRow);
}
+    @Test
+    public void testSafeBinaryRow() {
+        BinaryRow binaryRow = getBinaryRow();
+        SafeBinaryRow safeRow =
+                new SafeBinaryRow(binaryRow.getFieldCount(), binaryRow.toBytes(), 0);
+        assertThat(safeRow.getFieldCount()).isEqualTo(18);
+
+        // row kind lives in the header and can be rewritten in place
+        assertThat(safeRow.getRowKind()).isEqualTo(RowKind.INSERT);
+        safeRow.setRowKind(RowKind.DELETE);
+        assertThat(safeRow.getRowKind()).isEqualTo(RowKind.DELETE);
+
+        // fixed-length fields
+        assertThat(safeRow.isNullAt(0)).isFalse();
+        assertThat(safeRow.getBoolean(0)).isTrue();
+        assertThat(safeRow.getByte(1)).isEqualTo((byte) 1);
+        assertThat(safeRow.getShort(2)).isEqualTo((short) 2);
+        assertThat(safeRow.getInt(3)).isEqualTo(3);
+        assertThat(safeRow.getLong(4)).isEqualTo(4L);
+        assertThat(safeRow.getFloat(5)).isEqualTo(5f);
+        assertThat(safeRow.getDouble(6)).isEqualTo(6d);
+
+        // strings and decimals
+        assertThat(safeRow.getString(8)).isEqualTo(str);
+        assertThat(safeRow.getString(9)).isEqualTo(str);
+        assertThat(safeRow.getDecimal(10, 5, 0)).isEqualTo(decimal1);
+        assertThat(safeRow.getDecimal(11, 20, 0)).isEqualTo(decimal2);
+
+        // nested array
+        InternalArray safeArray = safeRow.getArray(12);
+        assertThat(safeArray.size()).isEqualTo(2);
+        assertThat(safeArray.isNullAt(0)).isFalse();
+        assertThat(safeArray.getInt(0)).isEqualTo(15);
+        assertThat(safeArray.isNullAt(1)).isFalse();
+        assertThat(safeArray.getInt(1)).isEqualTo(16);
+
+        // TODO: field 13 (map) is not covered because SafeBinaryRow does not support getMap yet
+
+        // nested row
+        InternalRow safeNested = safeRow.getRow(14, 2);
+        assertThat(safeNested.getInt(0)).isEqualTo(15);
+        assertThat(safeNested.getInt(1)).isEqualTo(16);
+
+        // binary and timestamps
+        assertThat(safeRow.getBinary(15)).isEqualTo(bytes);
+        assertThat(safeRow.getTimestamp(16, 3)).isEqualTo(timestamp1);
+        assertThat(safeRow.getTimestamp(17, 9)).isEqualTo(timestamp2);
+
+        // a null field is reported once the row is re-serialized
+        binaryRow.setNullAt(10);
+        SafeBinaryRow withNull =
+                new SafeBinaryRow(binaryRow.getFieldCount(), binaryRow.toBytes(), 0);
+        assertThat(withNull.isNullAt(10)).isTrue();
+    }
+
private BinaryRow getBinaryRow() {
BinaryRow row = new BinaryRow(18);
BinaryRowWriter writer = new BinaryRowWriter(row);
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java
new file mode 100644
index 000000000..08490f18c
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.data.safe.SafeBinaryArray;
+import org.apache.paimon.data.serializer.InternalArraySerializer;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SafeBinaryArray}. */
+class SafeBinaryArrayTest {
+
+    @Test
+    public void test() {
+        assertRoundTrip(DataTypes.BOOLEAN(), true, false, null, true);
+        assertRoundTrip(DataTypes.TINYINT(), (byte) 15, (byte) 12, null, (byte) 1);
+        assertRoundTrip(DataTypes.SMALLINT(), (short) 15, (short) 12, null, (short) 1);
+        assertRoundTrip(DataTypes.INT(), 15, 12, null, 1);
+        assertRoundTrip(DataTypes.BIGINT(), 15L, 12L, null, 1L);
+        assertRoundTrip(DataTypes.FLOAT(), 15f, 12f, null, 1f);
+        assertRoundTrip(DataTypes.DOUBLE(), 15d, 12d, null, 1d);
+        assertRoundTrip(
+                DataTypes.STRING(),
+                BinaryString.fromString("111"),
+                BinaryString.fromString("112231asdfasdf"),
+                null,
+                BinaryString.fromString("14611adfadsfaf"));
+
+        // precision 25 vs 15: presumably exercises the non-compact and compact decimal paths
+        assertRoundTrip(
+                DataTypes.DECIMAL(25, 2),
+                Decimal.fromBigDecimal(BigDecimal.valueOf(12), 25, 2),
+                Decimal.fromBigDecimal(BigDecimal.valueOf(123), 25, 2),
+                null,
+                Decimal.fromBigDecimal(BigDecimal.valueOf(1243), 25, 2));
+        assertRoundTrip(
+                DataTypes.DECIMAL(15, 2),
+                Decimal.fromUnscaledLong(12, 15, 2),
+                Decimal.fromUnscaledLong(123, 15, 2),
+                null,
+                Decimal.fromUnscaledLong(1243, 15, 2));
+
+        // precision 3 vs 6: presumably exercises the compact and non-compact timestamp paths
+        assertRoundTrip(
+                DataTypes.TIMESTAMP(3),
+                Timestamp.fromEpochMillis(1244444),
+                Timestamp.fromEpochMillis(12444244),
+                null,
+                Timestamp.fromEpochMillis(12445444));
+        assertRoundTrip(
+                DataTypes.TIMESTAMP(6),
+                Timestamp.fromMicros(1244444),
+                Timestamp.fromMicros(12444244),
+                null,
+                Timestamp.fromMicros(12445444));
+
+        assertRoundTrip(
+                DataTypes.BYTES(),
+                BinaryString.fromString("111").toBytes(),
+                BinaryString.fromString("112231asdfasdf").toBytes(),
+                null,
+                BinaryString.fromString("14611asdfadsaf").toBytes());
+    }
+
+    /**
+     * Serializes {@code values} into a {@link BinaryArray}, reads it back through a {@link
+     * SafeBinaryArray} and checks that serializing the safe view again yields an equal array.
+     */
+    private void assertRoundTrip(DataType elementType, Object... values) {
+        BinaryArray expected = toBinaryArray(elementType, new GenericArray(values));
+        SafeBinaryArray safeView = new SafeBinaryArray(expected.toBytes(), 0);
+        assertThat(toBinaryArray(elementType, safeView)).isEqualTo(expected);
+    }
+
+    /** Converts any {@link InternalArray} into a {@link BinaryArray} for the given element type. */
+    private BinaryArray toBinaryArray(DataType elementType, InternalArray array) {
+        return new InternalArraySerializer(elementType).toBinaryArray(array);
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
index 15a8df420..dc6d86c7f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -20,7 +20,9 @@ package org.apache.paimon.io;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.safe.SafeBinaryRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.ArrayType;
@@ -29,8 +31,9 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.ObjectSerializer;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -41,13 +44,15 @@ import static
org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
-/** Serializer for {@link DataFileMeta}. */
-public class DataFileMeta08Serializer extends ObjectSerializer<DataFileMeta> {
+/** Serializer for {@link DataFileMeta} with safe deserializer. */
+public class DataFileMeta08Serializer implements Serializable {
private static final long serialVersionUID = 1L;
+ protected final InternalRowSerializer rowSerializer;
+
public DataFileMeta08Serializer() {
- super(schemaFor08());
+ this.rowSerializer = InternalSerializers.create(schemaFor08());
}
private static RowType schemaFor08() {
@@ -70,28 +75,48 @@ public class DataFileMeta08Serializer extends
ObjectSerializer<DataFileMeta> {
return new RowType(fields);
}
- @Override
- public InternalRow toRow(DataFileMeta meta) {
- return GenericRow.of(
- BinaryString.fromString(meta.fileName()),
- meta.fileSize(),
- meta.rowCount(),
- serializeBinaryRow(meta.minKey()),
- serializeBinaryRow(meta.maxKey()),
- meta.keyStats().toRow(),
- meta.valueStats().toRow(),
- meta.minSequenceNumber(),
- meta.maxSequenceNumber(),
- meta.schemaId(),
- meta.level(),
- toStringArrayData(meta.extraFiles()),
- meta.creationTime(),
- meta.deleteRowCount().orElse(null),
- meta.embeddedIndex());
+ public final void serializeList(List<DataFileMeta> records, DataOutputView
target)
+ throws IOException {
+ target.writeInt(records.size());
+ for (DataFileMeta t : records) {
+ serialize(t, target);
+ }
+ }
+
+ private void serialize(DataFileMeta meta, DataOutputView target) throws
IOException {
+ GenericRow row =
+ GenericRow.of(
+ BinaryString.fromString(meta.fileName()),
+ meta.fileSize(),
+ meta.rowCount(),
+ serializeBinaryRow(meta.minKey()),
+ serializeBinaryRow(meta.maxKey()),
+ meta.keyStats().toRow(),
+ meta.valueStats().toRow(),
+ meta.minSequenceNumber(),
+ meta.maxSequenceNumber(),
+ meta.schemaId(),
+ meta.level(),
+ toStringArrayData(meta.extraFiles()),
+ meta.creationTime(),
+ meta.deleteRowCount().orElse(null),
+ meta.embeddedIndex());
+ rowSerializer.serialize(row, target);
+ }
+
+ public final List<DataFileMeta> deserializeList(DataInputView source)
throws IOException {
+ int size = source.readInt();
+ List<DataFileMeta> records = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ records.add(deserialize(source));
+ }
+ return records;
}
- @Override
- public DataFileMeta fromRow(InternalRow row) {
+ private DataFileMeta deserialize(DataInputView in) throws IOException {
+ byte[] bytes = new byte[in.readInt()];
+ in.readFully(bytes);
+ SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes,
0);
return new DataFileMeta(
row.getString(0).toString(),
row.getLong(1),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
index c73c12ffa..c0ae59017 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageLegacyV2Serializer;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import java.io.ByteArrayOutputStream;
@@ -37,6 +38,8 @@ public class ManifestCommittableSerializer implements
VersionedSerializer<Manife
private final CommitMessageSerializer commitMessageSerializer;
+ private CommitMessageLegacyV2Serializer legacyV2CommitMessageSerializer;
+
public ManifestCommittableSerializer() {
this.commitMessageSerializer = new CommitMessageSerializer();
}
@@ -89,8 +92,30 @@ public class ManifestCommittableSerializer implements
VersionedSerializer<Manife
Long watermark = view.readBoolean() ? null : view.readLong();
Map<Integer, Long> offsets = deserializeOffsets(view);
int fileCommittableSerializerVersion = view.readInt();
- List<CommitMessage> fileCommittables =
-
commitMessageSerializer.deserializeList(fileCommittableSerializerVersion, view);
+ List<CommitMessage> fileCommittables;
+ try {
+ fileCommittables =
+
commitMessageSerializer.deserializeList(fileCommittableSerializerVersion, view);
+ } catch (Exception e) {
+ if (fileCommittableSerializerVersion != 2) {
+ throw e;
+ }
+
+ // rebuild view
+ view = new DataInputDeserializer(serialized);
+ view.readLong();
+ if (!view.readBoolean()) {
+ view.readLong();
+ }
+ deserializeOffsets(view);
+ view.readInt();
+
+ if (legacyV2CommitMessageSerializer == null) {
+ legacyV2CommitMessageSerializer = new
CommitMessageLegacyV2Serializer();
+ }
+ fileCommittables =
legacyV2CommitMessageSerializer.deserializeList(view);
+ }
+
return new ManifestCommittable(identifier, watermark, offsets,
fileCommittables);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
new file mode 100644
index 000000000..1c6564b2e
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
+import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** A legacy version serializer for {@link CommitMessage}. */
+public class CommitMessageLegacyV2Serializer {
+
+ private DataFileMetaLegacyV2Serializer dataFileSerializer;
+ private IndexFileMetaLegacyV2Serializer indexEntrySerializer;
+
+ public List<CommitMessage> deserializeList(DataInputView view) throws
IOException {
+ int length = view.readInt();
+ List<CommitMessage> list = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ list.add(deserialize(view));
+ }
+ return list;
+ }
+
+ public CommitMessage deserialize(DataInputView view) throws IOException {
+ if (dataFileSerializer == null) {
+ dataFileSerializer = new DataFileMetaLegacyV2Serializer();
+ indexEntrySerializer = new IndexFileMetaLegacyV2Serializer();
+ }
+ return new CommitMessageImpl(
+ deserializeBinaryRow(view),
+ view.readInt(),
+ new DataIncrement(
+ dataFileSerializer.deserializeList(view),
+ Collections.emptyList(),
+ dataFileSerializer.deserializeList(view)),
+ new CompactIncrement(
+ dataFileSerializer.deserializeList(view),
+ dataFileSerializer.deserializeList(view),
+ dataFileSerializer.deserializeList(view)),
+ new
IndexIncrement(indexEntrySerializer.deserializeList(view)));
+ }
+
+ private static RowType legacyDataFileSchema() {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
+ fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
+ fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
+ fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
+ fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
+ fields.add(new DataField(5, "_KEY_STATS",
SimpleStatsConverter.schema()));
+ fields.add(new DataField(6, "_VALUE_STATS",
SimpleStatsConverter.schema()));
+ fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new
BigIntType(false)));
+ fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new
BigIntType(false)));
+ fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
+ fields.add(new DataField(10, "_LEVEL", new IntType(false)));
+ fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false,
newStringType(false))));
+ fields.add(new DataField(12, "_CREATION_TIME",
DataTypes.TIMESTAMP_MILLIS()));
+ return new RowType(fields);
+ }
+
+ private static RowType legacyIndexFileSchema() {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "_INDEX_TYPE", newStringType(false)));
+ fields.add(new DataField(1, "_FILE_NAME", newStringType(false)));
+ fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false)));
+ fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false)));
+ return new RowType(fields);
+ }
+
+ /** A legacy version serializer for {@link DataFileMeta}. */
+ private static class DataFileMetaLegacyV2Serializer extends
ObjectSerializer<DataFileMeta> {
+
+ private static final long serialVersionUID = 1L;
+
+ public DataFileMetaLegacyV2Serializer() {
+ super(legacyDataFileSchema());
+ }
+
+ @Override
+ public InternalRow toRow(DataFileMeta meta) {
+ return GenericRow.of(
+ BinaryString.fromString(meta.fileName()),
+ meta.fileSize(),
+ meta.rowCount(),
+ serializeBinaryRow(meta.minKey()),
+ serializeBinaryRow(meta.maxKey()),
+ meta.keyStats().toRow(),
+ meta.valueStats().toRow(),
+ meta.minSequenceNumber(),
+ meta.maxSequenceNumber(),
+ meta.schemaId(),
+ meta.level(),
+ toStringArrayData(meta.extraFiles()),
+ meta.creationTime());
+ }
+
+ @Override
+ public DataFileMeta fromRow(InternalRow row) {
+ return new DataFileMeta(
+ row.getString(0).toString(),
+ row.getLong(1),
+ row.getLong(2),
+ deserializeBinaryRow(row.getBinary(3)),
+ deserializeBinaryRow(row.getBinary(4)),
+ SimpleStats.fromRow(row.getRow(5, 3)),
+ SimpleStats.fromRow(row.getRow(6, 3)),
+ row.getLong(7),
+ row.getLong(8),
+ row.getLong(9),
+ row.getInt(10),
+ fromStringArrayData(row.getArray(11)),
+ row.getTimestamp(12, 3),
+ null,
+ null,
+ null);
+ }
+ }
+
+ /** A legacy version serializer for {@link IndexFileMeta}. */
+ private static class IndexFileMetaLegacyV2Serializer extends
ObjectSerializer<IndexFileMeta> {
+
+ public IndexFileMetaLegacyV2Serializer() {
+ super(legacyIndexFileSchema());
+ }
+
+ @Override
+ public InternalRow toRow(IndexFileMeta record) {
+ return GenericRow.of(
+ BinaryString.fromString(record.indexType()),
+ BinaryString.fromString(record.fileName()),
+ record.fileSize(),
+ record.rowCount());
+ }
+
+ @Override
+ public IndexFileMeta fromRow(InternalRow row) {
+ return new IndexFileMeta(
+ row.getString(0).toString(),
+ row.getString(1).toString(),
+ row.getLong(2),
+ row.getLong(3),
+ null);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 53a1f9455..cc8abfa48 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
@@ -30,7 +29,6 @@ import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.io.IndexIncrement;
-import org.apache.paimon.utils.ObjectSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -106,14 +104,23 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
}
private CommitMessage deserialize(int version, DataInputView view) throws
IOException {
- ObjectSerializer<DataFileMeta> dataFileSerializer;
if (version == CURRENT_VERSION) {
- dataFileSerializer = this.dataFileSerializer;
+ return new CommitMessageImpl(
+ deserializeBinaryRow(view),
+ view.readInt(),
+ new DataIncrement(
+ dataFileSerializer.deserializeList(view),
+ dataFileSerializer.deserializeList(view),
+ dataFileSerializer.deserializeList(view)),
+ new CompactIncrement(
+ dataFileSerializer.deserializeList(view),
+ dataFileSerializer.deserializeList(view),
+ dataFileSerializer.deserializeList(view)),
+ new IndexIncrement(
+ indexEntrySerializer.deserializeList(view),
+ indexEntrySerializer.deserializeList(view)));
} else if (version <= 2) {
- if (dataFile08Serializer == null) {
- dataFile08Serializer = new DataFileMeta08Serializer();
- }
- dataFileSerializer = dataFile08Serializer;
+ return deserialize08(version, view);
} else {
throw new UnsupportedOperationException(
"Expecting CommitMessageSerializer version to be smaller
or equal than "
@@ -122,17 +129,24 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
+ version
+ ".");
}
+ }
+
+ private CommitMessage deserialize08(int version, DataInputView view)
throws IOException {
+ if (dataFile08Serializer == null) {
+ dataFile08Serializer = new DataFileMeta08Serializer();
+ }
+
return new CommitMessageImpl(
deserializeBinaryRow(view),
view.readInt(),
new DataIncrement(
- dataFileSerializer.deserializeList(view),
- dataFileSerializer.deserializeList(view),
- dataFileSerializer.deserializeList(view)),
+ dataFile08Serializer.deserializeList(view),
+ dataFile08Serializer.deserializeList(view),
+ dataFile08Serializer.deserializeList(view)),
new CompactIncrement(
- dataFileSerializer.deserializeList(view),
- dataFileSerializer.deserializeList(view),
- dataFileSerializer.deserializeList(view)),
+ dataFile08Serializer.deserializeList(view),
+ dataFile08Serializer.deserializeList(view),
+ dataFile08Serializer.deserializeList(view)),
new IndexIncrement(
indexEntrySerializer.deserializeList(view),
version <= 2
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 13b9d1184..744904c71 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -173,4 +173,70 @@ public class
ManifestCommittableSerializerCompatibilityTest {
deserialized = serializer.deserialize(2, v2Bytes);
assertThat(deserialized).isEqualTo(manifestCommittable);
}
+
+ @Test
+ public void testCompatibilityToVersion2PaimonV07() throws IOException {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ null,
+ null,
+ null);
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ IndexFileMeta indexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, null);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ new DataIncrement(dataFiles, Collections.emptyList(),
dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+ ManifestCommittable deserialized = serializer.deserialize(2, bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+
+ byte[] v2Bytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/manifest-committable-v2-0.7"),
+ true);
+ deserialized = serializer.deserialize(2, v2Bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
}
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7
b/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7
new file mode 100644
index 000000000..5e1fb2f44
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v2-0.7
differ