This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9e80ec7c [core] Introduce RowCompactedSerializer to compact bytes in LookupLevels
9e80ec7c is described below
commit 9e80ec7c7c41596f2a063b4525de13f4b76a8f1b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 17 10:00:29 2023 +0800
[core] Introduce RowCompactedSerializer to compact bytes in LookupLevels
This closes #609
---
docs/content/docs/concepts/primary-key-table.md | 4 +
.../data/serializer/RowCompactedSerializer.java | 627 +++++++++++++++++++++
.../flink/table/store/utils/VarLengthIntUtils.java | 17 +
.../data/serializer/InternalRowSerializerTest.java | 8 +-
...erTest.java => RowCompactedSerializerTest.java} | 196 ++-----
.../table/store/file/mergetree/LookupLevels.java | 30 +-
.../file/operation/KeyValueFileStoreWrite.java | 5 +-
.../store/file/mergetree/LookupLevelsTest.java | 11 +-
8 files changed, 724 insertions(+), 174 deletions(-)
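
For readers skimming the patch: the new RowCompactedSerializer writes an InternalRow as a small header (row kind byte plus null bits) followed by the fields in declaration order, with no fixed-width padding. A minimal usage sketch (illustrative only, not part of the commit; it uses only APIs that appear in the patch and its tests):

    RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
    RowCompactedSerializer serializer = new RowCompactedSerializer(rowType);

    GenericRow row = new GenericRow(2);
    row.setField(0, 1);
    row.setField(1, BinaryString.fromString("hello"));

    byte[] bytes = serializer.serializeToBytes(row);  // compacted binary
    InternalRow copy = serializer.deserialize(bytes);  // fields read back in order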
diff --git a/docs/content/docs/concepts/primary-key-table.md
b/docs/content/docs/concepts/primary-key-table.md
index 7cfa1aa0..c7a88200 100644
--- a/docs/content/docs/concepts/primary-key-table.md
+++ b/docs/content/docs/concepts/primary-key-table.md
@@ -146,6 +146,10 @@ By specifying `'changelog-producer' = 'input'`, Table Store writers rely on thei
### Lookup
+{{< hint info >}}
+This is an experimental feature.
+{{< /hint >}}
+
If your input can’t produce a complete changelog but you still want to get rid of the costly normalized operator, you may consider using the `'lookup'` changelog producer.
By specifying `'changelog-producer' = 'lookup'`, Table Store will generate changelog through `'lookup'` before committing the data writing.
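
As a hypothetical illustration of the option this paragraph describes (table name, schema, and environment setup are made up; only the `'changelog-producer' = 'lookup'` setting comes from the docs), a Flink SQL definition issued from Java might look like:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LookupChangelogExample {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
            // assumes the current catalog is a Table Store catalog;
            // enables the experimental lookup changelog producer for a primary-key table
            env.executeSql(
                    "CREATE TABLE orders (order_id INT PRIMARY KEY NOT ENFORCED, amount DOUBLE)"
                            + " WITH ('changelog-producer' = 'lookup')");
        }
    }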
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializer.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializer.java
new file mode 100644
index 00000000..d199fc93
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializer.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.data.serializer;
+
+import org.apache.flink.table.store.annotation.VisibleForTesting;
+import org.apache.flink.table.store.data.BinaryArray;
+import org.apache.flink.table.store.data.BinaryMap;
+import org.apache.flink.table.store.data.BinaryString;
+import org.apache.flink.table.store.data.Decimal;
+import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalArray;
+import org.apache.flink.table.store.data.InternalMap;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.InternalRow.FieldGetter;
+import org.apache.flink.table.store.data.Timestamp;
+import org.apache.flink.table.store.io.DataInputView;
+import org.apache.flink.table.store.io.DataOutputView;
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.RowKind;
+import org.apache.flink.table.store.types.RowType;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.flink.table.store.data.BinaryRow.HEADER_SIZE_IN_BITS;
+import static org.apache.flink.table.store.memory.MemorySegmentUtils.bitGet;
+import static org.apache.flink.table.store.memory.MemorySegmentUtils.bitSet;
+import static org.apache.flink.table.store.types.DataTypeChecks.getPrecision;
+import static org.apache.flink.table.store.types.DataTypeChecks.getScale;
+import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
+
+/** A {@link Serializer} for {@link InternalRow} using compacted binary. */
+public class RowCompactedSerializer implements Serializer<InternalRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FieldGetter[] getters;
+ private final FieldWriter[] writers;
+ private final FieldReader[] readers;
+ private final RowType rowType;
+
+ @Nullable private RowWriter rowWriter;
+
+ @Nullable private RowReader rowReader;
+
+ public static int calculateBitSetInBytes(int arity) {
+ return (arity + 7 + HEADER_SIZE_IN_BITS) / 8;
+ }
+
+ public RowCompactedSerializer(RowType rowType) {
+ this.getters = new FieldGetter[rowType.getFieldCount()];
+ this.writers = new FieldWriter[rowType.getFieldCount()];
+ this.readers = new FieldReader[rowType.getFieldCount()];
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ DataType type = rowType.getTypeAt(i);
+ getters[i] = InternalRow.createFieldGetter(type, i);
+ writers[i] = createFieldWriter(type);
+ readers[i] = createFieldReader(type);
+ }
+ this.rowType = rowType;
+ }
+
+ @VisibleForTesting
+ RowType rowType() {
+ return rowType;
+ }
+
+ @Override
+ public Serializer<InternalRow> duplicate() {
+ return new RowCompactedSerializer(rowType);
+ }
+
+ @Override
+ public InternalRow copy(InternalRow from) {
+ return deserialize(serializeToBytes(from));
+ }
+
+ @Override
+ public void serialize(InternalRow record, DataOutputView target) throws IOException {
+ byte[] bytes = serializeToBytes(record);
+ VarLengthIntUtils.encodeInt(target, bytes.length);
+ target.write(bytes);
+ }
+
+ @Override
+ public InternalRow deserialize(DataInputView source) throws IOException {
+ int len = VarLengthIntUtils.decodeInt(source);
+ byte[] bytes = new byte[len];
+ source.readFully(bytes);
+ return deserialize(bytes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RowCompactedSerializer that = (RowCompactedSerializer) o;
+ return Objects.equals(rowType, that.rowType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rowType);
+ }
+
+ public byte[] serializeToBytes(InternalRow record) {
+ if (rowWriter == null) {
+ rowWriter = new RowWriter(calculateBitSetInBytes(getters.length));
+ }
+ rowWriter.reset();
+ rowWriter.writeRowKind(record.getRowKind());
+ for (int i = 0; i < getters.length; i++) {
+ Object field = getters[i].getFieldOrNull(record);
+ if (field == null) {
+ rowWriter.setNullAt(i);
+ } else {
+ writers[i].writeField(rowWriter, i, field);
+ }
+ }
+ return rowWriter.copyBuffer();
+ }
+
+ public InternalRow deserialize(byte[] bytes) {
+ if (rowReader == null) {
+ rowReader = new RowReader(calculateBitSetInBytes(getters.length));
+ }
+ rowReader.pointTo(bytes);
+ GenericRow row = new GenericRow(readers.length);
+ row.setRowKind(rowReader.readRowKind());
+ for (int i = 0; i < readers.length; i++) {
+ row.setField(i, rowReader.isNullAt(i) ? null : readers[i].readField(rowReader, i));
+ }
+ return row;
+ }
+
+ private static FieldWriter createFieldWriter(DataType fieldType) {
+ final FieldWriter fieldWriter;
+ switch (fieldType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ fieldWriter = (writer, pos, value) -> writer.writeString((BinaryString) value);
+ break;
+ case BOOLEAN:
+ fieldWriter = (writer, pos, value) -> writer.writeBoolean((boolean) value);
+ break;
+ case BINARY:
+ case VARBINARY:
+ fieldWriter = (writer, pos, value) -> writer.writeBinary((byte[]) value);
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ fieldWriter =
+ (writer, pos, value) ->
+ writer.writeDecimal((Decimal) value, decimalPrecision);
+ break;
+ case TINYINT:
+ fieldWriter = (writer, pos, value) -> writer.writeByte((byte) value);
+ break;
+ case SMALLINT:
+ fieldWriter = (writer, pos, value) -> writer.writeShort((short) value);
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
+ break;
+ case BIGINT:
+ fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
+ break;
+ case FLOAT:
+ fieldWriter = (writer, pos, value) -> writer.writeFloat((float) value);
+ break;
+ case DOUBLE:
+ fieldWriter = (writer, pos, value) -> writer.writeDouble((double) value);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int timestampPrecision = getPrecision(fieldType);
+ fieldWriter =
+ (writer, pos, value) ->
+ writer.writeTimestamp((Timestamp) value, timestampPrecision);
+ break;
+ case ARRAY:
+ Serializer<InternalArray> arraySerializer = InternalSerializers.create(fieldType);
+ fieldWriter =
+ (writer, pos, value) ->
+ writer.writeArray(
+ (InternalArray) value,
+ (InternalArraySerializer) arraySerializer);
+ break;
+ case MULTISET:
+ case MAP:
+ Serializer<InternalMap> mapSerializer = InternalSerializers.create(fieldType);
+ fieldWriter =
+ (writer, pos, value) ->
+ writer.writeMap(
+ (InternalMap) value, (InternalMapSerializer) mapSerializer);
+ break;
+ case ROW:
+ RowCompactedSerializer rowSerializer =
+ new RowCompactedSerializer((RowType) fieldType);
+ fieldWriter =
+ (writer, pos, value) -> writer.writeRow((InternalRow) value, rowSerializer);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+
+ if (!fieldType.isNullable()) {
+ return fieldWriter;
+ }
+ return (writer, pos, value) -> {
+ if (value == null) {
+ writer.setNullAt(pos);
+ } else {
+ fieldWriter.writeField(writer, pos, value);
+ }
+ };
+ }
+
+ private static FieldReader createFieldReader(DataType fieldType) {
+ final FieldReader fieldReader;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ fieldReader = (reader, pos) -> reader.readString();
+ break;
+ case BOOLEAN:
+ fieldReader = (reader, pos) -> reader.readBoolean();
+ break;
+ case BINARY:
+ case VARBINARY:
+ fieldReader = (reader, pos) -> reader.readBinary();
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ fieldReader = (reader, pos) -> reader.readDecimal(decimalPrecision, decimalScale);
+ break;
+ case TINYINT:
+ fieldReader = (reader, pos) -> reader.readByte();
+ break;
+ case SMALLINT:
+ fieldReader = (reader, pos) -> reader.readShort();
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ fieldReader = (reader, pos) -> reader.readInt();
+ break;
+ case BIGINT:
+ fieldReader = (reader, pos) -> reader.readLong();
+ break;
+ case FLOAT:
+ fieldReader = (reader, pos) -> reader.readFloat();
+ break;
+ case DOUBLE:
+ fieldReader = (reader, pos) -> reader.readDouble();
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int timestampPrecision = getPrecision(fieldType);
+ fieldReader = (reader, pos) -> reader.readTimestamp(timestampPrecision);
+ break;
+ case ARRAY:
+ fieldReader = (reader, pos) -> reader.readArray();
+ break;
+ case MULTISET:
+ case MAP:
+ fieldReader = (reader, pos) -> reader.readMap();
+ break;
+ case ROW:
+ RowCompactedSerializer serializer = new RowCompactedSerializer((RowType) fieldType);
+ fieldReader = (reader, pos) -> reader.readRow(serializer);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ if (!fieldType.isNullable()) {
+ return fieldReader;
+ }
+ return (reader, pos) -> {
+ if (reader.isNullAt(pos)) {
+ return null;
+ }
+ return fieldReader.readField(reader, pos);
+ };
+ }
+
+ private interface FieldWriter extends Serializable {
+ void writeField(RowWriter writer, int pos, Object value);
+ }
+
+ private interface FieldReader extends Serializable {
+ Object readField(RowReader reader, int pos);
+ }
+
+ private static class RowWriter {
+
+ // Including RowKind and null bits.
+ private final int headerSizeInBytes;
+
+ private byte[] buffer;
+ private MemorySegment segment;
+ private int position;
+
+ private RowWriter(int headerSizeInBytes) {
+ this.headerSizeInBytes = headerSizeInBytes;
+ setBuffer(new byte[Math.max(64, headerSizeInBytes)]);
+ this.position = headerSizeInBytes;
+ }
+
+ private void reset() {
+ this.position = headerSizeInBytes;
+ for (int i = 0; i < headerSizeInBytes; i++) {
+ buffer[i] = 0;
+ }
+ }
+
+ private void writeRowKind(RowKind kind) {
+ this.buffer[0] = kind.toByteValue();
+ }
+
+ private void setNullAt(int pos) {
+ bitSet(segment, 0, pos + HEADER_SIZE_IN_BITS);
+ }
+
+ private void writeBoolean(boolean value) {
+ ensureCapacity(1);
+ segment.putBoolean(position++, value);
+ }
+
+ private void writeByte(byte value) {
+ ensureCapacity(1);
+ segment.put(position++, value);
+ }
+
+ private void writeShort(short value) {
+ ensureCapacity(2);
+ segment.putShort(position, value);
+ position += 2;
+ }
+
+ private void writeInt(int value) {
+ ensureCapacity(4);
+ segment.putInt(position, value);
+ position += 4;
+ }
+
+ private void writeLong(long value) {
+ ensureCapacity(8);
+ segment.putLong(position, value);
+ position += 8;
+ }
+
+ private void writeFloat(float value) {
+ ensureCapacity(4);
+ segment.putFloat(position, value);
+ position += 4;
+ }
+
+ private void writeDouble(double value) {
+ ensureCapacity(8);
+ segment.putDouble(position, value);
+ position += 8;
+ }
+
+ private void writeString(BinaryString value) {
+ writeSegments(value.getSegments(), value.getOffset(), value.getSizeInBytes());
+ }
+
+ private void writeDecimal(Decimal value, int precision) {
+ if (Decimal.isCompact(precision)) {
+ writeLong(value.toUnscaledLong());
+ } else {
+ writeBinary(value.toUnscaledBytes());
+ }
+ }
+
+ private void writeTimestamp(Timestamp value, int precision) {
+ if (Timestamp.isCompact(precision)) {
+ writeLong(value.getMillisecond());
+ } else {
+ writeLong(value.getMillisecond());
+ writeUnsignedInt(value.getNanoOfMillisecond());
+ }
+ }
+
+ private void writeUnsignedInt(int value) {
+ checkArgument(value >= 0);
+ ensureCapacity(5);
+ int len = VarLengthIntUtils.encodeInt(buffer, position, value);
+ position += len;
+ }
+
+ private void writeArray(InternalArray value, InternalArraySerializer serializer) {
+ BinaryArray binary = serializer.toBinaryArray(value);
+ writeSegments(binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
+ }
+
+ private void writeMap(InternalMap value, InternalMapSerializer serializer) {
+ BinaryMap binary = serializer.toBinaryMap(value);
+ writeSegments(binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
+ }
+
+ private void writeRow(InternalRow value, RowCompactedSerializer serializer) {
+ writeBinary(serializer.serializeToBytes(value));
+ }
+
+ private byte[] copyBuffer() {
+ return Arrays.copyOf(buffer, position);
+ }
+
+ private void setBuffer(byte[] buffer) {
+ this.buffer = buffer;
+ this.segment = MemorySegment.wrap(buffer);
+ }
+
+ private void ensureCapacity(int size) {
+ if (buffer.length - position < size) {
+ grow(size);
+ }
+ }
+
+ private void grow(int minCapacityAdd) {
+ int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+ setBuffer(Arrays.copyOf(this.buffer, newLen));
+ }
+
+ private void writeBinary(byte[] value) {
+ writeUnsignedInt(value.length);
+ ensureCapacity(value.length);
+ System.arraycopy(value, 0, buffer, position, value.length);
+ position += value.length;
+ }
+
+ private void write(MemorySegment segment, int off, int len) {
+ ensureCapacity(len);
+ segment.get(off, this.buffer, this.position, len);
+ this.position += len;
+ }
+
+ private void writeSegments(MemorySegment[] segments, int off, int len) {
+ writeUnsignedInt(len);
+ if (len + off <= segments[0].size()) {
+ write(segments[0], off, len);
+ } else {
+ write(segments, off, len);
+ }
+ }
+
+ private void write(MemorySegment[] segments, int off, int len) {
+ ensureCapacity(len);
+ int toWrite = len;
+ int fromOffset = off;
+ int toOffset = this.position;
+ for (MemorySegment sourceSegment : segments) {
+ int remain = sourceSegment.size() - fromOffset;
+ if (remain > 0) {
+ int localToWrite = Math.min(remain, toWrite);
+ sourceSegment.get(fromOffset, buffer, toOffset, localToWrite);
+ toWrite -= localToWrite;
+ toOffset += localToWrite;
+ fromOffset = 0;
+ } else {
+ fromOffset -= sourceSegment.size();
+ }
+ }
+ this.position += len;
+ }
+ }
+
+ private static class RowReader {
+
+ // Including RowKind and null bits.
+ private final int headerSizeInBytes;
+
+ private MemorySegment segment;
+ private MemorySegment[] segments;
+ private int position;
+
+ private RowReader(int headerSizeInBytes) {
+ this.headerSizeInBytes = headerSizeInBytes;
+ }
+
+ private void pointTo(byte[] bytes) {
+ this.segment = MemorySegment.wrap(bytes);
+ this.segments = new MemorySegment[] {segment};
+ this.position = headerSizeInBytes;
+ }
+
+ private RowKind readRowKind() {
+ return RowKind.fromByteValue(segment.get(0));
+ }
+
+ private boolean isNullAt(int pos) {
+ return bitGet(segment, 0, pos + HEADER_SIZE_IN_BITS);
+ }
+
+ private boolean readBoolean() {
+ return segment.getBoolean(position++);
+ }
+
+ private byte readByte() {
+ return segment.get(position++);
+ }
+
+ private short readShort() {
+ short value = segment.getShort(position);
+ position += 2;
+ return value;
+ }
+
+ private int readInt() {
+ int value = segment.getInt(position);
+ position += 4;
+ return value;
+ }
+
+ private long readLong() {
+ long value = segment.getLong(position);
+ position += 8;
+ return value;
+ }
+
+ private float readFloat() {
+ float value = segment.getFloat(position);
+ position += 4;
+ return value;
+ }
+
+ private double readDouble() {
+ double value = segment.getDouble(position);
+ position += 8;
+ return value;
+ }
+
+ private BinaryString readString() {
+ int length = readUnsignedInt();
+ BinaryString string = BinaryString.fromAddress(segments, position, length);
+ position += length;
+ return string;
+ }
+
+ private int readUnsignedInt() {
+ for (int offset = 0, result = 0; offset < 32; offset += 7) {
+ int b = readByte();
+ result |= (b & 0x7F) << offset;
+ if ((b & 0x80) == 0) {
+ return result;
+ }
+ }
+ throw new Error("Malformed integer.");
+ }
+
+ private Decimal readDecimal(int precision, int scale) {
+ return Decimal.isCompact(precision)
+ ? Decimal.fromUnscaledLong(readLong(), precision, scale)
+ : Decimal.fromUnscaledBytes(readBinary(), precision, scale);
+ }
+
+ private Timestamp readTimestamp(int precision) {
+ if (Timestamp.isCompact(precision)) {
+ return Timestamp.fromEpochMillis(readLong());
+ }
+ long milliseconds = readLong();
+ int nanosOfMillisecond = readUnsignedInt();
+ return Timestamp.fromEpochMillis(milliseconds, nanosOfMillisecond);
+ }
+
+ private byte[] readBinary() {
+ int length = readUnsignedInt();
+ byte[] bytes = new byte[length];
+ segment.get(position, bytes, 0, length);
+ position += length;
+ return bytes;
+ }
+
+ private InternalArray readArray() {
+ BinaryArray value = new BinaryArray();
+ int length = readUnsignedInt();
+ value.pointTo(segments, position, length);
+ position += length;
+ return value;
+ }
+
+ private InternalMap readMap() {
+ BinaryMap value = new BinaryMap();
+ int length = readUnsignedInt();
+ value.pointTo(segments, position, length);
+ position += length;
+ return value;
+ }
+
+ private InternalRow readRow(RowCompactedSerializer serializer) {
+ byte[] bytes = readBinary();
+ return serializer.deserialize(bytes);
+ }
+ }
+}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
index 481a1597..706e95b4 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
@@ -80,6 +80,23 @@ public final class VarLengthIntUtils {
throw new Error("Malformed long.");
}
+ /** @return bytes length. */
+ public static int encodeInt(byte[] bytes, int offset, int value) {
+
+ if (value < 0) {
+ throw new IllegalArgumentException("negative value: v=" + value);
+ }
+
+ int i = 1;
+ while ((value & ~0x7F) != 0) {
+ bytes[i + offset - 1] = (byte) ((value & 0x7F) | 0x80);
+ value >>>= 7;
+ i++;
+ }
+ bytes[i + offset - 1] = (byte) value;
+ return i;
+ }
+
/** @return bytes length. */
public static int encodeInt(DataOutput os, int value) throws IOException {
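
The new array-based encodeInt writes a non-negative int as little-endian 7-bit groups, setting the high bit of every byte except the last as a continuation marker (the same format that readUnsignedInt in RowCompactedSerializer decodes). A small worked example, illustrative only:

    byte[] buf = new byte[5];                            // 5 bytes is the worst case for an int
    int len = VarLengthIntUtils.encodeInt(buf, 0, 300);
    // 300 = 0b1_0010_1100 -> buf[0] = (byte) 0xAC (low 7 bits + continuation bit),
    //                        buf[1] = 0x02, and len == 2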
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
index afeacd9b..e3ff57f5 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
@@ -59,7 +59,7 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
// ----------------------------------------------------------------------------------------------
- private static BinaryArray createArray(int... ints) {
+ public static BinaryArray createArray(int... ints) {
BinaryArray array = new BinaryArray();
BinaryArrayWriter writer = new BinaryArrayWriter(array, ints.length, 4);
for (int i = 0; i < ints.length; i++) {
@@ -69,11 +69,11 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
return array;
}
- private static BinaryMap createMap(int[] keys, int[] values) {
+ public static BinaryMap createMap(int[] keys, int[] values) {
return BinaryMap.valueOf(createArray(keys), createArray(values));
}
- private static GenericRow createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
+ public static GenericRow createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
GenericRow row = new GenericRow(5);
row.setField(0, f0);
row.setField(1, f1);
@@ -83,7 +83,7 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
return row;
}
- private static boolean deepEqualsInternalRow(
+ public static boolean deepEqualsInternalRow(
InternalRow should,
InternalRow is,
InternalRowSerializer serializer1,
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializerTest.java
similarity index 50%
copy from flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
copy to flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializerTest.java
index afeacd9b..3752800b 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializerTest.java
@@ -18,34 +18,27 @@
package org.apache.flink.table.store.data.serializer;
-import org.apache.flink.table.store.data.BinaryArray;
-import org.apache.flink.table.store.data.BinaryArrayWriter;
-import org.apache.flink.table.store.data.BinaryMap;
-import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.BinaryString;
import org.apache.flink.table.store.data.GenericRow;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.types.DataType;
import org.apache.flink.table.store.types.DataTypes;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Objects;
+import org.apache.flink.table.store.types.RowType;
import static org.apache.flink.table.store.data.BinaryString.fromString;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.apache.flink.table.store.data.serializer.InternalRowSerializerTest.createArray;
+import static org.apache.flink.table.store.data.serializer.InternalRowSerializerTest.createMap;
+import static org.apache.flink.table.store.data.serializer.InternalRowSerializerTest.createRow;
+import static org.apache.flink.table.store.data.serializer.InternalRowSerializerTest.deepEqualsInternalRow;
-/** Test for {@link InternalRowSerializer}. */
-abstract class InternalRowSerializerTest extends SerializerTestInstance<InternalRow> {
+/** Test for {@link RowCompactedSerializer}. */
+abstract class RowCompactedSerializerTest extends SerializerTestInstance<InternalRow> {
- private final InternalRowSerializer serializer;
- private final InternalRow[] testData;
+ private final RowCompactedSerializer serializer;
- InternalRowSerializerTest(InternalRowSerializer serializer, InternalRow[] testData) {
+ RowCompactedSerializerTest(RowCompactedSerializer serializer, InternalRow[] testData) {
super(serializer, testData);
this.serializer = serializer;
- this.testData = testData;
}
@Override
@@ -53,105 +46,12 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
return deepEqualsInternalRow(
o1,
o2,
- (InternalRowSerializer) serializer.duplicate(),
- (InternalRowSerializer) serializer.duplicate());
- }
-
- // ----------------------------------------------------------------------------------------------
-
- private static BinaryArray createArray(int... ints) {
- BinaryArray array = new BinaryArray();
- BinaryArrayWriter writer = new BinaryArrayWriter(array, ints.length, 4);
- for (int i = 0; i < ints.length; i++) {
- writer.writeInt(i, ints[i]);
- }
- writer.complete();
- return array;
- }
-
- private static BinaryMap createMap(int[] keys, int[] values) {
- return BinaryMap.valueOf(createArray(keys), createArray(values));
- }
-
- private static GenericRow createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
- GenericRow row = new GenericRow(5);
- row.setField(0, f0);
- row.setField(1, f1);
- row.setField(2, f2);
- row.setField(3, f3);
- row.setField(4, f4);
- return row;
- }
-
- private static boolean deepEqualsInternalRow(
- InternalRow should,
- InternalRow is,
- InternalRowSerializer serializer1,
- InternalRowSerializer serializer2) {
- return deepEqualsInternalRow(should, is, serializer1, serializer2, false);
- }
-
- private static boolean deepEqualsInternalRow(
- InternalRow should,
- InternalRow is,
- InternalRowSerializer serializer1,
- InternalRowSerializer serializer2,
- boolean checkClass) {
- if (should.getFieldCount() != is.getFieldCount()) {
- return false;
- }
- if (checkClass && (should.getClass() != is.getClass() || !should.equals(is))) {
- return false;
- }
-
- BinaryRow row1 = serializer1.toBinaryRow(should);
- BinaryRow row2 = serializer2.toBinaryRow(is);
-
- return Objects.equals(row1, row2);
- }
-
- private void checkDeepEquals(InternalRow should, InternalRow is, boolean checkClass) {
- boolean equals =
- deepEqualsInternalRow(
- should,
- is,
- (InternalRowSerializer) serializer.duplicate(),
- (InternalRowSerializer) serializer.duplicate(),
- checkClass);
- assertThat(equals).isTrue();
- }
-
- @Test
- protected void testCopy() {
- for (InternalRow row : testData) {
- checkDeepEquals(row, serializer.copy(row), true);
- }
-
- for (InternalRow row : testData) {
- checkDeepEquals(row, serializer.copy(row), true);
- }
-
- for (InternalRow row : testData) {
- checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row)), false);
- }
-
- for (InternalRow row : testData) {
- checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row)), false);
- }
-
- for (InternalRow row : testData) {
- checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row)), false);
- }
- }
-
- @Test
- void testWrongCopy() {
- assertThatThrownBy(() -> serializer.copy(new GenericRow(serializer.getArity() + 1)))
- .isInstanceOf(IllegalArgumentException.class);
+ new InternalRowSerializer(serializer.rowType()),
+ new InternalRowSerializer(serializer.rowType()));
}
- static final class SimpleInternalRowSerializerTest extends InternalRowSerializerTest {
- public SimpleInternalRowSerializerTest() {
+ static final class SimpleTest extends RowCompactedSerializerTest {
+ public SimpleTest() {
super(getRowSerializer(), getData());
}
@@ -167,13 +67,13 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
return new InternalRow[] {row1, row2};
}
- private static InternalRowSerializer getRowSerializer() {
- return new InternalRowSerializer(DataTypes.INT(), DataTypes.STRING());
+ private static RowCompactedSerializer getRowSerializer() {
+ return new RowCompactedSerializer(RowType.of(DataTypes.INT(), DataTypes.STRING()));
}
}
- static final class LargeInternalRowSerializerTest extends InternalRowSerializerTest {
- public LargeInternalRowSerializerTest() {
+ static final class LargeTest extends RowCompactedSerializerTest {
+ public LargeTest() {
super(getRowSerializer(), getData());
}
@@ -195,26 +95,27 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
return new InternalRow[] {row};
}
- private static InternalRowSerializer getRowSerializer() {
- return new InternalRowSerializer(
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.STRING());
+ private static RowCompactedSerializer getRowSerializer() {
+ return new RowCompactedSerializer(
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.STRING()));
}
}
- static final class InternalRowSerializerWithComplexTypesTest extends InternalRowSerializerTest {
- public InternalRowSerializerWithComplexTypesTest() {
+ static final class ComplexTypesTest extends RowCompactedSerializerTest {
+ public ComplexTypesTest() {
super(getRowSerializer(), getData());
}
@@ -266,26 +167,26 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
};
}
- private static InternalRowSerializer getRowSerializer() {
- return new InternalRowSerializer(
- DataTypes.INT(),
- DataTypes.DOUBLE(),
- DataTypes.STRING(),
- DataTypes.ARRAY(DataTypes.INT()),
- DataTypes.MAP(DataTypes.INT(), DataTypes.INT()));
+ private static RowCompactedSerializer getRowSerializer() {
+ return new RowCompactedSerializer(
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.DOUBLE(),
+ DataTypes.STRING(),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.MAP(DataTypes.INT(), DataTypes.INT())));
}
}
- static final class InternalRowSerializerWithNestedInternalRowTest
- extends InternalRowSerializerTest {
+ static final class NestedInternalRowTest extends RowCompactedSerializerTest {
- private static final DataType NESTED_DATA_TYPE =
+ private static final RowType NESTED_DATA_TYPE =
DataTypes.ROW(
DataTypes.FIELD(0, "ri", DataTypes.INT()),
DataTypes.FIELD(1, "rs", DataTypes.STRING()),
DataTypes.FIELD(2, "rb", DataTypes.BIGINT()));
- public InternalRowSerializerWithNestedInternalRowTest() {
+ public NestedInternalRowTest() {
super(getRowSerializer(), getData());
}
@@ -314,9 +215,8 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
return new InternalRow[] {nestedRow1, nestedRow2};
}
- private static InternalRowSerializer getRowSerializer() {
- return (InternalRowSerializer)
- InternalSerializers.<InternalRow>create(NESTED_DATA_TYPE);
+ private static RowCompactedSerializer getRowSerializer() {
+ return new RowCompactedSerializer(NESTED_DATA_TYPE);
}
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
index cc50f718..7b7b8dbc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
@@ -19,9 +19,8 @@
package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.store.annotation.VisibleForTesting;
-import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
+import org.apache.flink.table.store.data.serializer.RowCompactedSerializer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.io.DataOutputSerializer;
@@ -32,6 +31,7 @@ import org.apache.flink.table.store.memory.MemorySegment;
import org.apache.flink.table.store.options.MemorySize;
import org.apache.flink.table.store.reader.RecordReader;
import org.apache.flink.table.store.types.RowKind;
+import org.apache.flink.table.store.types.RowType;
import org.apache.flink.table.store.utils.FileIOUtils;
import org.apache.flink.table.store.utils.IOFunction;
@@ -58,8 +58,8 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
private final Levels levels;
private final Comparator<InternalRow> keyComparator;
- private final InternalRowSerializer keySerializer;
- private final InternalRowSerializer valueSerializer;
+ private final RowCompactedSerializer keySerializer;
+ private final RowCompactedSerializer valueSerializer;
private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
private final Supplier<File> localFileFactory;
private final LookupStoreFactory lookupStoreFactory;
@@ -69,8 +69,8 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
public LookupLevels(
Levels levels,
Comparator<InternalRow> keyComparator,
- InternalRowSerializer keySerializer,
- InternalRowSerializer valueSerializer,
+ RowType keyType,
+ RowType valueType,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Supplier<File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
@@ -78,8 +78,8 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
MemorySize maxDiskSize) {
this.levels = levels;
this.keyComparator = keyComparator;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
+ this.keySerializer = new RowCompactedSerializer(keyType);
+ this.valueSerializer = new RowCompactedSerializer(valueType);
this.fileReaderFactory = fileReaderFactory;
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
@@ -164,16 +164,14 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
} catch (ExecutionException e) {
throw new IOException(e);
}
- byte[] keyBytes = keySerializer.toBinaryRow(key).toBytes();
+ byte[] keyBytes = keySerializer.serializeToBytes(key);
byte[] valueBytes = lookupFile.get(keyBytes);
if (valueBytes == null) {
return null;
}
- MemorySegment memorySegment = MemorySegment.wrap(valueBytes);
- long sequenceNumber = memorySegment.getLong(0);
- RowKind rowKind = RowKind.fromByteValue(valueBytes[8]);
- BinaryRow value = new BinaryRow(valueSerializer.getArity());
- value.pointTo(memorySegment, 9, valueBytes.length - 9);
+ InternalRow value = valueSerializer.deserialize(valueBytes);
+ long sequenceNumber = MemorySegment.wrap(valueBytes).getLong(valueBytes.length - 9);
+ RowKind rowKind = RowKind.fromByteValue(valueBytes[valueBytes.length - 1]);
return new KeyValue()
.replace(key, sequenceNumber, rowKind, value)
.setLevel(lookupFile.remoteFile().level());
@@ -206,11 +204,11 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
KeyValue kv;
while ((batch = reader.readBatch()) != null) {
while ((kv = batch.next()) != null) {
- byte[] keyBytes = keySerializer.toBinaryRow(kv.key()).toBytes();
+ byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
valueOut.clear();
+ valueOut.write(valueSerializer.serializeToBytes(kv.value()));
valueOut.writeLong(kv.sequenceNumber());
valueOut.writeByte(kv.valueKind().toByteValue());
- valueOut.write(valueSerializer.toBinaryRow(kv.value()).toBytes());
byte[] valueBytes = valueOut.getCopyOfBuffer();
kvWriter.put(keyBytes, valueBytes);
}
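
With this change the lookup-file value layout becomes [compacted value bytes][8-byte sequence number][1-byte row kind]; because the compacted serializer reads fields front-to-back, the trailer can simply be appended and is skipped on deserialization, as the lookup() hunk above shows. A rough sketch of unpacking such a value (the helper name is made up; the calls mirror the patch):

    static KeyValue unpack(InternalRow key, byte[] valueBytes, RowCompactedSerializer valueSerializer) {
        InternalRow value = valueSerializer.deserialize(valueBytes); // trailer bytes are ignored
        long seq = MemorySegment.wrap(valueBytes).getLong(valueBytes.length - 9);
        RowKind kind = RowKind.fromByteValue(valueBytes[valueBytes.length - 1]);
        return new KeyValue().replace(key, seq, kind, value);
    }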
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 23cbbe6d..97778e29 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.NoopCompactManager;
@@ -240,8 +239,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
return new LookupLevels(
levels,
keyComparatorSupplier.get(),
- new InternalRowSerializer(keyType),
- new InternalRowSerializer(valueType),
+ keyType,
+ valueType,
file ->
readerFactory.createRecordReader(
file.schemaId(), file.fileName(), file.level()),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
index d68d9e05..39077a13 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.GenericRow;
import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.format.FlushingFileFormat;
import org.apache.flink.table.store.file.io.DataFileMeta;
@@ -59,6 +58,7 @@ import java.util.Map;
import java.util.UUID;
import static org.apache.flink.table.store.CoreOptions.TARGET_FILE_SIZE;
+import static org.apache.flink.table.store.file.KeyValue.UNKNOWN_SEQUENCE;
import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -91,18 +91,21 @@ public class LookupLevelsTest {
// only in level 1
KeyValue kv = lookupLevels.lookup(row(1), 1);
assertThat(kv).isNotNull();
+ assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
assertThat(kv.level()).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(11);
// only in level 2
kv = lookupLevels.lookup(row(2), 1);
assertThat(kv).isNotNull();
+ assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
assertThat(kv.level()).isEqualTo(2);
assertThat(kv.value().getInt(1)).isEqualTo(22);
// both in level 1 and level 2
kv = lookupLevels.lookup(row(5), 1);
assertThat(kv).isNotNull();
+ assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
assertThat(kv.level()).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(5);
@@ -143,6 +146,7 @@ public class LookupLevelsTest {
for (Map.Entry<Integer, Integer> entry : contains.entrySet()) {
KeyValue kv = lookupLevels.lookup(row(entry.getKey()), 1);
assertThat(kv).isNotNull();
+ assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
assertThat(kv.level()).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(entry.getValue());
}
@@ -176,6 +180,7 @@ public class LookupLevelsTest {
for (int i = 0; i < fileNum * recordInFile; i++) {
KeyValue kv = lookupLevels.lookup(row(i), 1);
assertThat(kv).isNotNull();
+ assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
assertThat(kv.level()).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(i);
}
@@ -195,8 +200,8 @@ public class LookupLevelsTest {
return new LookupLevels(
levels,
comparator,
- new InternalRowSerializer(keyType),
- new InternalRowSerializer(rowType),
+ keyType,
+ rowType,
file -> createReaderFactory().createRecordReader(0, file.fileName(), file.level()),
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
new HashLookupStoreFactory(new CacheManager(2048, MemorySize.ofMebiBytes(1)), 0.75),