This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e80ec7c [core] Introduce RowCompactedSerializer to compact bytes in 
LookupLevels
9e80ec7c is described below

commit 9e80ec7c7c41596f2a063b4525de13f4b76a8f1b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 17 10:00:29 2023 +0800

    [core] Introduce RowCompactedSerializer to compact bytes in LookupLevels
    
    This closes #609
---
 docs/content/docs/concepts/primary-key-table.md    |   4 +
 .../data/serializer/RowCompactedSerializer.java    | 627 +++++++++++++++++++++
 .../flink/table/store/utils/VarLengthIntUtils.java |  17 +
 .../data/serializer/InternalRowSerializerTest.java |   8 +-
 ...erTest.java => RowCompactedSerializerTest.java} | 196 ++-----
 .../table/store/file/mergetree/LookupLevels.java   |  30 +-
 .../file/operation/KeyValueFileStoreWrite.java     |   5 +-
 .../store/file/mergetree/LookupLevelsTest.java     |  11 +-
 8 files changed, 724 insertions(+), 174 deletions(-)

diff --git a/docs/content/docs/concepts/primary-key-table.md 
b/docs/content/docs/concepts/primary-key-table.md
index 7cfa1aa0..c7a88200 100644
--- a/docs/content/docs/concepts/primary-key-table.md
+++ b/docs/content/docs/concepts/primary-key-table.md
@@ -146,6 +146,10 @@ By specifying `'changelog-producer' = 'input'`, Table 
Store writers rely on thei
 
 ### Lookup
 
+{{< hint info >}}
+This is an experimental feature.
+{{< /hint >}}
+
 If your input can’t produce a complete changelog but you still want to get rid 
of the costly normalized operator, you may consider using the `'lookup'` 
changelog producer.
 
 By specifying `'changelog-producer' = 'lookup'`, Table Store will generate 
changelog through `'lookup'` before committing the data writing.
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializer.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializer.java
new file mode 100644
index 00000000..d199fc93
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializer.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.data.serializer;
+
+import org.apache.flink.table.store.annotation.VisibleForTesting;
+import org.apache.flink.table.store.data.BinaryArray;
+import org.apache.flink.table.store.data.BinaryMap;
+import org.apache.flink.table.store.data.BinaryString;
+import org.apache.flink.table.store.data.Decimal;
+import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalArray;
+import org.apache.flink.table.store.data.InternalMap;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.InternalRow.FieldGetter;
+import org.apache.flink.table.store.data.Timestamp;
+import org.apache.flink.table.store.io.DataInputView;
+import org.apache.flink.table.store.io.DataOutputView;
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.RowKind;
+import org.apache.flink.table.store.types.RowType;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.flink.table.store.data.BinaryRow.HEADER_SIZE_IN_BITS;
+import static org.apache.flink.table.store.memory.MemorySegmentUtils.bitGet;
+import static org.apache.flink.table.store.memory.MemorySegmentUtils.bitSet;
+import static org.apache.flink.table.store.types.DataTypeChecks.getPrecision;
+import static org.apache.flink.table.store.types.DataTypeChecks.getScale;
+import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
+
+/** A {@link Serializer} for {@link InternalRow} using compacted binary. */
+public class RowCompactedSerializer implements Serializer<InternalRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FieldGetter[] getters;
+    private final FieldWriter[] writers;
+    private final FieldReader[] readers;
+    private final RowType rowType;
+
+    @Nullable private RowWriter rowWriter;
+
+    @Nullable private RowReader rowReader;
+
+    public static int calculateBitSetInBytes(int arity) {
+        return (arity + 7 + HEADER_SIZE_IN_BITS) / 8;
+    }
+
+    public RowCompactedSerializer(RowType rowType) {
+        this.getters = new FieldGetter[rowType.getFieldCount()];
+        this.writers = new FieldWriter[rowType.getFieldCount()];
+        this.readers = new FieldReader[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            DataType type = rowType.getTypeAt(i);
+            getters[i] = InternalRow.createFieldGetter(type, i);
+            writers[i] = createFieldWriter(type);
+            readers[i] = createFieldReader(type);
+        }
+        this.rowType = rowType;
+    }
+
+    @VisibleForTesting
+    RowType rowType() {
+        return rowType;
+    }
+
+    @Override
+    public Serializer<InternalRow> duplicate() {
+        return new RowCompactedSerializer(rowType);
+    }
+
+    @Override
+    public InternalRow copy(InternalRow from) {
+        return deserialize(serializeToBytes(from));
+    }
+
+    @Override
+    public void serialize(InternalRow record, DataOutputView target) throws 
IOException {
+        byte[] bytes = serializeToBytes(record);
+        VarLengthIntUtils.encodeInt(target, bytes.length);
+        target.write(bytes);
+    }
+
+    @Override
+    public InternalRow deserialize(DataInputView source) throws IOException {
+        int len = VarLengthIntUtils.decodeInt(source);
+        byte[] bytes = new byte[len];
+        source.readFully(bytes);
+        return deserialize(bytes);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RowCompactedSerializer that = (RowCompactedSerializer) o;
+        return Objects.equals(rowType, that.rowType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowType);
+    }
+
+    public byte[] serializeToBytes(InternalRow record) {
+        if (rowWriter == null) {
+            rowWriter = new RowWriter(calculateBitSetInBytes(getters.length));
+        }
+        rowWriter.reset();
+        rowWriter.writeRowKind(record.getRowKind());
+        for (int i = 0; i < getters.length; i++) {
+            Object field = getters[i].getFieldOrNull(record);
+            if (field == null) {
+                rowWriter.setNullAt(i);
+            } else {
+                writers[i].writeField(rowWriter, i, field);
+            }
+        }
+        return rowWriter.copyBuffer();
+    }
+
+    public InternalRow deserialize(byte[] bytes) {
+        if (rowReader == null) {
+            rowReader = new RowReader(calculateBitSetInBytes(getters.length));
+        }
+        rowReader.pointTo(bytes);
+        GenericRow row = new GenericRow(readers.length);
+        row.setRowKind(rowReader.readRowKind());
+        for (int i = 0; i < readers.length; i++) {
+            row.setField(i, rowReader.isNullAt(i) ? null : 
readers[i].readField(rowReader, i));
+        }
+        return row;
+    }
+
+    private static FieldWriter createFieldWriter(DataType fieldType) {
+        final FieldWriter fieldWriter;
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                fieldWriter = (writer, pos, value) -> 
writer.writeString((BinaryString) value);
+                break;
+            case BOOLEAN:
+                fieldWriter = (writer, pos, value) -> 
writer.writeBoolean((boolean) value);
+                break;
+            case BINARY:
+            case VARBINARY:
+                fieldWriter = (writer, pos, value) -> 
writer.writeBinary((byte[]) value);
+                break;
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                fieldWriter =
+                        (writer, pos, value) ->
+                                writer.writeDecimal((Decimal) value, 
decimalPrecision);
+                break;
+            case TINYINT:
+                fieldWriter = (writer, pos, value) -> writer.writeByte((byte) 
value);
+                break;
+            case SMALLINT:
+                fieldWriter = (writer, pos, value) -> 
writer.writeShort((short) value);
+                break;
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                fieldWriter = (writer, pos, value) -> writer.writeInt((int) 
value);
+                break;
+            case BIGINT:
+                fieldWriter = (writer, pos, value) -> writer.writeLong((long) 
value);
+                break;
+            case FLOAT:
+                fieldWriter = (writer, pos, value) -> 
writer.writeFloat((float) value);
+                break;
+            case DOUBLE:
+                fieldWriter = (writer, pos, value) -> 
writer.writeDouble((double) value);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int timestampPrecision = getPrecision(fieldType);
+                fieldWriter =
+                        (writer, pos, value) ->
+                                writer.writeTimestamp((Timestamp) value, 
timestampPrecision);
+                break;
+            case ARRAY:
+                Serializer<InternalArray> arraySerializer = 
InternalSerializers.create(fieldType);
+                fieldWriter =
+                        (writer, pos, value) ->
+                                writer.writeArray(
+                                        (InternalArray) value,
+                                        (InternalArraySerializer) 
arraySerializer);
+                break;
+            case MULTISET:
+            case MAP:
+                Serializer<InternalMap> mapSerializer = 
InternalSerializers.create(fieldType);
+                fieldWriter =
+                        (writer, pos, value) ->
+                                writer.writeMap(
+                                        (InternalMap) value, 
(InternalMapSerializer) mapSerializer);
+                break;
+            case ROW:
+                RowCompactedSerializer rowSerializer =
+                        new RowCompactedSerializer((RowType) fieldType);
+                fieldWriter =
+                        (writer, pos, value) -> writer.writeRow((InternalRow) 
value, rowSerializer);
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+
+        if (!fieldType.isNullable()) {
+            return fieldWriter;
+        }
+        return (writer, pos, value) -> {
+            if (value == null) {
+                writer.setNullAt(pos);
+            } else {
+                fieldWriter.writeField(writer, pos, value);
+            }
+        };
+    }
+
+    private static FieldReader createFieldReader(DataType fieldType) {
+        final FieldReader fieldReader;
+        // ordered by type root definition
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                fieldReader = (reader, pos) -> reader.readString();
+                break;
+            case BOOLEAN:
+                fieldReader = (reader, pos) -> reader.readBoolean();
+                break;
+            case BINARY:
+            case VARBINARY:
+                fieldReader = (reader, pos) -> reader.readBinary();
+                break;
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                final int decimalScale = getScale(fieldType);
+                fieldReader = (reader, pos) -> 
reader.readDecimal(decimalPrecision, decimalScale);
+                break;
+            case TINYINT:
+                fieldReader = (reader, pos) -> reader.readByte();
+                break;
+            case SMALLINT:
+                fieldReader = (reader, pos) -> reader.readShort();
+                break;
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                fieldReader = (reader, pos) -> reader.readInt();
+                break;
+            case BIGINT:
+                fieldReader = (reader, pos) -> reader.readLong();
+                break;
+            case FLOAT:
+                fieldReader = (reader, pos) -> reader.readFloat();
+                break;
+            case DOUBLE:
+                fieldReader = (reader, pos) -> reader.readDouble();
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int timestampPrecision = getPrecision(fieldType);
+                fieldReader = (reader, pos) -> 
reader.readTimestamp(timestampPrecision);
+                break;
+            case ARRAY:
+                fieldReader = (reader, pos) -> reader.readArray();
+                break;
+            case MULTISET:
+            case MAP:
+                fieldReader = (reader, pos) -> reader.readMap();
+                break;
+            case ROW:
+                RowCompactedSerializer serializer = new 
RowCompactedSerializer((RowType) fieldType);
+                fieldReader = (reader, pos) -> reader.readRow(serializer);
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+        if (!fieldType.isNullable()) {
+            return fieldReader;
+        }
+        return (reader, pos) -> {
+            if (reader.isNullAt(pos)) {
+                return null;
+            }
+            return fieldReader.readField(reader, pos);
+        };
+    }
+
+    private interface FieldWriter extends Serializable {
+        void writeField(RowWriter writer, int pos, Object value);
+    }
+
+    private interface FieldReader extends Serializable {
+        Object readField(RowReader reader, int pos);
+    }
+
+    private static class RowWriter {
+
+        // Including RowKind and null bits.
+        private final int headerSizeInBytes;
+
+        private byte[] buffer;
+        private MemorySegment segment;
+        private int position;
+
+        private RowWriter(int headerSizeInBytes) {
+            this.headerSizeInBytes = headerSizeInBytes;
+            setBuffer(new byte[Math.max(64, headerSizeInBytes)]);
+            this.position = headerSizeInBytes;
+        }
+
+        private void reset() {
+            this.position = headerSizeInBytes;
+            for (int i = 0; i < headerSizeInBytes; i++) {
+                buffer[i] = 0;
+            }
+        }
+
+        private void writeRowKind(RowKind kind) {
+            this.buffer[0] = kind.toByteValue();
+        }
+
+        private void setNullAt(int pos) {
+            bitSet(segment, 0, pos + HEADER_SIZE_IN_BITS);
+        }
+
+        private void writeBoolean(boolean value) {
+            ensureCapacity(1);
+            segment.putBoolean(position++, value);
+        }
+
+        private void writeByte(byte value) {
+            ensureCapacity(1);
+            segment.put(position++, value);
+        }
+
+        private void writeShort(short value) {
+            ensureCapacity(2);
+            segment.putShort(position, value);
+            position += 2;
+        }
+
+        private void writeInt(int value) {
+            ensureCapacity(4);
+            segment.putInt(position, value);
+            position += 4;
+        }
+
+        private void writeLong(long value) {
+            ensureCapacity(8);
+            segment.putLong(position, value);
+            position += 8;
+        }
+
+        private void writeFloat(float value) {
+            ensureCapacity(4);
+            segment.putFloat(position, value);
+            position += 4;
+        }
+
+        private void writeDouble(double value) {
+            ensureCapacity(8);
+            segment.putDouble(position, value);
+            position += 8;
+        }
+
+        private void writeString(BinaryString value) {
+            writeSegments(value.getSegments(), value.getOffset(), 
value.getSizeInBytes());
+        }
+
+        private void writeDecimal(Decimal value, int precision) {
+            if (Decimal.isCompact(precision)) {
+                writeLong(value.toUnscaledLong());
+            } else {
+                writeBinary(value.toUnscaledBytes());
+            }
+        }
+
+        private void writeTimestamp(Timestamp value, int precision) {
+            if (Timestamp.isCompact(precision)) {
+                writeLong(value.getMillisecond());
+            } else {
+                writeLong(value.getMillisecond());
+                writeUnsignedInt(value.getNanoOfMillisecond());
+            }
+        }
+
+        private void writeUnsignedInt(int value) {
+            checkArgument(value >= 0);
+            ensureCapacity(5);
+            int len = VarLengthIntUtils.encodeInt(buffer, position, value);
+            position += len;
+        }
+
+        private void writeArray(InternalArray value, InternalArraySerializer 
serializer) {
+            BinaryArray binary = serializer.toBinaryArray(value);
+            writeSegments(binary.getSegments(), binary.getOffset(), 
binary.getSizeInBytes());
+        }
+
+        private void writeMap(InternalMap value, InternalMapSerializer 
serializer) {
+            BinaryMap binary = serializer.toBinaryMap(value);
+            writeSegments(binary.getSegments(), binary.getOffset(), 
binary.getSizeInBytes());
+        }
+
+        private void writeRow(InternalRow value, RowCompactedSerializer 
serializer) {
+            writeBinary(serializer.serializeToBytes(value));
+        }
+
+        private byte[] copyBuffer() {
+            return Arrays.copyOf(buffer, position);
+        }
+
+        private void setBuffer(byte[] buffer) {
+            this.buffer = buffer;
+            this.segment = MemorySegment.wrap(buffer);
+        }
+
+        private void ensureCapacity(int size) {
+            if (buffer.length - position < size) {
+                grow(size);
+            }
+        }
+
+        private void grow(int minCapacityAdd) {
+            int newLen = Math.max(this.buffer.length * 2, this.buffer.length + 
minCapacityAdd);
+            setBuffer(Arrays.copyOf(this.buffer, newLen));
+        }
+
+        private void writeBinary(byte[] value) {
+            writeUnsignedInt(value.length);
+            ensureCapacity(value.length);
+            System.arraycopy(value, 0, buffer, position, value.length);
+            position += value.length;
+        }
+
+        private void write(MemorySegment segment, int off, int len) {
+            ensureCapacity(len);
+            segment.get(off, this.buffer, this.position, len);
+            this.position += len;
+        }
+
+        private void writeSegments(MemorySegment[] segments, int off, int len) 
{
+            writeUnsignedInt(len);
+            if (len + off <= segments[0].size()) {
+                write(segments[0], off, len);
+            } else {
+                write(segments, off, len);
+            }
+        }
+
+        private void write(MemorySegment[] segments, int off, int len) {
+            ensureCapacity(len);
+            int toWrite = len;
+            int fromOffset = off;
+            int toOffset = this.position;
+            for (MemorySegment sourceSegment : segments) {
+                int remain = sourceSegment.size() - fromOffset;
+                if (remain > 0) {
+                    int localToWrite = Math.min(remain, toWrite);
+                    sourceSegment.get(fromOffset, buffer, toOffset, 
localToWrite);
+                    toWrite -= localToWrite;
+                    toOffset += localToWrite;
+                    fromOffset = 0;
+                } else {
+                    fromOffset -= sourceSegment.size();
+                }
+            }
+            this.position += len;
+        }
+    }
+
+    private static class RowReader {
+
+        // Including RowKind and null bits.
+        private final int headerSizeInBytes;
+
+        private MemorySegment segment;
+        private MemorySegment[] segments;
+        private int position;
+
+        private RowReader(int headerSizeInBytes) {
+            this.headerSizeInBytes = headerSizeInBytes;
+        }
+
+        private void pointTo(byte[] bytes) {
+            this.segment = MemorySegment.wrap(bytes);
+            this.segments = new MemorySegment[] {segment};
+            this.position = headerSizeInBytes;
+        }
+
+        private RowKind readRowKind() {
+            return RowKind.fromByteValue(segment.get(0));
+        }
+
+        private boolean isNullAt(int pos) {
+            return bitGet(segment, 0, pos + HEADER_SIZE_IN_BITS);
+        }
+
+        private boolean readBoolean() {
+            return segment.getBoolean(position++);
+        }
+
+        private byte readByte() {
+            return segment.get(position++);
+        }
+
+        private short readShort() {
+            short value = segment.getShort(position);
+            position += 2;
+            return value;
+        }
+
+        private int readInt() {
+            int value = segment.getInt(position);
+            position += 4;
+            return value;
+        }
+
+        private long readLong() {
+            long value = segment.getLong(position);
+            position += 8;
+            return value;
+        }
+
+        private float readFloat() {
+            float value = segment.getFloat(position);
+            position += 4;
+            return value;
+        }
+
+        private double readDouble() {
+            double value = segment.getDouble(position);
+            position += 8;
+            return value;
+        }
+
+        private BinaryString readString() {
+            int length = readUnsignedInt();
+            BinaryString string = BinaryString.fromAddress(segments, position, 
length);
+            position += length;
+            return string;
+        }
+
+        private int readUnsignedInt() {
+            for (int offset = 0, result = 0; offset < 32; offset += 7) {
+                int b = readByte();
+                result |= (b & 0x7F) << offset;
+                if ((b & 0x80) == 0) {
+                    return result;
+                }
+            }
+            throw new Error("Malformed integer.");
+        }
+
+        private Decimal readDecimal(int precision, int scale) {
+            return Decimal.isCompact(precision)
+                    ? Decimal.fromUnscaledLong(readLong(), precision, scale)
+                    : Decimal.fromUnscaledBytes(readBinary(), precision, 
scale);
+        }
+
+        private Timestamp readTimestamp(int precision) {
+            if (Timestamp.isCompact(precision)) {
+                return Timestamp.fromEpochMillis(readLong());
+            }
+            long milliseconds = readLong();
+            int nanosOfMillisecond = readUnsignedInt();
+            return Timestamp.fromEpochMillis(milliseconds, nanosOfMillisecond);
+        }
+
+        private byte[] readBinary() {
+            int length = readUnsignedInt();
+            byte[] bytes = new byte[length];
+            segment.get(position, bytes, 0, length);
+            position += length;
+            return bytes;
+        }
+
+        private InternalArray readArray() {
+            BinaryArray value = new BinaryArray();
+            int length = readUnsignedInt();
+            value.pointTo(segments, position, length);
+            position += length;
+            return value;
+        }
+
+        private InternalMap readMap() {
+            BinaryMap value = new BinaryMap();
+            int length = readUnsignedInt();
+            value.pointTo(segments, position, length);
+            position += length;
+            return value;
+        }
+
+        private InternalRow readRow(RowCompactedSerializer serializer) {
+            byte[] bytes = readBinary();
+            return serializer.deserialize(bytes);
+        }
+    }
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
index 481a1597..706e95b4 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
@@ -80,6 +80,23 @@ public final class VarLengthIntUtils {
         throw new Error("Malformed long.");
     }
 
+    /** @return bytes length. */
+    public static int encodeInt(byte[] bytes, int offset, int value) {
+
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        int i = 1;
+        while ((value & ~0x7F) != 0) {
+            bytes[i + offset - 1] = (byte) ((value & 0x7F) | 0x80);
+            value >>>= 7;
+            i++;
+        }
+        bytes[i + offset - 1] = (byte) value;
+        return i;
+    }
+
     /** @return bytes length. */
     public static int encodeInt(DataOutput os, int value) throws IOException {
 
diff --git 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
index afeacd9b..e3ff57f5 100644
--- 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
+++ 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
@@ -59,7 +59,7 @@ abstract class InternalRowSerializerTest extends 
SerializerTestInstance<Internal
 
     // 
----------------------------------------------------------------------------------------------
 
-    private static BinaryArray createArray(int... ints) {
+    public static BinaryArray createArray(int... ints) {
         BinaryArray array = new BinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, ints.length, 
4);
         for (int i = 0; i < ints.length; i++) {
@@ -69,11 +69,11 @@ abstract class InternalRowSerializerTest extends 
SerializerTestInstance<Internal
         return array;
     }
 
-    private static BinaryMap createMap(int[] keys, int[] values) {
+    public static BinaryMap createMap(int[] keys, int[] values) {
         return BinaryMap.valueOf(createArray(keys), createArray(values));
     }
 
-    private static GenericRow createRow(Object f0, Object f1, Object f2, 
Object f3, Object f4) {
+    public static GenericRow createRow(Object f0, Object f1, Object f2, Object 
f3, Object f4) {
         GenericRow row = new GenericRow(5);
         row.setField(0, f0);
         row.setField(1, f1);
@@ -83,7 +83,7 @@ abstract class InternalRowSerializerTest extends 
SerializerTestInstance<Internal
         return row;
     }
 
-    private static boolean deepEqualsInternalRow(
+    public static boolean deepEqualsInternalRow(
             InternalRow should,
             InternalRow is,
             InternalRowSerializer serializer1,
diff --git 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializerTest.java
similarity index 50%
copy from 
flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
copy to 
flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializerTest.java
index afeacd9b..3752800b 100644
--- 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/InternalRowSerializerTest.java
+++ 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/serializer/RowCompactedSerializerTest.java
@@ -18,34 +18,27 @@
 
 package org.apache.flink.table.store.data.serializer;
 
-import org.apache.flink.table.store.data.BinaryArray;
-import org.apache.flink.table.store.data.BinaryArrayWriter;
-import org.apache.flink.table.store.data.BinaryMap;
-import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.data.BinaryString;
 import org.apache.flink.table.store.data.GenericRow;
 import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.types.DataType;
 import org.apache.flink.table.store.types.DataTypes;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Objects;
+import org.apache.flink.table.store.types.RowType;
 
 import static org.apache.flink.table.store.data.BinaryString.fromString;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static 
org.apache.flink.table.store.data.serializer.InternalRowSerializerTest.createArray;
+import static 
org.apache.flink.table.store.data.serializer.InternalRowSerializerTest.createMap;
+import static 
org.apache.flink.table.store.data.serializer.InternalRowSerializerTest.createRow;
+import static 
org.apache.flink.table.store.data.serializer.InternalRowSerializerTest.deepEqualsInternalRow;
 
-/** Test for {@link InternalRowSerializer}. */
-abstract class InternalRowSerializerTest extends 
SerializerTestInstance<InternalRow> {
+/** Test for {@link RowCompactedSerializer}. */
+abstract class RowCompactedSerializerTest extends 
SerializerTestInstance<InternalRow> {
 
-    private final InternalRowSerializer serializer;
-    private final InternalRow[] testData;
+    private final RowCompactedSerializer serializer;
 
-    InternalRowSerializerTest(InternalRowSerializer serializer, InternalRow[] 
testData) {
+    RowCompactedSerializerTest(RowCompactedSerializer serializer, 
InternalRow[] testData) {
         super(serializer, testData);
         this.serializer = serializer;
-        this.testData = testData;
     }
 
     @Override
@@ -53,105 +46,12 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
         return deepEqualsInternalRow(
                 o1,
                 o2,
-                (InternalRowSerializer) serializer.duplicate(),
-                (InternalRowSerializer) serializer.duplicate());
-    }
-
-    // ----------------------------------------------------------------------------------------------
-
-    private static BinaryArray createArray(int... ints) {
-        BinaryArray array = new BinaryArray();
-        BinaryArrayWriter writer = new BinaryArrayWriter(array, ints.length, 4);
-        for (int i = 0; i < ints.length; i++) {
-            writer.writeInt(i, ints[i]);
-        }
-        writer.complete();
-        return array;
-    }
-
-    private static BinaryMap createMap(int[] keys, int[] values) {
-        return BinaryMap.valueOf(createArray(keys), createArray(values));
-    }
-
-    private static GenericRow createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
-        GenericRow row = new GenericRow(5);
-        row.setField(0, f0);
-        row.setField(1, f1);
-        row.setField(2, f2);
-        row.setField(3, f3);
-        row.setField(4, f4);
-        return row;
-    }
-
-    private static boolean deepEqualsInternalRow(
-            InternalRow should,
-            InternalRow is,
-            InternalRowSerializer serializer1,
-            InternalRowSerializer serializer2) {
-        return deepEqualsInternalRow(should, is, serializer1, serializer2, false);
-    }
-
-    private static boolean deepEqualsInternalRow(
-            InternalRow should,
-            InternalRow is,
-            InternalRowSerializer serializer1,
-            InternalRowSerializer serializer2,
-            boolean checkClass) {
-        if (should.getFieldCount() != is.getFieldCount()) {
-            return false;
-        }
-        if (checkClass && (should.getClass() != is.getClass() || !should.equals(is))) {
-            return false;
-        }
-
-        BinaryRow row1 = serializer1.toBinaryRow(should);
-        BinaryRow row2 = serializer2.toBinaryRow(is);
-
-        return Objects.equals(row1, row2);
-    }
-
-    private void checkDeepEquals(InternalRow should, InternalRow is, boolean checkClass) {
-        boolean equals =
-                deepEqualsInternalRow(
-                        should,
-                        is,
-                        (InternalRowSerializer) serializer.duplicate(),
-                        (InternalRowSerializer) serializer.duplicate(),
-                        checkClass);
-        assertThat(equals).isTrue();
-    }
-
-    @Test
-    protected void testCopy() {
-        for (InternalRow row : testData) {
-            checkDeepEquals(row, serializer.copy(row), true);
-        }
-
-        for (InternalRow row : testData) {
-            checkDeepEquals(row, serializer.copy(row), true);
-        }
-
-        for (InternalRow row : testData) {
-            checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row)), false);
-        }
-
-        for (InternalRow row : testData) {
-            checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row)), false);
-        }
-
-        for (InternalRow row : testData) {
-            checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row)), false);
-        }
-    }
-
-    @Test
-    void testWrongCopy() {
-        assertThatThrownBy(() -> serializer.copy(new GenericRow(serializer.getArity() + 1)))
-                .isInstanceOf(IllegalArgumentException.class);
+                new InternalRowSerializer(serializer.rowType()),
+                new InternalRowSerializer(serializer.rowType()));
     }
 
-    static final class SimpleInternalRowSerializerTest extends InternalRowSerializerTest {
-        public SimpleInternalRowSerializerTest() {
+    static final class SimpleTest extends RowCompactedSerializerTest {
+        public SimpleTest() {
             super(getRowSerializer(), getData());
         }
 
@@ -167,13 +67,13 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
             return new InternalRow[] {row1, row2};
         }
 
-        private static InternalRowSerializer getRowSerializer() {
-            return new InternalRowSerializer(DataTypes.INT(), DataTypes.STRING());
+        private static RowCompactedSerializer getRowSerializer() {
+            return new RowCompactedSerializer(RowType.of(DataTypes.INT(), DataTypes.STRING()));
         }
     }
 
-    static final class LargeInternalRowSerializerTest extends InternalRowSerializerTest {
-        public LargeInternalRowSerializerTest() {
+    static final class LargeTest extends RowCompactedSerializerTest {
+        public LargeTest() {
             super(getRowSerializer(), getData());
         }
 
@@ -195,26 +95,27 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
             return new InternalRow[] {row};
         }
 
-        private static InternalRowSerializer getRowSerializer() {
-            return new InternalRowSerializer(
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.INT(),
-                    DataTypes.STRING());
+        private static RowCompactedSerializer getRowSerializer() {
+            return new RowCompactedSerializer(
+                    RowType.of(
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.STRING()));
         }
     }
 
-    static final class InternalRowSerializerWithComplexTypesTest extends InternalRowSerializerTest {
-        public InternalRowSerializerWithComplexTypesTest() {
+    static final class ComplexTypesTest extends RowCompactedSerializerTest {
+        public ComplexTypesTest() {
             super(getRowSerializer(), getData());
         }
 
@@ -266,26 +167,26 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
             };
         }
 
-        private static InternalRowSerializer getRowSerializer() {
-            return new InternalRowSerializer(
-                    DataTypes.INT(),
-                    DataTypes.DOUBLE(),
-                    DataTypes.STRING(),
-                    DataTypes.ARRAY(DataTypes.INT()),
-                    DataTypes.MAP(DataTypes.INT(), DataTypes.INT()));
+        private static RowCompactedSerializer getRowSerializer() {
+            return new RowCompactedSerializer(
+                    RowType.of(
+                            DataTypes.INT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.STRING(),
+                            DataTypes.ARRAY(DataTypes.INT()),
+                            DataTypes.MAP(DataTypes.INT(), DataTypes.INT())));
         }
     }
 
-    static final class InternalRowSerializerWithNestedInternalRowTest
-            extends InternalRowSerializerTest {
+    static final class NestedInternalRowTest extends RowCompactedSerializerTest {
 
-        private static final DataType NESTED_DATA_TYPE =
+        private static final RowType NESTED_DATA_TYPE =
                 DataTypes.ROW(
                         DataTypes.FIELD(0, "ri", DataTypes.INT()),
                         DataTypes.FIELD(1, "rs", DataTypes.STRING()),
                         DataTypes.FIELD(2, "rb", DataTypes.BIGINT()));
 
-        public InternalRowSerializerWithNestedInternalRowTest() {
+        public NestedInternalRowTest() {
             super(getRowSerializer(), getData());
         }
 
@@ -314,9 +215,8 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
             return new InternalRow[] {nestedRow1, nestedRow2};
         }
 
-        private static InternalRowSerializer getRowSerializer() {
-            return (InternalRowSerializer)
-                    InternalSerializers.<InternalRow>create(NESTED_DATA_TYPE);
+        private static RowCompactedSerializer getRowSerializer() {
+            return new RowCompactedSerializer(NESTED_DATA_TYPE);
         }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
index cc50f718..7b7b8dbc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
@@ -19,9 +19,8 @@
 package org.apache.flink.table.store.file.mergetree;
 
 import org.apache.flink.table.store.annotation.VisibleForTesting;
-import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
+import org.apache.flink.table.store.data.serializer.RowCompactedSerializer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.io.DataOutputSerializer;
@@ -32,6 +31,7 @@ import org.apache.flink.table.store.memory.MemorySegment;
 import org.apache.flink.table.store.options.MemorySize;
 import org.apache.flink.table.store.reader.RecordReader;
 import org.apache.flink.table.store.types.RowKind;
+import org.apache.flink.table.store.types.RowType;
 import org.apache.flink.table.store.utils.FileIOUtils;
 import org.apache.flink.table.store.utils.IOFunction;
 
@@ -58,8 +58,8 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
 
     private final Levels levels;
     private final Comparator<InternalRow> keyComparator;
-    private final InternalRowSerializer keySerializer;
-    private final InternalRowSerializer valueSerializer;
+    private final RowCompactedSerializer keySerializer;
+    private final RowCompactedSerializer valueSerializer;
     private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
     private final Supplier<File> localFileFactory;
     private final LookupStoreFactory lookupStoreFactory;
@@ -69,8 +69,8 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
     public LookupLevels(
             Levels levels,
             Comparator<InternalRow> keyComparator,
-            InternalRowSerializer keySerializer,
-            InternalRowSerializer valueSerializer,
+            RowType keyType,
+            RowType valueType,
             IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
             Supplier<File> localFileFactory,
             LookupStoreFactory lookupStoreFactory,
@@ -78,8 +78,8 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
             MemorySize maxDiskSize) {
         this.levels = levels;
         this.keyComparator = keyComparator;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
+        this.keySerializer = new RowCompactedSerializer(keyType);
+        this.valueSerializer = new RowCompactedSerializer(valueType);
         this.fileReaderFactory = fileReaderFactory;
         this.localFileFactory = localFileFactory;
         this.lookupStoreFactory = lookupStoreFactory;
@@ -164,16 +164,14 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
         } catch (ExecutionException e) {
             throw new IOException(e);
         }
-        byte[] keyBytes = keySerializer.toBinaryRow(key).toBytes();
+        byte[] keyBytes = keySerializer.serializeToBytes(key);
         byte[] valueBytes = lookupFile.get(keyBytes);
         if (valueBytes == null) {
             return null;
         }
-        MemorySegment memorySegment = MemorySegment.wrap(valueBytes);
-        long sequenceNumber = memorySegment.getLong(0);
-        RowKind rowKind = RowKind.fromByteValue(valueBytes[8]);
-        BinaryRow value = new BinaryRow(valueSerializer.getArity());
-        value.pointTo(memorySegment, 9, valueBytes.length - 9);
+        InternalRow value = valueSerializer.deserialize(valueBytes);
+        long sequenceNumber = MemorySegment.wrap(valueBytes).getLong(valueBytes.length - 9);
+        RowKind rowKind = RowKind.fromByteValue(valueBytes[valueBytes.length - 1]);
         return new KeyValue()
                 .replace(key, sequenceNumber, rowKind, value)
                 .setLevel(lookupFile.remoteFile().level());
@@ -206,11 +204,11 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
             KeyValue kv;
             while ((batch = reader.readBatch()) != null) {
                 while ((kv = batch.next()) != null) {
-                    byte[] keyBytes = keySerializer.toBinaryRow(kv.key()).toBytes();
+                    byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
                     valueOut.clear();
+                    valueOut.write(valueSerializer.serializeToBytes(kv.value()));
                     valueOut.writeLong(kv.sequenceNumber());
                     valueOut.writeByte(kv.valueKind().toByteValue());
-                    valueOut.write(valueSerializer.toBinaryRow(kv.value()).toBytes());
                     byte[] valueBytes = valueOut.getCopyOfBuffer();
                     kvWriter.put(keyBytes, valueBytes);
                 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 23cbbe6d..97778e29 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.compact.CompactManager;
 import org.apache.flink.table.store.file.compact.NoopCompactManager;
@@ -240,8 +239,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
         return new LookupLevels(
                 levels,
                 keyComparatorSupplier.get(),
-                new InternalRowSerializer(keyType),
-                new InternalRowSerializer(valueType),
+                keyType,
+                valueType,
                 file ->
                         readerFactory.createRecordReader(
                                 file.schemaId(), file.fileName(), file.level()),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
index d68d9e05..39077a13 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.mergetree;
 import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.data.GenericRow;
 import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.format.FlushingFileFormat;
 import org.apache.flink.table.store.file.io.DataFileMeta;
@@ -59,6 +58,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.flink.table.store.CoreOptions.TARGET_FILE_SIZE;
+import static org.apache.flink.table.store.file.KeyValue.UNKNOWN_SEQUENCE;
 import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -91,18 +91,21 @@ public class LookupLevelsTest {
         // only in level 1
         KeyValue kv = lookupLevels.lookup(row(1), 1);
         assertThat(kv).isNotNull();
+        assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
         assertThat(kv.level()).isEqualTo(1);
         assertThat(kv.value().getInt(1)).isEqualTo(11);
 
         // only in level 2
         kv = lookupLevels.lookup(row(2), 1);
         assertThat(kv).isNotNull();
+        assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
         assertThat(kv.level()).isEqualTo(2);
         assertThat(kv.value().getInt(1)).isEqualTo(22);
 
         // both in level 1 and level 2
         kv = lookupLevels.lookup(row(5), 1);
         assertThat(kv).isNotNull();
+        assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
         assertThat(kv.level()).isEqualTo(1);
         assertThat(kv.value().getInt(1)).isEqualTo(5);
 
@@ -143,6 +146,7 @@ public class LookupLevelsTest {
         for (Map.Entry<Integer, Integer> entry : contains.entrySet()) {
             KeyValue kv = lookupLevels.lookup(row(entry.getKey()), 1);
             assertThat(kv).isNotNull();
+            assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
             assertThat(kv.level()).isEqualTo(1);
             assertThat(kv.value().getInt(1)).isEqualTo(entry.getValue());
         }
@@ -176,6 +180,7 @@ public class LookupLevelsTest {
         for (int i = 0; i < fileNum * recordInFile; i++) {
             KeyValue kv = lookupLevels.lookup(row(i), 1);
             assertThat(kv).isNotNull();
+            assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
             assertThat(kv.level()).isEqualTo(1);
             assertThat(kv.value().getInt(1)).isEqualTo(i);
         }
@@ -195,8 +200,8 @@ public class LookupLevelsTest {
         return new LookupLevels(
                 levels,
                 comparator,
-                new InternalRowSerializer(keyType),
-                new InternalRowSerializer(rowType),
+                keyType,
+                rowType,
                file -> createReaderFactory().createRecordReader(0, file.fileName(), file.level()),
                () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
                new HashLookupStoreFactory(new CacheManager(2048, MemorySize.ofMebiBytes(1)), 0.75),

Reply via email to