This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 130967a4db392d5068f1fbab78d113e52f320353
Author: Jark Wu <[email protected]>
AuthorDate: Wed Apr 29 11:19:29 2020 +0800

    [FLINK-16996][table-common] Add binary implementations of internal data 
structures
    
    This closes #11925
---
 .../flink/table/data/binary/BinaryArrayData.java   |  559 +++++++++
 .../flink/table/data/binary/BinaryFormat.java      |   73 ++
 .../flink/table/data/binary/BinaryMapData.java     |  120 ++
 .../table/data/binary/BinaryRawValueData.java      |  107 ++
 .../flink/table/data/binary/BinaryRowData.java     |  438 +++++++
 .../flink/table/data/binary/BinarySection.java     |   75 ++
 .../table/data/binary/BinarySegmentUtils.java      | 1198 ++++++++++++++++++++
 .../flink/table/data/binary/BinaryStringData.java  |  856 ++++++++++++++
 .../flink/table/data/binary/LazyBinaryFormat.java  |  142 +++
 .../flink/table/data/binary/MurmurHashUtils.java   |  175 +++
 .../flink/table/data/binary/NestedRowData.java     |  334 ++++++
 .../flink/table/data/binary/StringUtf8Utils.java   |  306 +++++
 .../flink/table/data/binary/TypedSetters.java      |   67 ++
 13 files changed, 4450 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
new file mode 100644
index 0000000..30b94e9
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import java.lang.reflect.Array;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
+/**
+ * A binary implementation of {@link ArrayData} which is backed by {@link MemorySegment}s.
+ *
+ * <p>Elements of fixed-length primitive types, such as long, double or int, are stored compactly
+ * in bytes, just like in the original Java array.
+ *
+ * <p>The binary layout of {@link BinaryArrayData}:
+ *
+ * <pre>
+ * [size(int)] + [null bits(4-byte word boundaries)] + [values or offset&length] + [variable length part].
+ * </pre>
+ */
+@Internal
+public final class BinaryArrayData extends BinarySection implements ArrayData, TypedSetters {
+
+       /**
+        * Offset for Arrays.
+        */
+       private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+       private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class);
+       private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class);
+       private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
+       private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class);
+       private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class);
+       private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);
+
+       /**
+        * Returns the size of the array header in bytes: 4 bytes for the element count plus the
+        * null bits, rounded up to whole 4-byte words.
+        */
+       public static int calculateHeaderInBytes(int numFields) {
+               return 4 + ((numFields + 31) / 32) * 4;
+       }
+
+       /**
+        * Returns the number of bytes one element of the given type occupies in the fixed-length part.
+        *
+        * <p>The slot stores the real value when the type is primitive.
+        * It stores the length and offset of the variable-length part when the type is string, map, etc.
+        */
+       public static int calculateFixLengthPartSize(LogicalType type) {
+               switch (type.getTypeRoot()) {
+                       case BOOLEAN:
+                       case TINYINT:
+                               return 1;
+                       case SMALLINT:
+                               return 2;
+                       case INTEGER:
+                       case FLOAT:
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                       case INTERVAL_YEAR_MONTH:
+                               return 4;
+                       default:
+                               // long, double is 8 bytes.
+                               // It stores the length and offset of the variable-length part when type is string, map, etc.
+                               return 8;
+               }
+       }
+
+       // The number of elements in this array
+       private int size;
+
+       /** The position to start storing array elements. */
+       private int elementOffset;
+
+       public BinaryArrayData() {}
+
+       private void assertIndexIsValid(int ordinal) {
+               assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
+               assert ordinal < size : "ordinal (" + ordinal + ") should < " + size;
+       }
+
+       /** Returns the position of the element slot at {@code ordinal} for slots of {@code elementSize} bytes. */
+       private int getElementOffset(int ordinal, int elementSize) {
+               return elementOffset + ordinal * elementSize;
+       }
+
+       @Override
+       public int size() {
+               return size;
+       }
+
+       @Override
+       public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+               // Read the number of elements from the first 4 bytes.
+               final int size = BinarySegmentUtils.getInt(segments, offset);
+               assert size >= 0 : "size (" + size + ") should >= 0";
+
+               this.size = size;
+               this.segments = segments;
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+               // Elements start right after the header (element count + null bits).
+               this.elementOffset = offset + calculateHeaderInBytes(this.size);
+       }
+
+       @Override
+       public boolean isNullAt(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.bitGet(segments, offset + 4, pos);
+       }
+
+       /** Sets the null bit of the element; the element slot itself is left untouched. */
+       public void setNullAt(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+       }
+
+       /** Clears the null bit of the element. */
+       public void setNotNullAt(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitUnSet(segments, offset + 4, pos);
+       }
+
+       @Override
+       public long getLong(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8));
+       }
+
+       public void setLong(int pos, long value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setLong(segments, getElementOffset(pos, 8), value);
+       }
+
+       /** Sets the null bit of the element and zeroes its 8-byte slot. */
+       public void setNullLong(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+               BinarySegmentUtils.setLong(segments, getElementOffset(pos, 8), 0L);
+       }
+
+       @Override
+       public int getInt(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getInt(segments, getElementOffset(pos, 4));
+       }
+
+       public void setInt(int pos, int value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setInt(segments, getElementOffset(pos, 4), value);
+       }
+
+       /** Sets the null bit of the element and zeroes its 4-byte slot. */
+       public void setNullInt(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+               BinarySegmentUtils.setInt(segments, getElementOffset(pos, 4), 0);
+       }
+
+       @Override
+       public StringData getString(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getElementOffset(pos, 8);
+               final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+               return BinarySegmentUtils.readStringData(
+                       segments, offset, fieldOffset, offsetAndSize);
+       }
+
+       @Override
+       public DecimalData getDecimal(int pos, int precision, int scale) {
+               assertIndexIsValid(pos);
+               if (DecimalData.isCompact(precision)) {
+                       // compact format: the unscaled value is stored directly in the element slot
+                       return DecimalData.fromUnscaledLong(
+                               BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8)),
+                               precision,
+                               scale);
+               }
+
+               int fieldOffset = getElementOffset(pos, 8);
+               final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+               return BinarySegmentUtils.readDecimalData(segments, offset, offsetAndSize, precision, scale);
+       }
+
+       @Override
+       public TimestampData getTimestamp(int pos, int precision) {
+               assertIndexIsValid(pos);
+
+               if (TimestampData.isCompact(precision)) {
+                       // compact format: the epoch milliseconds are stored directly in the element slot
+                       return TimestampData.fromEpochMillis(
+                               BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8)));
+               }
+
+               int fieldOffset = getElementOffset(pos, 8);
+               final long offsetAndNanoOfMilli = BinarySegmentUtils.getLong(segments, fieldOffset);
+               return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
+       }
+
+       @Override
+       public <T> RawValueData<T> getRawValue(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getElementOffset(pos, 8);
+               final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+               return BinarySegmentUtils.readRawValueData(segments, offset, offsetAndSize);
+       }
+
+       @Override
+       public byte[] getBinary(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getElementOffset(pos, 8);
+               final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+               return BinarySegmentUtils.readBinary(
+                       segments, offset, fieldOffset, offsetAndSize);
+       }
+
+       @Override
+       public ArrayData getArray(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readArrayData(segments, offset, getLong(pos));
+       }
+
+       @Override
+       public MapData getMap(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readMapData(segments, offset, getLong(pos));
+       }
+
+       @Override
+       public RowData getRow(int pos, int numFields) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getElementOffset(pos, 8);
+               final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+               return BinarySegmentUtils.readRowData(segments, numFields, offset, offsetAndSize);
+       }
+
+       @Override
+       public boolean getBoolean(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getBoolean(segments, getElementOffset(pos, 1));
+       }
+
+       public void setBoolean(int pos, boolean value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setBoolean(segments, getElementOffset(pos, 1), value);
+       }
+
+       /** Sets the null bit of the element and resets its 1-byte slot to {@code false}. */
+       public void setNullBoolean(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+               BinarySegmentUtils.setBoolean(segments, getElementOffset(pos, 1), false);
+       }
+
+       @Override
+       public byte getByte(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getByte(segments, getElementOffset(pos, 1));
+       }
+
+       public void setByte(int pos, byte value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setByte(segments, getElementOffset(pos, 1), value);
+       }
+
+       /** Sets the null bit of the element and zeroes its 1-byte slot. */
+       public void setNullByte(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+               BinarySegmentUtils.setByte(segments, getElementOffset(pos, 1), (byte) 0);
+       }
+
+       @Override
+       public short getShort(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getShort(segments, getElementOffset(pos, 2));
+       }
+
+       public void setShort(int pos, short value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setShort(segments, getElementOffset(pos, 2), value);
+       }
+
+       /** Sets the null bit of the element and zeroes its 2-byte slot. */
+       public void setNullShort(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+               BinarySegmentUtils.setShort(segments, getElementOffset(pos, 2), (short) 0);
+       }
+
+       @Override
+       public float getFloat(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getFloat(segments, getElementOffset(pos, 4));
+       }
+
+       public void setFloat(int pos, float value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setFloat(segments, getElementOffset(pos, 4), value);
+       }
+
+       /** Sets the null bit of the element and zeroes its 4-byte slot. */
+       public void setNullFloat(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+               BinarySegmentUtils.setFloat(segments, getElementOffset(pos, 4), 0F);
+       }
+
+       @Override
+       public double getDouble(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getDouble(segments, getElementOffset(pos, 8));
+       }
+
+       public void setDouble(int pos, double value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setDouble(segments, getElementOffset(pos, 8), value);
+       }
+
+       /** Sets the null bit of the element and zeroes its 8-byte slot. */
+       public void setNullDouble(int pos) {
+               assertIndexIsValid(pos);
+               BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+               BinarySegmentUtils.setDouble(segments, getElementOffset(pos, 8), 0.0);
+       }
+
+       /**
+        * Updates the decimal element at {@code pos} in place.
+        *
+        * <p>For a compact precision the unscaled value is written directly into the element slot;
+        * a {@code null} value marks the element as null and zeroes the slot.
+        *
+        * <p>For a non-compact precision the upper 32 bits of the element slot must already hold the
+        * offset of a 16-byte region in the variable-length part. That region is zeroed and, unless
+        * {@code value} is {@code null}, overwritten with the unscaled bytes.
+        */
+       public void setDecimal(int pos, DecimalData value, int precision) {
+               assertIndexIsValid(pos);
+
+               if (DecimalData.isCompact(precision)) {
+                       // compact format
+                       if (value == null) {
+                               // mirror the non-compact branch instead of failing with a NullPointerException
+                               setNullLong(pos);
+                       } else {
+                               setLong(pos, value.toUnscaledLong());
+                       }
+               } else {
+                       int fieldOffset = getElementOffset(pos, 8);
+                       int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
+                       assert cursor > 0 : "invalid cursor " + cursor;
+                       // zero-out the bytes
+                       BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+                       BinarySegmentUtils.setLong(segments, offset + cursor + 8, 0L);
+
+                       if (value == null) {
+                               setNullAt(pos);
+                               // keep the offset for future update
+                               BinarySegmentUtils.setLong(segments, fieldOffset, ((long) cursor) << 32);
+                       } else {
+
+                               byte[] bytes = value.toUnscaledBytes();
+                               assert (bytes.length <= 16);
+
+                               // Write the bytes to the variable length portion.
+                               BinarySegmentUtils.copyFromBytes(segments, offset + cursor, bytes, 0, bytes.length);
+                               setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
+                       }
+               }
+       }
+
+       /**
+        * Updates the timestamp element at {@code pos} in place.
+        *
+        * <p>For a compact precision the milliseconds are written directly into the element slot;
+        * a {@code null} value marks the element as null and zeroes the slot.
+        *
+        * <p>For a non-compact precision the upper 32 bits of the element slot must already hold the
+        * offset of an 8-byte region in the variable-length part, which receives the milliseconds,
+        * while the nanos-of-millisecond are kept in the lower bits of the element slot.
+        */
+       public void setTimestamp(int pos, TimestampData value, int precision) {
+               assertIndexIsValid(pos);
+
+               if (TimestampData.isCompact(precision)) {
+                       if (value == null) {
+                               // mirror the non-compact branch instead of failing with a NullPointerException
+                               setNullLong(pos);
+                       } else {
+                               setLong(pos, value.getMillisecond());
+                       }
+               } else {
+                       int fieldOffset = getElementOffset(pos, 8);
+                       int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
+                       assert cursor > 0 : "invalid cursor " + cursor;
+
+                       if (value == null) {
+                               setNullAt(pos);
+                               // zero-out the bytes
+                               BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+                               // keep the offset for future update
+                               BinarySegmentUtils.setLong(segments, fieldOffset, ((long) cursor) << 32);
+                       } else {
+                               // write millisecond to the variable length portion.
+                               BinarySegmentUtils.setLong(segments, offset + cursor, value.getMillisecond());
+                               // write nanoOfMillisecond to the fixed-length portion.
+                               setLong(pos, ((long) cursor << 32) | (long) value.getNanoOfMillisecond());
+                       }
+               }
+       }
+
+       /** Returns whether any 4-byte word of the null-bit region is non-zero. */
+       public boolean anyNull() {
+               for (int i = offset + 4; i < elementOffset; i += 4) {
+                       if (BinarySegmentUtils.getInt(segments, i) != 0) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       private void checkNoNull() {
+               if (anyNull()) {
+                       throw new RuntimeException("Array can not have null value!");
+               }
+       }
+
+       @Override
+       public boolean[] toBooleanArray() {
+               checkNoNull();
+               boolean[] values = new boolean[size];
+               BinarySegmentUtils.copyToUnsafe(
+                       segments, elementOffset, values, BOOLEAN_ARRAY_OFFSET, size);
+               return values;
+       }
+
+       @Override
+       public byte[] toByteArray() {
+               checkNoNull();
+               byte[] values = new byte[size];
+               BinarySegmentUtils.copyToUnsafe(
+                       segments, elementOffset, values, BYTE_ARRAY_BASE_OFFSET, size);
+               return values;
+       }
+
+       @Override
+       public short[] toShortArray() {
+               checkNoNull();
+               short[] values = new short[size];
+               BinarySegmentUtils.copyToUnsafe(
+                       segments, elementOffset, values, SHORT_ARRAY_OFFSET, size * 2);
+               return values;
+       }
+
+       @Override
+       public int[] toIntArray() {
+               checkNoNull();
+               int[] values = new int[size];
+               BinarySegmentUtils.copyToUnsafe(
+                       segments, elementOffset, values, INT_ARRAY_OFFSET, size * 4);
+               return values;
+       }
+
+       @Override
+       public long[] toLongArray() {
+               checkNoNull();
+               long[] values = new long[size];
+               BinarySegmentUtils.copyToUnsafe(
+                       segments, elementOffset, values, LONG_ARRAY_OFFSET, size * 8);
+               return values;
+       }
+
+       @Override
+       public float[] toFloatArray() {
+               checkNoNull();
+               float[] values = new float[size];
+               BinarySegmentUtils.copyToUnsafe(
+                       segments, elementOffset, values, FLOAT_ARRAY_OFFSET, size * 4);
+               return values;
+       }
+
+       @Override
+       public double[] toDoubleArray() {
+               checkNoNull();
+               double[] values = new double[size];
+               BinarySegmentUtils.copyToUnsafe(
+                       segments, elementOffset, values, DOUBLE_ARRAY_OFFSET, size * 8);
+               return values;
+       }
+
+       /**
+        * Converts this array into an object array of the internal conversion class of
+        * {@code elementType}; null elements are left as {@code null} entries.
+        */
+       @SuppressWarnings("unchecked")
+       public <T> T[] toObjectArray(LogicalType elementType) {
+               Class<T> elementClass = (Class<T>) LogicalTypeUtils.toInternalConversionClass(elementType);
+               T[] values = (T[]) Array.newInstance(elementClass, size);
+               for (int i = 0; i < size; i++) {
+                       if (!isNullAt(i)) {
+                               values[i] = (T) ArrayData.get(this, i, elementType);
+                       }
+               }
+               return values;
+       }
+
+       public BinaryArrayData copy() {
+               return copy(new BinaryArrayData());
+       }
+
+       /** Copies the bytes of this array into a fresh byte array and points {@code reuse} to it. */
+       public BinaryArrayData copy(BinaryArrayData reuse) {
+               byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+               return reuse;
+       }
+
+       @Override
+       public int hashCode() {
+               return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+       }
+
+       // ------------------------------------------------------------------------------------------
+       // Construction Utilities
+       // ------------------------------------------------------------------------------------------
+
+       public static BinaryArrayData fromPrimitiveArray(boolean[] arr) {
+               return fromPrimitiveArray(arr, BOOLEAN_ARRAY_OFFSET, arr.length, 1);
+       }
+
+       public static BinaryArrayData fromPrimitiveArray(byte[] arr) {
+               return fromPrimitiveArray(arr, BYTE_ARRAY_BASE_OFFSET, arr.length, 1);
+       }
+
+       public static BinaryArrayData fromPrimitiveArray(short[] arr) {
+               return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2);
+       }
+
+       public static BinaryArrayData fromPrimitiveArray(int[] arr) {
+               return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
+       }
+
+       public static BinaryArrayData fromPrimitiveArray(long[] arr) {
+               return fromPrimitiveArray(arr, LONG_ARRAY_OFFSET, arr.length, 8);
+       }
+
+       public static BinaryArrayData fromPrimitiveArray(float[] arr) {
+               return fromPrimitiveArray(arr, FLOAT_ARRAY_OFFSET, arr.length, 4);
+       }
+
+       public static BinaryArrayData fromPrimitiveArray(double[] arr) {
+               return fromPrimitiveArray(arr, DOUBLE_ARRAY_OFFSET, arr.length, 8);
+       }
+
+       /**
+        * Builds a {@link BinaryArrayData} by copying the raw content of a primitive array behind a
+        * freshly written header. All null bits stay zero because the backing byte array is new.
+        *
+        * @param arr the primitive array to copy from
+        * @param offset the base offset of the first element inside {@code arr}
+        * @param length the number of elements
+        * @param elementSize the size of one element in bytes
+        * @throws UnsupportedOperationException if the binary form would not fit into a byte array
+        */
+       private static BinaryArrayData fromPrimitiveArray(
+               Object arr, int offset, int length, int elementSize) {
+               final long headerInBytes = calculateHeaderInBytes(length);
+               // widen before multiplying, otherwise the int product can overflow and bypass the size check below
+               final long valueRegionInBytes = (long) elementSize * length;
+
+               // must align by 8 bytes
+               long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
+               if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
+                       throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +
+                               "it's too big.");
+               }
+               long totalSize = totalSizeInLongs * 8;
+
+               final byte[] data = new byte[(int) totalSize];
+
+               UNSAFE.putInt(data, (long) BYTE_ARRAY_BASE_OFFSET, length);
+               UNSAFE.copyMemory(
+                       arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);
+
+               BinaryArrayData result = new BinaryArrayData();
+               result.pointTo(MemorySegmentFactory.wrap(data), 0, (int) totalSize);
+               return result;
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryFormat.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryFormat.java
new file mode 100644
index 0000000..65affe7
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Binary format spanning {@link MemorySegment}s.
+ */
+@Internal
+public interface BinaryFormat {
+
+       /**
+        * It decides whether to put data in FixLenPart or VarLenPart. See more in {@link BinaryRowData}.
+        *
+        * <p>If len is less than 8, its binary format is:
+        * 1-bit mark(1) = 1, 7-bits len, and 7-bytes data.
+        * Data is stored in the fixed-length part.
+        *
+        * <p>If len is greater than or equal to 8, its binary format is:
+        * 1-bit mark(1) = 0, 31-bits offset to the data, and 4-bytes length of data.
+        * Data is stored in the variable-length part.
+        */
+       int MAX_FIX_PART_DATA_SIZE = 7;
+       /**
+        * Mask to get the mark in the highest bit of a long.
+        * Form: 10000000 00000000 ... (8 bytes)
+        *
+        * <p>This is used to decide whether the data is stored in the fixed-length part or the
+        * variable-length part. See {@link #MAX_FIX_PART_DATA_SIZE} for more information.
+        */
+       long HIGHEST_FIRST_BIT = 0x80L << 56;
+       /**
+        * Mask to get the 7-bit length held in the second to eighth highest bits of a long.
+        * Form: 01111111 00000000 ... (8 bytes)
+        *
+        * <p>This is used to get the length of the data which is stored in this long.
+        * See {@link #MAX_FIX_PART_DATA_SIZE} for more information.
+        */
+       long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
+
+       /**
+        * Gets the underlying {@link MemorySegment}s this binary format spans.
+        */
+       MemorySegment[] getSegments();
+
+       /**
+        * Gets the start offset of this binary data in the {@link MemorySegment}s.
+        */
+       int getOffset();
+
+       /**
+        * Gets the size in bytes of this binary data.
+        */
+       int getSizeInBytes();
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java
new file mode 100644
index 0000000..fdad44c
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A binary implementation of {@link MapData} with the layout:
+ * [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ *
+ * <p>{@code BinaryMap} is influenced by Apache Spark UnsafeMapData.
+ */
+@Internal
+public final class BinaryMapData extends BinarySection implements MapData {
+
+       /** View over the key array; re-pointed on every {@link #pointTo}. */
+       private final BinaryArrayData keys = new BinaryArrayData();
+
+       /** View over the value array; re-pointed on every {@link #pointTo}. */
+       private final BinaryArrayData values = new BinaryArrayData();
+
+       public BinaryMapData() {
+       }
+
+       /** Returns the number of entries, i.e. the number of elements of the key array. */
+       public int size() {
+               return keys.size();
+       }
+
+       @Override
+       public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+               // The first 4 bytes hold the byte size of the key array.
+               final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
+               assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
+
+               // Whatever follows the length prefix and the key array belongs to the value array.
+               final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
+               assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";
+
+               final int keyArrayStart = offset + 4;
+               final int valueArrayStart = keyArrayStart + keyArrayBytes;
+               keys.pointTo(segments, keyArrayStart, keyArrayBytes);
+               values.pointTo(segments, valueArrayStart, valueArrayBytes);
+
+               assert keys.size() == values.size();
+
+               this.segments = segments;
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       public BinaryArrayData keyArray() {
+               return keys;
+       }
+
+       public BinaryArrayData valueArray() {
+               return values;
+       }
+
+       /**
+        * Converts this map into a {@link HashMap}, pairing the i-th key with the i-th value after
+        * both arrays have been converted to object arrays of the given types.
+        */
+       public Map<?, ?> toJavaMap(LogicalType keyType, LogicalType valueType) {
+               final Object[] javaKeys = keys.toObjectArray(keyType);
+               final Object[] javaValues = values.toObjectArray(valueType);
+
+               final Map<Object, Object> result = new HashMap<>();
+               int index = 0;
+               while (index < javaKeys.length) {
+                       result.put(javaKeys[index], javaValues[index]);
+                       index++;
+               }
+               return result;
+       }
+
+       public BinaryMapData copy() {
+               final BinaryMapData target = new BinaryMapData();
+               return copy(target);
+       }
+
+       /** Copies the bytes of this map into a fresh byte array and points {@code reuse} to it. */
+       public BinaryMapData copy(BinaryMapData reuse) {
+               final byte[] copied = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(copied), 0, sizeInBytes);
+               return reuse;
+       }
+
+       @Override
+       public int hashCode() {
+               return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+       }
+
+       // ------------------------------------------------------------------------------------------
+       // Construction Utilities
+       // ------------------------------------------------------------------------------------------
+
+       /**
+        * Builds a map by concatenating a 4-byte key-array size, the key array and the value array
+        * into one new segment. Both arrays must be backed by exactly one segment.
+        */
+       public static BinaryMapData valueOf(BinaryArrayData key, BinaryArrayData value) {
+               checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
+
+               final int keyStart = 4;
+               final int valueStart = keyStart + key.sizeInBytes;
+               final byte[] buffer = new byte[valueStart + value.sizeInBytes];
+               final MemorySegment target = MemorySegmentFactory.wrap(buffer);
+
+               target.putInt(0, key.sizeInBytes);
+               key.getSegments()[0].copyTo(key.getOffset(), target, keyStart, key.sizeInBytes);
+               value.getSegments()[0].copyTo(value.getOffset(), target, valueStart, value.sizeInBytes);
+
+               final BinaryMapData result = new BinaryMapData();
+               result.pointTo(target, 0, buffer.length);
+               return result;
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
new file mode 100644
index 0000000..ff5921b
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * A lazy binary implementation of {@link RawValueData} which is backed by
+ * {@link MemorySegment}s and a generic Java {@link Object}.
+ *
+ * <p>Either {@link MemorySegment}s or {@link Object} must be provided when
+ * constructing {@link BinaryRawValueData}. The other representation will be 
materialized when needed.
+ *
+ * @param <T> the java type of the raw value.
+ */
+@Internal
+public final class BinaryRawValueData<T> extends LazyBinaryFormat<T> 
implements RawValueData<T> {
+
+       public BinaryRawValueData(T javaObject) {
+               super(javaObject);
+       }
+
+       public BinaryRawValueData(MemorySegment[] segments, int offset, int 
sizeInBytes) {
+               super(segments, offset, sizeInBytes);
+       }
+
+       public BinaryRawValueData(MemorySegment[] segments, int offset, int 
sizeInBytes, T javaObject) {
+               super(segments, offset, sizeInBytes, javaObject);
+       }
+
+       // 
------------------------------------------------------------------------------------------
+       // Public Interfaces
+       // 
------------------------------------------------------------------------------------------
+
+       @Override
+       public T toObject(TypeSerializer<T> serializer) {
+               if (javaObject == null) {
+                       try {
+                               javaObject = 
InstantiationUtil.deserializeFromByteArray(
+                                       serializer,
+                                       toBytes(serializer));
+                       } catch (IOException e) {
+                               throw new FlinkRuntimeException(e);
+                       }
+               }
+               return javaObject;
+       }
+
+       @Override
+       public byte[] toBytes(TypeSerializer<T> serializer) {
+               ensureMaterialized(serializer);
+               return BinarySegmentUtils.copyToBytes(getSegments(), 
getOffset(), getSizeInBytes());
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               throw new UnsupportedOperationException("BinaryRawValueData 
cannot be compared");
+       }
+
+       @Override
+       public int hashCode() {
+               throw new UnsupportedOperationException("BinaryRawValueData 
does not have a hashCode");
+       }
+
+       @Override
+       public String toString() {
+               return String.format("SqlRawValue{%s}", javaObject == null ? 
"?" : javaObject);
+       }
+
+       // 
------------------------------------------------------------------------------------
+       // Internal methods
+       // 
------------------------------------------------------------------------------------
+
+       /**
+        * Serializes {@code javaObject} with the given serializer and wraps the resulting bytes in a
+        * single-segment {@link BinarySection} that starts at offset 0.
+        *
+        * @param serializer serializer used to turn the Java object into bytes
+        * @return the binary section covering the serialized bytes
+        * @throws FlinkRuntimeException if serialization fails with an {@link IOException}
+        */
+       @Override
+       protected BinarySection materialize(TypeSerializer<T> serializer) {
+               try {
+                       byte[] bytes = InstantiationUtil.serializeToByteArray(serializer, javaObject);
+                       return new BinarySection(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+               } catch (IOException e) {
+                       // Use the same unchecked wrapper as toObject() so callers see one exception type.
+                       throw new FlinkRuntimeException("Failed to serialize the raw value object.", e);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java
new file mode 100644
index 0000000..d2ad35c
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.types.RowKind;
+
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of {@link RowData} which is backed by {@link 
MemorySegment} instead of Object.
+ * It can significantly reduce the serialization/deserialization of Java 
objects.
+ *
+ * <p>A row has two parts: a fixed-length part and a variable-length part.
+ *
+ * <p>The fixed-length part contains a 1-byte header, the null bit set, and the field values.
+ * The null bit set is used for null tracking and is aligned to 8-byte word boundaries. The
+ * field values hold fixed-length primitive types and variable-length values that fit within
+ * 8 bytes. If a variable-length value does not fit, the field stores the offset and length of
+ * its data in the variable-length part instead.
+ *
+ * <p>Fixed-length part will certainly fall into a MemorySegment, which will 
speed up the read
+ * and write of field. During the write phase, if the target memory segment 
has less space than
+ * fixed length part size, we will skip the space. So the number of fields in 
a single Row cannot
+ * exceed the capacity of a single MemorySegment, if there are too many 
fields, we suggest that
+ * user set a bigger pageSize of MemorySegment.
+ *
+ * <p>Variable-length part may fall into multiple MemorySegments.
+ */
+@Internal
+public final class BinaryRowData extends BinarySection implements RowData, 
TypedSetters {
+
+       public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN);
+       private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : 
~(0xFFL << 56L);
+       public static final int HEADER_SIZE_IN_BITS = 8;
+
+       public static int calculateBitSetWidthInBytes(int arity) {
+               return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
+       }
+
+       public static int calculateFixPartSizeInBytes(int arity) {
+               return calculateBitSetWidthInBytes(arity) + 8 * arity;
+       }
+
+       /**
+        * Returns whether a field of the given type is stored entirely in the fixed-length part of
+        * the row.
+        *
+        * <p>If it is a fixed-length field, we can call this BinaryRowData's setXX method for
+        * in-place updates. If it is a variable-length field, in-place updates are not possible in
+        * general because the underlying data is stored contiguously.
+        *
+        * <p>DECIMAL and the two TIMESTAMP roots count as fixed-length only when their precision is
+        * compact according to {@code DecimalData.isCompact} / {@code TimestampData.isCompact}.
+        *
+        * @param type the logical type of the field
+        * @return true if the field value lives completely in the fixed-length part
+        */
+       public static boolean isInFixedLengthPart(LogicalType type) {
+               switch (type.getTypeRoot()) {
+                       case BOOLEAN:
+                       case TINYINT:
+                       case SMALLINT:
+                       case INTEGER:
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                       case INTERVAL_YEAR_MONTH:
+                       case BIGINT:
+                       case INTERVAL_DAY_TIME:
+                       case FLOAT:
+                       case DOUBLE:
+                               return true;
+                       case DECIMAL:
+                               return DecimalData.isCompact(((DecimalType) 
type).getPrecision());
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return TimestampData.isCompact(((TimestampType) 
type).getPrecision());
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                               return 
TimestampData.isCompact(((LocalZonedTimestampType) type).getPrecision());
+                       default:
+                               return false;
+               }
+       }
+
+       /**
+        * Returns whether a field of the given type can be updated in place on an existing row.
+        *
+        * <p>True for every type accepted by {@link #isInFixedLengthPart(LogicalType)} and additionally
+        * for any {@code DECIMAL}: {@code setDecimal} handles non-compact decimals by overwriting the
+        * bytes already reserved for the field in the variable-length part.
+        *
+        * <p>NOTE(review): {@code setTimestamp} performs a similar in-place update for non-compact
+        * timestamps, yet those types are reported as not mutable here - confirm this is intended.
+        */
+       public static boolean isMutable(LogicalType type) {
+               return isInFixedLengthPart(type) || type.getTypeRoot() == 
LogicalTypeRoot.DECIMAL;
+       }
+
+       private final int arity;
+       private final int nullBitsSizeInBytes;
+
+       public BinaryRowData(int arity) {
+               checkArgument(arity >= 0);
+               this.arity = arity;
+               this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+       }
+
+       private int getFieldOffset(int pos) {
+               return offset + nullBitsSizeInBytes + pos * 8;
+       }
+
+       private void assertIndexIsValid(int index) {
+               assert index >= 0 : "index (" + index + ") should >= 0";
+               assert index < arity : "index (" + index + ") should < " + 
arity;
+       }
+
+       public int getFixedLengthPartSize() {
+               return nullBitsSizeInBytes + 8 * arity;
+       }
+
+       @Override
+       public int getArity() {
+               return arity;
+       }
+
+       @Override
+       public RowKind getRowKind() {
+               byte kindValue = segments[0].get(offset);
+               return RowKind.fromByteValue(kindValue);
+       }
+
+       @Override
+       public void setRowKind(RowKind kind) {
+               segments[0].put(offset, kind.toByteValue());
+       }
+
+       public void setTotalSize(int sizeInBytes) {
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       @Override
+       public boolean isNullAt(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.bitGet(segments[0], offset, pos + 
HEADER_SIZE_IN_BITS);
+       }
+
+       private void setNotNullAt(int i) {
+               assertIndexIsValid(i);
+               BinarySegmentUtils.bitUnSet(segments[0], offset, i + 
HEADER_SIZE_IN_BITS);
+       }
+
+       @Override
+       public void setNullAt(int i) {
+               assertIndexIsValid(i);
+               BinarySegmentUtils.bitSet(segments[0], offset, i + 
HEADER_SIZE_IN_BITS);
+               // We must set the fixed length part zero.
+               // 1.Only int/long/boolean...(Fix length type) will invoke this 
setNullAt.
+               // 2.Set to zero in order to equals and hash operation bytes 
calculation.
+               segments[0].putLong(getFieldOffset(i), 0);
+       }
+
+       @Override
+       public void setInt(int pos, int value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putInt(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setLong(int pos, long value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putLong(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setDouble(int pos, double value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putDouble(getFieldOffset(pos), value);
+       }
+
+       /**
+        * Writes a decimal into the field at {@code pos}, or marks the field null when {@code value}
+        * is null.
+        *
+        * <p>Compact precisions store the unscaled long directly in the fixed-length slot. For other
+        * precisions the upper 32 bits of the slot hold the offset (relative to the row start) of a
+        * 16-byte area in the variable-length part; that area is zeroed and then overwritten with the
+        * unscaled bytes, and the slot is rewritten as {@code (offset << 32) | length}. The offset
+        * must therefore already be present in the slot before this method is called.
+        *
+        * @param pos field position
+        * @param value the decimal to store; {@code null} sets the field to null
+        * @param precision precision of the decimal type, which selects the storage format
+        */
+       @Override
+       public void setDecimal(int pos, DecimalData value, int precision) {
+               assertIndexIsValid(pos);
+
+               if (DecimalData.isCompact(precision)) {
+                       // compact format
+                       if (value == null) {
+                               // A null value used to fail with a NullPointerException here,
+                               // unlike the non-compact branch below which already handles null.
+                               setNullAt(pos);
+                       } else {
+                               setLong(pos, value.toUnscaledLong());
+                       }
+               } else {
+                       int fieldOffset = getFieldOffset(pos);
+                       int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
+                       assert cursor > 0 : "invalid cursor " + cursor;
+                       // zero-out the bytes
+                       BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+                       BinarySegmentUtils.setLong(segments, offset + cursor + 8, 0L);
+
+                       if (value == null) {
+                               setNullAt(pos);
+                               // keep the offset for future update
+                               segments[0].putLong(fieldOffset, ((long) cursor) << 32);
+                       } else {
+
+                               byte[] bytes = value.toUnscaledBytes();
+                               assert bytes.length <= 16;
+
+                               // Write the bytes to the variable length portion.
+                               BinarySegmentUtils.copyFromBytes(segments, offset + cursor, bytes, 0, bytes.length);
+                               setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
+                       }
+               }
+       }
+
+       /**
+        * Writes a timestamp into the field at {@code pos}, or marks the field null when
+        * {@code value} is null.
+        *
+        * <p>Compact precisions store the epoch milliseconds directly in the fixed-length slot. For
+        * other precisions the upper 32 bits of the slot hold the offset (relative to the row start)
+        * of an 8-byte area in the variable-length part that receives the milliseconds, while the
+        * lower bits of the slot receive the nano-of-millisecond. The offset must therefore already
+        * be present in the slot before this method is called.
+        *
+        * @param pos field position
+        * @param value the timestamp to store; {@code null} sets the field to null
+        * @param precision precision of the timestamp type, which selects the storage format
+        */
+       @Override
+       public void setTimestamp(int pos, TimestampData value, int precision) {
+               assertIndexIsValid(pos);
+
+               if (TimestampData.isCompact(precision)) {
+                       if (value == null) {
+                               // A null value used to fail with a NullPointerException here,
+                               // unlike the non-compact branch below which already handles null.
+                               setNullAt(pos);
+                       } else {
+                               setLong(pos, value.getMillisecond());
+                       }
+               } else {
+                       int fieldOffset = getFieldOffset(pos);
+                       int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
+                       assert cursor > 0 : "invalid cursor " + cursor;
+
+                       if (value == null) {
+                               setNullAt(pos);
+                               // zero-out the bytes
+                               BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+                               // keep the offset for future update
+                               segments[0].putLong(fieldOffset, ((long) cursor) << 32);
+                       } else {
+                               // write millisecond to the variable length portion.
+                               BinarySegmentUtils.setLong(segments, offset + cursor, value.getMillisecond());
+                               // write nanoOfMillisecond to the fixed-length portion.
+                               setLong(pos, ((long) cursor << 32) | (long) value.getNanoOfMillisecond());
+                       }
+               }
+       }
+
+       @Override
+       public void setBoolean(int pos, boolean value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putBoolean(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setShort(int pos, short value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putShort(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setByte(int pos, byte value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].put(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setFloat(int pos, float value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putFloat(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public boolean getBoolean(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getBoolean(getFieldOffset(pos));
+       }
+
+       @Override
+       public byte getByte(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].get(getFieldOffset(pos));
+       }
+
+       @Override
+       public short getShort(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getShort(getFieldOffset(pos));
+       }
+
+       @Override
+       public int getInt(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getInt(getFieldOffset(pos));
+       }
+
+       @Override
+       public long getLong(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getLong(getFieldOffset(pos));
+       }
+
+       @Override
+       public float getFloat(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getFloat(getFieldOffset(pos));
+       }
+
+       @Override
+       public double getDouble(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getDouble(getFieldOffset(pos));
+       }
+
+       @Override
+       public StringData getString(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndLen = segments[0].getLong(fieldOffset);
+               return BinarySegmentUtils.readStringData(segments, offset, 
fieldOffset, offsetAndLen);
+       }
+
+       @Override
+       public DecimalData getDecimal(int pos, int precision, int scale) {
+               assertIndexIsValid(pos);
+
+               if (DecimalData.isCompact(precision)) {
+                       return DecimalData.fromUnscaledLong(
+                               segments[0].getLong(getFieldOffset(pos)),
+                               precision,
+                               scale);
+               }
+
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndSize = segments[0].getLong(fieldOffset);
+               return BinarySegmentUtils.readDecimalData(segments, offset, 
offsetAndSize, precision, scale);
+       }
+
+       @Override
+       public TimestampData getTimestamp(int pos, int precision) {
+               assertIndexIsValid(pos);
+
+               if (TimestampData.isCompact(precision)) {
+                       return 
TimestampData.fromEpochMillis(segments[0].getLong(getFieldOffset(pos)));
+               }
+
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndNanoOfMilli = 
segments[0].getLong(fieldOffset);
+               return BinarySegmentUtils.readTimestampData(segments, offset, 
offsetAndNanoOfMilli);
+       }
+
+       @Override
+       public <T> RawValueData<T> getRawValue(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readRawValueData(segments, offset, 
getLong(pos));
+       }
+
+       @Override
+       public byte[] getBinary(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndLen = segments[0].getLong(fieldOffset);
+               return BinarySegmentUtils.readBinary(segments, offset, 
fieldOffset, offsetAndLen);
+       }
+
+       @Override
+       public ArrayData getArray(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readArrayData(segments, offset, 
getLong(pos));
+       }
+
+       @Override
+       public MapData getMap(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readMapData(segments, offset, 
getLong(pos));
+       }
+
+       @Override
+       public RowData getRow(int pos, int numFields) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readRowData(segments, numFields, 
offset, getLong(pos));
+       }
+
+       /**
+        * Returns whether any field of this row is null.
+        *
+        * <p>The bit is 1 when the field is null. Default is 0. The null bits are scanned one 8-byte
+        * word at a time, starting at this row's {@code offset} within the first segment; the header
+        * byte that shares the first word is masked out.
+        *
+        * @return true if at least one null bit is set
+        */
+       public boolean anyNull() {
+               // Skip the header. Reads are relative to the row's offset, like every other accessor.
+               if ((segments[0].getLong(offset) & FIRST_BYTE_ZERO) != 0) {
+                       return true;
+               }
+               for (int i = 8; i < nullBitsSizeInBytes; i += 8) {
+                       if (segments[0].getLong(offset + i) != 0) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       public boolean anyNull(int[] fields) {
+               for (int field : fields) {
+                       if (isNullAt(field)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       public BinaryRowData copy() {
+               return copy(new BinaryRowData(arity));
+       }
+
+       public BinaryRowData copy(BinaryRowData reuse) {
+               return copyInternal(reuse);
+       }
+
+       private BinaryRowData copyInternal(BinaryRowData reuse) {
+               byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, 
sizeInBytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+               return reuse;
+       }
+
+       public void clear() {
+               segments = null;
+               offset = 0;
+               sizeInBytes = 0;
+       }
+
+       @Override
+       public int hashCode() {
+               return BinarySegmentUtils.hashByWords(segments, offset, 
sizeInBytes);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySection.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySection.java
new file mode 100644
index 0000000..a2312b8
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySection.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Base {@link BinaryFormat} implementation that describes a region of memory: the backing
+ * {@link MemorySegment}s, a start offset and a length in bytes.
+ */
+@Internal
+public class BinarySection implements BinaryFormat {
+
+       protected MemorySegment[] segments;
+       protected int offset;
+       protected int sizeInBytes;
+
+       /** Creates a section that does not point at any memory yet. */
+       public BinarySection() {}
+
+       /** Creates a section over the given segments, start offset and size in bytes. */
+       public BinarySection(MemorySegment[] segments, int offset, int sizeInBytes) {
+               this.sizeInBytes = sizeInBytes;
+               this.offset = offset;
+               this.segments = segments;
+       }
+
+       /** Points this section at a single segment by delegating to the array variant. */
+       public final void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
+               final MemorySegment[] single = {segment};
+               pointTo(single, offset, sizeInBytes);
+       }
+
+       /** Re-points this section at the given segments, start offset and size in bytes. */
+       public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+               this.sizeInBytes = sizeInBytes;
+               this.offset = offset;
+               this.segments = segments;
+       }
+
+       /** Returns the segments backing this section. */
+       public MemorySegment[] getSegments() {
+               return this.segments;
+       }
+
+       /** Returns the start offset of this section. */
+       public int getOffset() {
+               return this.offset;
+       }
+
+       /** Returns the length of this section in bytes. */
+       public int getSizeInBytes() {
+               return this.sizeInBytes;
+       }
+
+       /**
+        * Two sections are equal when they have the same runtime class, the same size and the same
+        * bytes according to {@code BinarySegmentUtils.equals}.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               final BinarySection that = (BinarySection) o;
+               if (sizeInBytes != that.sizeInBytes) {
+                       return false;
+               }
+               return BinarySegmentUtils.equals(segments, offset, that.segments, that.offset, sizeInBytes);
+       }
+
+       /** Hashes the bytes described by this section via {@code BinarySegmentUtils.hash}. */
+       @Override
+       public int hashCode() {
+               return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySegmentUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySegmentUtils.java
new file mode 100644
index 0000000..e777bba
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySegmentUtils.java
@@ -0,0 +1,1198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+import java.io.IOException;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+import static 
org.apache.flink.table.data.binary.BinaryFormat.HIGHEST_FIRST_BIT;
+import static 
org.apache.flink.table.data.binary.BinaryFormat.HIGHEST_SECOND_TO_EIGHTH_BIT;
+
+/**
+ * Utilities for binary data segments which heavily uses {@link MemorySegment}.
+ */
+@Internal
+public final class BinarySegmentUtils {
+
+       /**
+        * Constant that flags the byte order.
+        */
+       public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN;
+
+       private static final int ADDRESS_BITS_PER_WORD = 3;
+
+       private static final int BIT_BYTE_INDEX_MASK = 7;
+
+       /**
+        * The number of SQL execution threads is limited, so the overhead of a 64K
+        * buffer per thread is acceptable.
+        */
+       private static final int MAX_BYTES_LENGTH = 1024 * 64;
+       private static final int MAX_CHARS_LENGTH = 1024 * 32;
+
+       private static final int BYTE_ARRAY_BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
+
+       private static final ThreadLocal<byte[]> BYTES_LOCAL = new 
ThreadLocal<>();
+       private static final ThreadLocal<char[]> CHARS_LOCAL = new 
ThreadLocal<>();
+
+       private BinarySegmentUtils() {
+               // do not instantiate
+       }
+
+       /**
+        * Returns a byte[] of at least {@code length} bytes intended for temporary use only; callers
+        * must not keep a reference to it. A per-thread buffer is reused through a
+        * {@link ThreadLocal} to avoid the cost of allocating and collecting byte arrays.
+        *
+        * <p>This is useful for methods that accept only a byte[] rather than a MemorySegment[]:
+        * the segment data can be copied into the reused bytes before calling them, for example for
+        * String deserialization.
+        *
+        * <p>The per-thread buffer is created with {@code MAX_BYTES_LENGTH} bytes on the first request
+        * that fits into it. Requests larger than the available buffer get a fresh array that is not
+        * cached.
+        *
+        * @param length minimum number of bytes required
+        * @return a byte[] whose length is at least {@code length}
+        */
+       public static byte[] allocateReuseBytes(int length) {
+               final byte[] cached = BYTES_LOCAL.get();
+
+               // Fast path: the thread already owns a buffer that is large enough.
+               if (cached != null && cached.length >= length) {
+                       return cached;
+               }
+
+               // First request on this thread that fits: create and remember the shared buffer.
+               if (cached == null && length <= MAX_BYTES_LENGTH) {
+                       final byte[] fresh = new byte[MAX_BYTES_LENGTH];
+                       BYTES_LOCAL.set(fresh);
+                       return fresh;
+               }
+
+               // Oversized request: hand out a one-off array without caching it.
+               return new byte[length];
+       }
+
+       /**
+        * Returns a char[] of at least {@code length} chars intended for temporary use only, reusing
+        * a per-thread buffer held in a {@link ThreadLocal}.
+        *
+        * <p>The per-thread buffer is created with {@code MAX_CHARS_LENGTH} chars on the first request
+        * that fits into it. Requests larger than the available buffer get a fresh array that is not
+        * cached.
+        *
+        * @param length minimum number of chars required
+        * @return a char[] whose length is at least {@code length}
+        */
+       public static char[] allocateReuseChars(int length) {
+               final char[] cached = CHARS_LOCAL.get();
+
+               // Fast path: the thread already owns a buffer that is large enough.
+               if (cached != null && cached.length >= length) {
+                       return cached;
+               }
+
+               // First request on this thread that fits: create and remember the shared buffer.
+               if (cached == null && length <= MAX_CHARS_LENGTH) {
+                       final char[] fresh = new char[MAX_CHARS_LENGTH];
+                       CHARS_LOCAL.set(fresh);
+                       return fresh;
+               }
+
+               // Oversized request: hand out a one-off array without caching it.
+               return new char[length];
+       }
+
+       /**
+        * Copy segments to a new byte[].
+        *
+        * @param segments Source segments.
+        * @param offset Source segments offset.
+        * @param numBytes the number bytes to copy.
+        */
+       public static byte[] copyToBytes(MemorySegment[] segments, int offset, 
int numBytes) {
+               return copyToBytes(segments, offset, new byte[numBytes], 0, 
numBytes);
+       }
+
+       /**
+        * Copy segments to target byte[].
+        *
+        * @param segments Source segments.
+        * @param offset Source segments offset.
+        * @param bytes target byte[].
+        * @param bytesOffset target byte[] offset.
+        * @param numBytes the number bytes to copy.
+        */
+       public static byte[] copyToBytes(
+                       MemorySegment[] segments,
+                       int offset,
+                       byte[] bytes,
+                       int bytesOffset,
+                       int numBytes) {
+               if (inFirstSegment(segments, offset, numBytes)) {
+                       segments[0].get(offset, bytes, bytesOffset, numBytes);
+               } else {
+                       copyMultiSegmentsToBytes(segments, offset, bytes, 
bytesOffset, numBytes);
+               }
+               return bytes;
+       }
+
+       /**
+        * Copies {@code numBytes} bytes that may span several segments into {@code bytes}.
+        *
+        * <p>{@code offset} is relative to the start of the first segment; segments that lie entirely
+        * before it are skipped by subtracting their size from the offset. If the segments hold fewer
+        * than {@code numBytes} bytes after {@code offset}, the loop ends after the last segment and
+        * the remaining target bytes are left untouched.
+        *
+        * @param segments Source segments.
+        * @param offset Source segments offset.
+        * @param bytes target byte[].
+        * @param bytesOffset target byte[] offset.
+        * @param numBytes the number bytes to copy.
+        */
+       public static void copyMultiSegmentsToBytes(
+                       MemorySegment[] segments,
+                       int offset,
+                       byte[] bytes,
+                       int bytesOffset,
+                       int numBytes) {
+               // number of bytes that still have to be copied
+               int remainSize = numBytes;
+               for (MemorySegment segment : segments) {
+                       // bytes available in this segment from the current offset onwards
+                       int remain = segment.size() - offset;
+                       if (remain > 0) {
+                               int nCopy = Math.min(remain, remainSize);
+                               // (numBytes - remainSize) is the count already copied into the target
+                               segment.get(offset, bytes, numBytes - 
remainSize + bytesOffset, nCopy);
+                               remainSize -= nCopy;
+                               // every following segment is read from its beginning
+                               offset = 0;
+                               if (remainSize == 0) {
+                                       return;
+                               }
+                       } else {
+                               // remain is zero or negative: the offset lies beyond this segment, so skip it
+                               // and carry the rest over: offset = offset - segmentSize, i.e. -remain
+                               offset = -remain;
+                       }
+               }
+       }
+
+       /**
+        * Copies a byte range of the segments to an unsafe target.
+        *
+        * <p>When the whole range lies inside the first segment a single bulk copy is issued,
+        * otherwise the range is copied segment by segment.
+        *
+        * @param segments Source segments.
+        * @param offset The position in the segments where reading starts.
+        * @param target The unsafe memory to copy the bytes to.
+        * @param pointer The position in the target unsafe memory to copy the chunk to.
+        * @param numBytes The number of bytes to copy.
+        */
+       public static void copyToUnsafe(
+               MemorySegment[] segments,
+               int offset,
+               Object target,
+               int pointer,
+               int numBytes) {
+               if (!inFirstSegment(segments, offset, numBytes)) {
+                       copyMultiSegmentsToUnsafe(segments, offset, target, pointer, numBytes);
+                       return;
+               }
+               segments[0].copyToUnsafe(offset, target, pointer, numBytes);
+       }
+
+       /**
+        * Copies a byte range that may span several segments to an unsafe target.
+        *
+        * <p>Segments that end at or before the current {@code offset} are skipped. Copying stops as
+        * soon as {@code numBytes} bytes were transferred, or once all segments were visited.
+        */
+       private static void copyMultiSegmentsToUnsafe(
+               MemorySegment[] segments,
+               int offset,
+               Object target,
+               int pointer,
+               int numBytes) {
+               int copied = 0;
+               for (int i = 0; i < segments.length; i++) {
+                       final MemorySegment current = segments[i];
+                       final int available = current.size() - offset;
+                       if (available <= 0) {
+                               // the start position lies behind this segment:
+                               // new offset = offset - segmentSize
+                               offset = -available;
+                               continue;
+                       }
+                       final int chunk = Math.min(available, numBytes - copied);
+                       current.copyToUnsafe(offset, target, pointer + copied, chunk);
+                       copied += chunk;
+                       // every following segment is read from its beginning
+                       offset = 0;
+                       if (copied == numBytes) {
+                               return;
+                       }
+               }
+       }
+
+       /**
+        * Writes a byte range of the segments to an output view.
+        *
+        * <p>Note: only the data itself is written, the length is not included. Every chunk is staged
+        * in the array returned by {@code allocateReuseBytes} before it is handed to the view.
+        *
+        * @param segments    source segments
+        * @param offset      position in the segments where reading starts
+        * @param sizeInBytes number of bytes to write
+        * @param target      output view that receives the bytes
+        * @throws IOException if writing to the target view fails
+        * @throws RuntimeException if bytes remain unwritten after all segments were visited
+        */
+       public static void copyToView(
+                       MemorySegment[] segments,
+                       int offset,
+                       int sizeInBytes,
+                       DataOutputView target) throws IOException {
+               for (int i = 0; i < segments.length; i++) {
+                       final MemorySegment current = segments[i];
+                       final int available = current.size() - offset;
+                       if (available <= 0) {
+                               // the start position lies behind this segment, rebase it onto the next one
+                               offset -= current.size();
+                       } else {
+                               final int chunk = Math.min(available, sizeInBytes);
+
+                               final byte[] staging = allocateReuseBytes(chunk);
+                               current.get(offset, staging, 0, chunk);
+                               target.write(staging, 0, chunk);
+
+                               sizeInBytes -= chunk;
+                               offset = 0;
+                       }
+
+                       if (sizeInBytes == 0) {
+                               return;
+                       }
+               }
+
+               if (sizeInBytes != 0) {
+                       throw new RuntimeException("No copy finished, this should be a bug, " +
+                               "The remaining length is: " + sizeInBytes);
+               }
+       }
+
+       /**
+        * Copies bytes from a byte array into the segments.
+        *
+        * <p>With exactly one segment the bytes are written by a single bulk put, otherwise they are
+        * distributed over the segments one after another.
+        *
+        * @param segments target segments.
+        * @param offset position in the target segments where writing starts.
+        * @param bytes source byte[].
+        * @param bytesOffset position in the source byte[] where reading starts.
+        * @param numBytes the number of bytes to copy.
+        */
+       public static void copyFromBytes(
+                       MemorySegment[] segments,
+                       int offset,
+                       byte[] bytes,
+                       int bytesOffset,
+                       int numBytes) {
+               if (segments.length != 1) {
+                       copyMultiSegmentsFromBytes(segments, offset, bytes, bytesOffset, numBytes);
+                       return;
+               }
+               segments[0].put(offset, bytes, bytesOffset, numBytes);
+       }
+
+       /**
+        * Writes bytes from a byte array into a range that may span several segments.
+        *
+        * <p>Segments that end at or before the current {@code offset} are skipped. Writing stops as
+        * soon as {@code numBytes} bytes were transferred, or once all segments were visited.
+        */
+       private static void copyMultiSegmentsFromBytes(
+                       MemorySegment[] segments,
+                       int offset,
+                       byte[] bytes,
+                       int bytesOffset,
+                       int numBytes) {
+               int written = 0;
+               for (int i = 0; i < segments.length; i++) {
+                       final MemorySegment current = segments[i];
+                       final int available = current.size() - offset;
+                       if (available <= 0) {
+                               // the start position lies behind this segment:
+                               // new offset = offset - segmentSize
+                               offset = -available;
+                               continue;
+                       }
+                       final int chunk = Math.min(available, numBytes - written);
+                       current.put(offset, bytes, bytesOffset + written, chunk);
+                       written += chunk;
+                       // every following segment is written from its beginning
+                       offset = 0;
+                       if (written == numBytes) {
+                               return;
+                       }
+               }
+       }
+
+       /**
+        * Returns the requested bytes, possibly without copying.
+        *
+        * <p>For a single segment whose heap array starts at {@code baseOffset == 0} and has exactly
+        * {@code sizeInBytes} elements, that array itself is returned. In every other case a new
+        * array is filled. If a copy is required, please use copyTo.
+        */
+       public static byte[] getBytes(MemorySegment[] segments, int baseOffset, int sizeInBytes) {
+               if (segments.length != 1) {
+                       final byte[] merged = new byte[sizeInBytes];
+                       copyMultiSegmentsToBytes(segments, baseOffset, merged, 0, sizeInBytes);
+                       return merged;
+               }
+
+               final byte[] backing = segments[0].getHeapMemory();
+               // avoid the copy if the segment is backed by a `byte[]` that matches the request exactly
+               final boolean reusable = baseOffset == 0
+                       && backing != null
+                       && backing.length == sizeInBytes;
+               if (reusable) {
+                       return backing;
+               }
+
+               final byte[] copy = new byte[sizeInBytes];
+               segments[0].get(baseOffset, copy, 0, sizeInBytes);
+               return copy;
+       }
+
+       /**
+        * Compares two regions of memory segments for equality.
+        *
+        * <p>If both regions lie inside the first segment of their array they are compared directly,
+        * otherwise the comparison proceeds chunk by chunk across segment boundaries.
+        *
+        * @param segments1 Segments 1
+        * @param offset1 Offset in segments1 where the comparison starts
+        * @param segments2 Segments 2
+        * @param offset2 Offset in segments2 where the comparison starts
+        * @param len Length of the compared memory region
+        *
+        * @return true if equal, false otherwise
+        */
+       public static boolean equals(
+                       MemorySegment[] segments1, int offset1,
+                       MemorySegment[] segments2, int offset2, int len) {
+               final boolean bothInFirstSegment =
+                       inFirstSegment(segments1, offset1, len) && inFirstSegment(segments2, offset2, len);
+               return bothInFirstSegment
+                       ? segments1[0].equalTo(segments2[0], offset1, offset2, len)
+                       : equalsMultiSegments(segments1, offset1, segments2, offset2, len);
+       }
+
+       /**
+        * Compares two regions chunk by chunk, where each region may span several segments.
+        *
+        * <p>The size of the first segment of each array is used to locate all positions in that
+        * array. Each step compares as many bytes as fit into the current segment on both sides.
+        */
+       @VisibleForTesting
+       static boolean equalsMultiSegments(
+                       MemorySegment[] segments1, int offset1,
+                       MemorySegment[] segments2, int offset2, int len) {
+               // quick way, this also avoids dividing by a segment size of zero
+               if (len == 0) {
+                       return true;
+               }
+
+               final int size1 = segments1[0].size();
+               final int size2 = segments2[0].size();
+
+               // locate the segment and the position inside it for both start offsets
+               int index1 = offset1 / size1;
+               int index2 = offset2 / size2;
+               int pos1 = offset1 % size1;
+               int pos2 = offset2 % size2;
+
+               int remaining = len;
+               while (remaining > 0) {
+                       final int chunk = Math.min(Math.min(remaining, size1 - pos1), size2 - pos2);
+                       final boolean sameChunk = segments1[index1].equalTo(segments2[index2], pos1, pos2, chunk);
+                       if (!sameChunk) {
+                               return false;
+                       }
+                       remaining -= chunk;
+
+                       pos1 += chunk;
+                       if (pos1 == size1) {
+                               // first side reached the end of its segment
+                               pos1 = 0;
+                               index1++;
+                       }
+
+                       pos2 += chunk;
+                       if (pos2 == size2) {
+                               // second side reached the end of its segment
+                               pos2 = 0;
+                               index2++;
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * Hashes a byte range of the segments to an int; numBytes must be aligned to 4 bytes.
+        *
+        * @param segments Source segments.
+        * @param offset The position in the segments where hashing starts.
+        * @param numBytes The number of bytes to hash.
+        * @return the hash of the range.
+        */
+       public static int hashByWords(MemorySegment[] segments, int offset, int numBytes) {
+               return inFirstSegment(segments, offset, numBytes)
+                       ? MurmurHashUtils.hashBytesByWords(segments[0], offset, numBytes)
+                       : hashMultiSegByWords(segments, offset, numBytes);
+       }
+
+       /**
+        * Hashes a range spanning several segments by words.
+        *
+        * <p>The range is first gathered into the array returned by {@code allocateReuseBytes} and
+        * then hashed from that array.
+        */
+       private static int hashMultiSegByWords(MemorySegment[] segments, int offset, int numBytes) {
+               final byte[] gathered = allocateReuseBytes(numBytes);
+               copyMultiSegmentsToBytes(segments, offset, gathered, 0, numBytes);
+               final int hashCode =
+                       MurmurHashUtils.hashUnsafeBytesByWords(gathered, BYTE_ARRAY_BASE_OFFSET, numBytes);
+               return hashCode;
+       }
+
+       /**
+        * Hashes a byte range of the segments to an int.
+        *
+        * @param segments Source segments.
+        * @param offset The position in the segments where hashing starts.
+        * @param numBytes The number of bytes to hash.
+        * @return the hash of the range.
+        */
+       public static int hash(MemorySegment[] segments, int offset, int numBytes) {
+               return inFirstSegment(segments, offset, numBytes)
+                       ? MurmurHashUtils.hashBytes(segments[0], offset, numBytes)
+                       : hashMultiSeg(segments, offset, numBytes);
+       }
+
+       /**
+        * Hashes a range spanning several segments.
+        *
+        * <p>The range is first gathered into the array returned by {@code allocateReuseBytes} and
+        * then hashed from that array.
+        */
+       private static int hashMultiSeg(MemorySegment[] segments, int offset, int numBytes) {
+               final byte[] gathered = allocateReuseBytes(numBytes);
+               copyMultiSegmentsToBytes(segments, offset, gathered, 0, numBytes);
+               final int hashCode =
+                       MurmurHashUtils.hashUnsafeBytes(gathered, BYTE_ARRAY_BASE_OFFSET, numBytes);
+               return hashCode;
+       }
+
+       /**
+        * Tells whether the range ends within the first segment, so that the quick single-segment
+        * way can be used.
+        */
+       private static boolean inFirstSegment(MemorySegment[] segments, int offset, int numBytes) {
+               final int rangeEnd = numBytes + offset;
+               return rangeEnd <= segments[0].size();
+       }
+
+       /**
+        * Maps a bit index to the index of the byte that contains it.
+        *
+        * <p>The bit index is shifted right (unsigned) by {@code ADDRESS_BITS_PER_WORD}.
+        *
+        * @param bitIndex the bit index.
+        * @return the byte index.
+        */
+       private static int byteIndex(int bitIndex) {
+               final int containingByte = bitIndex >>> ADDRESS_BITS_PER_WORD;
+               return containingByte;
+       }
+
+       /**
+        * Clears a single bit in a segment.
+        *
+        * @param segment target segment.
+        * @param baseOffset offset of the byte that holds bit 0.
+        * @param index bit index relative to the base offset.
+        */
+       public static void bitUnSet(MemorySegment segment, int baseOffset, int index) {
+               final int bytePos = baseOffset + byteIndex(index);
+               final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+               // read-modify-write of the byte that holds the bit
+               final byte cleared = (byte) (segment.get(bytePos) & ~mask);
+               segment.put(bytePos, cleared);
+       }
+
+       /**
+        * Sets a single bit in a segment.
+        *
+        * @param segment target segment.
+        * @param baseOffset offset of the byte that holds bit 0.
+        * @param index bit index relative to the base offset.
+        */
+       public static void bitSet(MemorySegment segment, int baseOffset, int index) {
+               final int bytePos = baseOffset + byteIndex(index);
+               final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+               // read-modify-write of the byte that holds the bit
+               final byte updated = (byte) (segment.get(bytePos) | mask);
+               segment.put(bytePos, updated);
+       }
+
+       /**
+        * Reads a single bit from a segment.
+        *
+        * @param segment source segment.
+        * @param baseOffset offset of the byte that holds bit 0.
+        * @param index bit index relative to the base offset.
+        * @return true if the bit is set, false otherwise.
+        */
+       public static boolean bitGet(MemorySegment segment, int baseOffset, int index) {
+               final int bytePos = baseOffset + byteIndex(index);
+               final byte holder = segment.get(bytePos);
+               final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+               return (holder & mask) != 0;
+       }
+
+       /**
+        * Clears a single bit in the segments.
+        *
+        * <p>With exactly one segment the byte is addressed directly, otherwise the segment that
+        * holds the byte is located first.
+        *
+        * @param segments target segments.
+        * @param baseOffset offset of the byte that holds bit 0.
+        * @param index bit index relative to the base offset.
+        */
+       public static void bitUnSet(MemorySegment[] segments, int baseOffset, int index) {
+               if (segments.length != 1) {
+                       bitUnSetMultiSegments(segments, baseOffset, index);
+                       return;
+               }
+               final MemorySegment only = segments[0];
+               final int bytePos = baseOffset + byteIndex(index);
+               final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+               final byte cleared = (byte) (only.get(bytePos) & ~mask);
+               only.put(bytePos, cleared);
+       }
+
+       /**
+        * Clears a single bit whose byte may live in any of the segments.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static void bitUnSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) {
+               final int bytePos = baseOffset + byteIndex(index);
+               final int size = segments[0].size();
+               final int which = bytePos / size;
+               final int within = bytePos % size;
+               final MemorySegment holder = segments[which];
+
+               final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+               final byte cleared = (byte) (holder.get(within) & ~mask);
+               holder.put(within, cleared);
+       }
+
+       /**
+        * Sets a single bit in the segments.
+        *
+        * <p>With exactly one segment the byte is addressed directly, otherwise the segment that
+        * holds the byte is located first.
+        *
+        * @param segments target segments.
+        * @param baseOffset offset of the byte that holds bit 0.
+        * @param index bit index relative to the base offset.
+        */
+       public static void bitSet(MemorySegment[] segments, int baseOffset, int index) {
+               if (segments.length != 1) {
+                       bitSetMultiSegments(segments, baseOffset, index);
+                       return;
+               }
+               final int bytePos = baseOffset + byteIndex(index);
+               final MemorySegment only = segments[0];
+               final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+               final byte updated = (byte) (only.get(bytePos) | mask);
+               only.put(bytePos, updated);
+       }
+
+       /**
+        * Sets a single bit whose byte may live in any of the segments.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static void bitSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) {
+               final int bytePos = baseOffset + byteIndex(index);
+               final int size = segments[0].size();
+               final int which = bytePos / size;
+               final int within = bytePos % size;
+               final MemorySegment holder = segments[which];
+
+               final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+               final byte updated = (byte) (holder.get(within) | mask);
+               holder.put(within, updated);
+       }
+
+       /**
+        * Reads a single bit from the segments.
+        *
+        * @param segments source segments.
+        * @param baseOffset offset of the byte that holds bit 0.
+        * @param index bit index relative to the base offset.
+        * @return true if the bit is set, false otherwise.
+        */
+       public static boolean bitGet(MemorySegment[] segments, int baseOffset, int index) {
+               final byte holder = getByte(segments, baseOffset + byteIndex(index));
+               final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+               return (holder & mask) != 0;
+       }
+
+       /**
+        * Reads a boolean from the segments.
+        *
+        * @param segments source segments.
+        * @param offset position of the value.
+        * @return the boolean stored at the offset.
+        */
+       public static boolean getBoolean(MemorySegment[] segments, int offset) {
+               return inFirstSegment(segments, offset, 1)
+                       ? segments[0].getBoolean(offset)
+                       : getBooleanMultiSegments(segments, offset);
+       }
+
+       /**
+        * Reads a boolean from whichever segment holds the offset; the size of the first segment is
+        * used to locate it.
+        */
+       private static boolean getBooleanMultiSegments(MemorySegment[] segments, int offset) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+               return segments[which].getBoolean(within);
+       }
+
+       /**
+        * Writes a boolean into the segments.
+        *
+        * @param segments target segments.
+        * @param offset position of the value.
+        * @param value the boolean to store.
+        */
+       public static void setBoolean(MemorySegment[] segments, int offset, boolean value) {
+               if (!inFirstSegment(segments, offset, 1)) {
+                       setBooleanMultiSegments(segments, offset, value);
+                       return;
+               }
+               segments[0].putBoolean(offset, value);
+       }
+
+       /**
+        * Writes a boolean into whichever segment holds the offset; the size of the first segment is
+        * used to locate it.
+        */
+       private static void setBooleanMultiSegments(MemorySegment[] segments, int offset, boolean value) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+               segments[which].putBoolean(within, value);
+       }
+
+       /**
+        * Reads a byte from the segments.
+        *
+        * @param segments source segments.
+        * @param offset position of the value.
+        * @return the byte stored at the offset.
+        */
+       public static byte getByte(MemorySegment[] segments, int offset) {
+               return inFirstSegment(segments, offset, 1)
+                       ? segments[0].get(offset)
+                       : getByteMultiSegments(segments, offset);
+       }
+
+       /**
+        * Reads a byte from whichever segment holds the offset; the size of the first segment is
+        * used to locate it.
+        */
+       private static byte getByteMultiSegments(MemorySegment[] segments, int offset) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+               return segments[which].get(within);
+       }
+
+       /**
+        * Writes a byte into the segments.
+        *
+        * @param segments target segments.
+        * @param offset position of the value.
+        * @param value the byte to store.
+        */
+       public static void setByte(MemorySegment[] segments, int offset, byte value) {
+               if (!inFirstSegment(segments, offset, 1)) {
+                       setByteMultiSegments(segments, offset, value);
+                       return;
+               }
+               segments[0].put(offset, value);
+       }
+
+       /**
+        * Writes a byte into whichever segment holds the offset; the size of the first segment is
+        * used to locate it.
+        */
+       private static void setByteMultiSegments(MemorySegment[] segments, int offset, byte value) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+               segments[which].put(within, value);
+       }
+
+       /**
+        * Reads an int from the segments.
+        *
+        * @param segments source segments.
+        * @param offset position of the value.
+        * @return the int stored at the offset.
+        */
+       public static int getInt(MemorySegment[] segments, int offset) {
+               return inFirstSegment(segments, offset, 4)
+                       ? segments[0].getInt(offset)
+                       : getIntMultiSegments(segments, offset);
+       }
+
+       /**
+        * Reads an int that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static int getIntMultiSegments(MemorySegment[] segments, int offset) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 3) {
+                       // fewer than 4 bytes are left in this segment, assemble the value byte by byte
+                       return getIntSlowly(segments, size, which, within);
+               }
+               return segments[which].getInt(within);
+       }
+
+       /**
+        * Assembles an int from four single-byte reads, moving on to the next segment whenever the
+        * end of the current one is reached.
+        *
+        * <p>If {@code LITTLE_ENDIAN} is set the first byte read supplies the lowest 8 bits,
+        * otherwise it supplies the highest 8 bits.
+        */
+       private static int getIntSlowly(
+               MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+               MemorySegment current = segments[segNum];
+               int assembled = 0;
+               for (int byteNo = 0; byteNo < 4; byteNo++) {
+                       if (segOffset == segSize) {
+                               segNum++;
+                               current = segments[segNum];
+                               segOffset = 0;
+                       }
+                       final int shift = (LITTLE_ENDIAN ? byteNo : 3 - byteNo) * 8;
+                       final int unsigned = current.get(segOffset) & 0xff;
+                       assembled |= unsigned << shift;
+                       segOffset++;
+               }
+               return assembled;
+       }
+
+       /**
+        * Writes an int into the segments.
+        *
+        * @param segments target segments.
+        * @param offset position of the value.
+        * @param value the int to store.
+        */
+       public static void setInt(MemorySegment[] segments, int offset, int value) {
+               if (!inFirstSegment(segments, offset, 4)) {
+                       setIntMultiSegments(segments, offset, value);
+                       return;
+               }
+               segments[0].putInt(offset, value);
+       }
+
+       /**
+        * Writes an int that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static void setIntMultiSegments(MemorySegment[] segments, int offset, int value) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 3) {
+                       // fewer than 4 bytes are left in this segment, store the value byte by byte
+                       setIntSlowly(segments, size, which, within, value);
+                       return;
+               }
+               segments[which].putInt(within, value);
+       }
+
+       /**
+        * Stores an int as four single-byte writes, moving on to the next segment whenever the end
+        * of the current one is reached.
+        *
+        * <p>If {@code LITTLE_ENDIAN} is set the lowest 8 bits are written first, otherwise the
+        * highest 8 bits are written first.
+        */
+       private static void setIntSlowly(
+               MemorySegment[] segments, int segSize, int segNum, int segOffset, int value) {
+               MemorySegment current = segments[segNum];
+               for (int byteNo = 0; byteNo < 4; byteNo++) {
+                       if (segOffset == segSize) {
+                               segNum++;
+                               current = segments[segNum];
+                               segOffset = 0;
+                       }
+                       final int shift = (LITTLE_ENDIAN ? byteNo : 3 - byteNo) * 8;
+                       // the narrowing cast keeps only the lowest 8 bits of the shifted value
+                       current.put(segOffset, (byte) (value >> shift));
+                       segOffset++;
+               }
+       }
+
+       /**
+        * Reads a long from the segments.
+        *
+        * @param segments source segments.
+        * @param offset position of the value.
+        * @return the long stored at the offset.
+        */
+       public static long getLong(MemorySegment[] segments, int offset) {
+               return inFirstSegment(segments, offset, 8)
+                       ? segments[0].getLong(offset)
+                       : getLongMultiSegments(segments, offset);
+       }
+
+       /**
+        * Reads a long that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static long getLongMultiSegments(MemorySegment[] segments, int offset) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 7) {
+                       // fewer than 8 bytes are left in this segment, assemble the value byte by byte
+                       return getLongSlowly(segments, size, which, within);
+               }
+               return segments[which].getLong(within);
+       }
+
+       /**
+        * Assembles a long from eight single-byte reads, moving on to the next segment whenever the
+        * end of the current one is reached.
+        *
+        * <p>If {@code LITTLE_ENDIAN} is set the first byte read supplies the lowest 8 bits,
+        * otherwise it supplies the highest 8 bits.
+        */
+       private static long getLongSlowly(
+               MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+               MemorySegment current = segments[segNum];
+               long assembled = 0;
+               for (int byteNo = 0; byteNo < 8; byteNo++) {
+                       if (segOffset == segSize) {
+                               segNum++;
+                               current = segments[segNum];
+                               segOffset = 0;
+                       }
+                       final int shift = (LITTLE_ENDIAN ? byteNo : 7 - byteNo) * 8;
+                       // widen to long before shifting so that shifts of 32 bits and more are kept
+                       final long unsigned = current.get(segOffset) & 0xff;
+                       assembled |= unsigned << shift;
+                       segOffset++;
+               }
+               return assembled;
+       }
+
+       /**
+        * Writes a long into the segments.
+        *
+        * @param segments target segments.
+        * @param offset position of the value.
+        * @param value the long to store.
+        */
+       public static void setLong(MemorySegment[] segments, int offset, long value) {
+               if (!inFirstSegment(segments, offset, 8)) {
+                       setLongMultiSegments(segments, offset, value);
+                       return;
+               }
+               segments[0].putLong(offset, value);
+       }
+
+       /**
+        * Writes a long that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static void setLongMultiSegments(MemorySegment[] segments, int offset, long value) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 7) {
+                       // fewer than 8 bytes are left in this segment, store the value byte by byte
+                       setLongSlowly(segments, size, which, within, value);
+                       return;
+               }
+               segments[which].putLong(within, value);
+       }
+
+       /**
+        * Stores a long as eight single-byte writes, moving on to the next segment whenever the end
+        * of the current one is reached.
+        *
+        * <p>If {@code LITTLE_ENDIAN} is set the lowest 8 bits are written first, otherwise the
+        * highest 8 bits are written first.
+        */
+       private static void setLongSlowly(
+               MemorySegment[] segments, int segSize, int segNum, int segOffset, long value) {
+               MemorySegment current = segments[segNum];
+               for (int byteNo = 0; byteNo < 8; byteNo++) {
+                       if (segOffset == segSize) {
+                               segNum++;
+                               current = segments[segNum];
+                               segOffset = 0;
+                       }
+                       final int shift = (LITTLE_ENDIAN ? byteNo : 7 - byteNo) * 8;
+                       // the narrowing cast keeps only the lowest 8 bits of the shifted value
+                       current.put(segOffset, (byte) (value >> shift));
+                       segOffset++;
+               }
+       }
+
+       /**
+        * Reads a short from the segments.
+        *
+        * @param segments source segments.
+        * @param offset position of the value.
+        * @return the short stored at the offset.
+        */
+       public static short getShort(MemorySegment[] segments, int offset) {
+               return inFirstSegment(segments, offset, 2)
+                       ? segments[0].getShort(offset)
+                       : getShortMultiSegments(segments, offset);
+       }
+
+       /**
+        * Reads a short that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static short getShortMultiSegments(MemorySegment[] segments, int offset) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 1) {
+                       // fewer than 2 bytes are left in this segment, read the two bytes one by one
+                       return (short) getTwoByteSlowly(segments, size, which, within);
+               }
+               return segments[which].getShort(within);
+       }
+
+       /**
+        * Writes a short into the segments.
+        *
+        * @param segments target segments.
+        * @param offset position of the value.
+        * @param value the short to store.
+        */
+       public static void setShort(MemorySegment[] segments, int offset, short value) {
+               if (!inFirstSegment(segments, offset, 2)) {
+                       setShortMultiSegments(segments, offset, value);
+                       return;
+               }
+               segments[0].putShort(offset, value);
+       }
+
+       /**
+        * Writes a short that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static void setShortMultiSegments(MemorySegment[] segments, int offset, short value) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 1) {
+                       // fewer than 2 bytes are left in this segment: hand over the value and the value
+                       // shifted right by 8 bits to the byte-wise writer
+                       setTwoByteSlowly(segments, size, which, within, value, value >> 8);
+                       return;
+               }
+               segments[which].putShort(within, value);
+       }
+
+       /**
+        * Reads a float from the segments.
+        *
+        * @param segments source segments.
+        * @param offset position of the value.
+        * @return the float stored at the offset.
+        */
+       public static float getFloat(MemorySegment[] segments, int offset) {
+               return inFirstSegment(segments, offset, 4)
+                       ? segments[0].getFloat(offset)
+                       : getFloatMultiSegments(segments, offset);
+       }
+
+       /**
+        * Reads a float that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static float getFloatMultiSegments(MemorySegment[] segments, int offset) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 3) {
+                       // fewer than 4 bytes are left in this segment: assemble the int bits byte by byte
+                       final int bits = getIntSlowly(segments, size, which, within);
+                       return Float.intBitsToFloat(bits);
+               }
+               return segments[which].getFloat(within);
+       }
+
+       /**
+        * Writes a float into the segments.
+        *
+        * @param segments target segments.
+        * @param offset position of the value.
+        * @param value the float to store.
+        */
+       public static void setFloat(MemorySegment[] segments, int offset, float value) {
+               if (!inFirstSegment(segments, offset, 4)) {
+                       setFloatMultiSegments(segments, offset, value);
+                       return;
+               }
+               segments[0].putFloat(offset, value);
+       }
+
+       /**
+        * Writes a float that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static void setFloatMultiSegments(MemorySegment[] segments, int offset, float value) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 3) {
+                       // fewer than 4 bytes are left in this segment: store the raw int bits byte by byte
+                       final int bits = Float.floatToRawIntBits(value);
+                       setIntSlowly(segments, size, which, within, bits);
+                       return;
+               }
+               segments[which].putFloat(within, value);
+       }
+
+       /**
+        * Reads a double from the segments.
+        *
+        * @param segments source segments.
+        * @param offset position of the value.
+        * @return the double stored at the offset.
+        */
+       public static double getDouble(MemorySegment[] segments, int offset) {
+               return inFirstSegment(segments, offset, 8)
+                       ? segments[0].getDouble(offset)
+                       : getDoubleMultiSegments(segments, offset);
+       }
+
+       /**
+        * Reads a double that may start in any segment and may straddle a segment boundary.
+        *
+        * <p>The size of the first segment is used to locate the segment and the position inside it.
+        */
+       private static double getDoubleMultiSegments(MemorySegment[] segments, int offset) {
+               final int size = segments[0].size();
+               final int which = offset / size;
+               final int within = offset % size;
+
+               if (within >= size - 7) {
+                       // fewer than 8 bytes are left in this segment: assemble the long bits byte by byte
+                       final long bits = getLongSlowly(segments, size, which, within);
+                       return Double.longBitsToDouble(bits);
+               }
+               return segments[which].getDouble(within);
+       }
+
+       /**
+        * Writes a double value into the given segments at the given offset.
+        *
+        * @param segments target segments.
+        * @param offset offset at which the value is written.
+        * @param value double value to write.
+        */
+       public static void setDouble(MemorySegment[] segments, int offset, double value) {
+               if (!inFirstSegment(segments, offset, 8)) {
+                       // the 8 bytes are not reported as lying in segments[0]: use the multi-segment variant
+                       setDoubleMultiSegments(segments, offset, value);
+                       return;
+               }
+               segments[0].putDouble(offset, value);
+       }
+
+       private static void setDoubleMultiSegments(MemorySegment[] segments, 
int offset, double value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 7) {
+                       segments[segIndex].putDouble(segOffset, value);
+               } else {
+                       setLongSlowly(segments, segSize, segIndex, segOffset, 
Double.doubleToRawLongBits(value));
+               }
+       }
+
+       private static int getTwoByteSlowly(
+               MemorySegment[] segments, int segSize, int segNum, int 
segOffset) {
+               MemorySegment segment = segments[segNum];
+               int ret = 0;
+               for (int i = 0; i < 2; i++) {
+                       if (segOffset == segSize) {
+                               segment = segments[++segNum];
+                               segOffset = 0;
+                       }
+                       int unsignedByte = segment.get(segOffset) & 0xff;
+                       if (LITTLE_ENDIAN) {
+                               ret |= (unsignedByte << (i * 8));
+                       } else {
+                               ret |= (unsignedByte << ((1 - i) * 8));
+                       }
+                       segOffset++;
+               }
+               return ret;
+       }
+
+       private static void setTwoByteSlowly(
+               MemorySegment[] segments, int segSize, int segNum, int 
segOffset, int b1, int b2) {
+               MemorySegment segment = segments[segNum];
+               segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b1 : b2));
+               segOffset++;
+               if (segOffset == segSize) {
+                       segment = segments[++segNum];
+                       segOffset = 0;
+               }
+               segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b2 : b1));
+       }
+
+       /**
+        * Gets an instance of {@link DecimalData} from underlying {@link MemorySegment}.
+        *
+        * <p>The lower 32 bits of {@code offsetAndSize} are used as the number of bytes and the upper
+        * 32 bits as the offset relative to {@code baseOffset}; the copied bytes are passed to
+        * {@code DecimalData.fromUnscaledBytes} together with {@code precision} and {@code scale}.
+        */
+       public static DecimalData readDecimalData(
+                       MemorySegment[] segments,
+                       int baseOffset,
+                       long offsetAndSize,
+                       int precision,
+                       int scale) {
+               final int relativeOffset = (int) (offsetAndSize >> 32);
+               final int numBytes = (int) offsetAndSize;
+               final byte[] unscaled = new byte[numBytes];
+               copyToBytes(segments, baseOffset + relativeOffset, unscaled, 0, numBytes);
+               return DecimalData.fromUnscaledBytes(unscaled, precision, scale);
+       }
+
+       /**
+        * Gets an instance of {@link TimestampData} from underlying {@link MemorySegment}.
+        *
+        * @param segments the underlying MemorySegments
+        * @param baseOffset the base offset of current instance of {@code TimestampData}
+        * @param offsetAndNanos upper 32 bits: offset of the milliseconds part relative to
+        *     {@code baseOffset}; lower 32 bits: nanoseconds of the millisecond
+        * @return an instance of {@link TimestampData}
+        */
+       public static TimestampData readTimestampData(
+                       MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
+               final int millisOffset = baseOffset + (int) (offsetAndNanos >> 32);
+               final long epochMillis = getLong(segments, millisOffset);
+               return TimestampData.fromEpochMillis(epochMillis, (int) offsetAndNanos);
+       }
+
+       /**
+        * Get binary, if len less than 8, will be include in variablePartOffsetAndLen.
+        *
+        * <p>When the bit selected by {@code HIGHEST_FIRST_BIT} is clear, the upper 32 bits of
+        * {@code variablePartOffsetAndLen} are the offset relative to {@code baseOffset} and the lower
+        * 32 bits are the length. When it is set, the length is taken from the bits selected by
+        * {@code HIGHEST_SECOND_TO_EIGHTH_BIT} and the bytes are copied from the field itself.
+        *
+        * <p>Note: Need to consider the ByteOrder.
+        *
+        * @param baseOffset base offset of composite binary format.
+        * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+        * @param variablePartOffsetAndLen a long value, real data or offset and len.
+        */
+       public static byte[] readBinary(
+                       MemorySegment[] segments,
+                       int baseOffset,
+                       int fieldOffset,
+                       long variablePartOffsetAndLen) {
+               final boolean storedInField = (variablePartOffsetAndLen & HIGHEST_FIRST_BIT) != 0;
+               if (storedInField) {
+                       final int inlineLen = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+                       // on big endian the data starts at fieldOffset + 1 to skip the header
+                       final int dataStart = BinarySegmentUtils.LITTLE_ENDIAN ? fieldOffset : fieldOffset + 1;
+                       return BinarySegmentUtils.copyToBytes(segments, dataStart, inlineLen);
+               }
+
+               final int relativeOffset = (int) (variablePartOffsetAndLen >> 32);
+               final int numBytes = (int) variablePartOffsetAndLen;
+               return BinarySegmentUtils.copyToBytes(segments, baseOffset + relativeOffset, numBytes);
+       }
+
+       /**
+        * Get binary string, if len less than 8, will be include in variablePartOffsetAndLen.
+        *
+        * <p>When the bit selected by {@code HIGHEST_FIRST_BIT} is clear, the upper 32 bits of
+        * {@code variablePartOffsetAndLen} are the offset relative to {@code baseOffset} and the lower
+        * 32 bits are the length. When it is set, the length is taken from the bits selected by
+        * {@code HIGHEST_SECOND_TO_EIGHTH_BIT} and the string points at the field itself.
+        *
+        * <p>Note: Need to consider the ByteOrder.
+        *
+        * @param baseOffset base offset of composite binary format.
+        * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+        * @param variablePartOffsetAndLen a long value, real data or offset and len.
+        */
+       public static StringData readStringData(
+                       MemorySegment[] segments,
+                       int baseOffset,
+                       int fieldOffset,
+                       long variablePartOffsetAndLen) {
+               final boolean storedInField = (variablePartOffsetAndLen & HIGHEST_FIRST_BIT) != 0;
+               if (storedInField) {
+                       final int inlineLen = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+                       // on big endian the data starts at fieldOffset + 1 to skip the header
+                       final int dataStart = BinarySegmentUtils.LITTLE_ENDIAN ? fieldOffset : fieldOffset + 1;
+                       return BinaryStringData.fromAddress(segments, dataStart, inlineLen);
+               }
+
+               final int relativeOffset = (int) (variablePartOffsetAndLen >> 32);
+               final int numBytes = (int) variablePartOffsetAndLen;
+               return BinaryStringData.fromAddress(segments, baseOffset + relativeOffset, numBytes);
+       }
+
+       /**
+        * Gets an instance of {@link RawValueData} from underlying {@link MemorySegment}.
+        *
+        * <p>The lower 32 bits of {@code offsetAndSize} are the size, the upper 32 bits the offset
+        * relative to {@code baseOffset}. {@code null} is passed as the last constructor argument.
+        */
+       public static <T> RawValueData<T> readRawValueData(
+                       MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+               final int relativeOffset = (int) (offsetAndSize >> 32);
+               final int numBytes = (int) offsetAndSize;
+               return new BinaryRawValueData<>(segments, relativeOffset + baseOffset, numBytes, null);
+       }
+
+       /**
+        * Gets an instance of {@link MapData} from underlying {@link MemorySegment}.
+        *
+        * <p>A new {@code BinaryMapData} is pointed at the region whose size is the lower 32 bits of
+        * {@code offsetAndSize} and whose offset is the upper 32 bits plus {@code baseOffset}.
+        */
+       public static MapData readMapData(
+                       MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+               final int relativeOffset = (int) (offsetAndSize >> 32);
+               final int numBytes = (int) offsetAndSize;
+               final BinaryMapData result = new BinaryMapData();
+               result.pointTo(segments, relativeOffset + baseOffset, numBytes);
+               return result;
+       }
+
+       /**
+        * Gets an instance of {@link ArrayData} from underlying {@link MemorySegment}.
+        *
+        * <p>A new {@code BinaryArrayData} is pointed at the region whose size is the lower 32 bits of
+        * {@code offsetAndSize} and whose offset is the upper 32 bits plus {@code baseOffset}.
+        */
+       public static ArrayData readArrayData(
+                       MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+               final int relativeOffset = (int) (offsetAndSize >> 32);
+               final int numBytes = (int) offsetAndSize;
+               final BinaryArrayData result = new BinaryArrayData();
+               result.pointTo(segments, relativeOffset + baseOffset, numBytes);
+               return result;
+       }
+
+       /**
+        * Gets an instance of {@link RowData} from underlying {@link MemorySegment}.
+        *
+        * <p>A new {@code NestedRowData} with {@code numFields} fields is pointed at the region whose
+        * size is the lower 32 bits of {@code offsetAndSize} and whose offset is the upper 32 bits
+        * plus {@code baseOffset}.
+        */
+       public static RowData readRowData(
+                       MemorySegment[] segments, int numFields, int baseOffset, long offsetAndSize) {
+               final int relativeOffset = (int) (offsetAndSize >> 32);
+               final int numBytes = (int) offsetAndSize;
+               final NestedRowData result = new NestedRowData(numFields);
+               result.pointTo(segments, relativeOffset + baseOffset, numBytes);
+               return result;
+       }
+
+       /**
+        * Searches the bytes of segments2 inside the bytes of segments1.
+        *
+        * @param segments1 segments to search in.
+        * @param offset1 start offset of the searched range.
+        * @param numBytes1 number of bytes of the searched range.
+        * @param segments2 segments holding the bytes to look for.
+        * @param offset2 start offset of the bytes to look for.
+        * @param numBytes2 number of bytes to look for.
+        * @return the offset of the first match, {@code offset1} if {@code numBytes2} is 0,
+        *     or -1 if there is no match.
+        */
+       public static int find(
+               MemorySegment[] segments1, int offset1, int numBytes1,
+               MemorySegment[] segments2, int offset2, int numBytes2) {
+               // quick way 1: nothing to look for.
+               if (numBytes2 == 0) {
+                       return offset1;
+               }
+
+               final boolean bothInFirstSegment =
+                       inFirstSegment(segments1, offset1, numBytes1) && inFirstSegment(segments2, offset2, numBytes2);
+               if (!bothInFirstSegment) {
+                       return findInMultiSegments(segments1, offset1, numBytes1, segments2, offset2, numBytes2);
+               }
+
+               final MemorySegment needle = segments2[0];
+               final byte firstNeedleByte = needle.get(offset2);
+               final MemorySegment haystack = segments1[0];
+               // last start position at which the needle still fits into the searched range
+               final int lastStart = numBytes1 - numBytes2 + offset1;
+
+               int candidate = offset1;
+               while (candidate <= lastStart) {
+                       // quick way 2: only run the full comparison when the first byte matches.
+                       if (haystack.get(candidate) == firstNeedleByte
+                               && haystack.equalTo(needle, candidate, offset2, numBytes2)) {
+                               return candidate;
+                       }
+                       candidate++;
+               }
+               return -1;
+       }
+
+       private static int findInMultiSegments(
+               MemorySegment[] segments1, int offset1, int numBytes1,
+               MemorySegment[] segments2, int offset2, int numBytes2) {
+               int end = numBytes1 - numBytes2 + offset1;
+               for (int i = offset1; i <= end; i++) {
+                       if (equalsMultiSegments(segments1, i, segments2, 
offset2, numBytes2)) {
+                               return i;
+                       }
+               }
+               return -1;
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
new file mode 100644
index 0000000..5e85554
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.StringData;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A lazy binary implementation of {@link StringData} which is backed by {@link MemorySegment}s
+ * and {@link String}.
+ *
+ * <p>Either {@link MemorySegment}s or {@link String} must be provided when
+ * constructing {@link BinaryStringData}. The other representation will be 
materialized when needed.
+ *
+ * <p>It provides many useful methods for comparison, search, and so on.
+ */
+@Internal
+public final class BinaryStringData extends LazyBinaryFormat<String> 
implements StringData {
+
+       public static final BinaryStringData EMPTY_UTF8 = 
BinaryStringData.fromBytes(StringUtf8Utils.encodeUTF8(""));
+
+       /** Creates an instance without supplying a Java string or binary segments. */
+       public BinaryStringData() {}
+
+       /**
+        * Creates an instance from a Java string only; the string is passed to the superclass
+        * constructor.
+        */
+       public BinaryStringData(String javaObject) {
+               super(javaObject);
+       }
+
+       /**
+        * Creates an instance from a binary region only; segments, offset and size are passed to the
+        * superclass constructor.
+        */
+       public BinaryStringData(MemorySegment[] segments, int offset, int 
sizeInBytes) {
+               super(segments, offset, sizeInBytes);
+       }
+
+       /**
+        * Creates an instance from both a binary region and a Java string; all four arguments are
+        * passed to the superclass constructor.
+        */
+       public BinaryStringData(MemorySegment[] segments, int offset, int 
sizeInBytes, String javaObject) {
+               super(segments, offset, sizeInBytes, javaObject);
+       }
+
+       // 
------------------------------------------------------------------------------------------
+       // Construction Utilities
+       // 
------------------------------------------------------------------------------------------
+
+       /**
+        * Creates a BinaryStringData that points at {@code numBytes} bytes starting at {@code offset}
+        * in the given segments.
+        */
+       public static BinaryStringData fromAddress(
+                       MemorySegment[] segments,
+                       int offset,
+                       int numBytes) {
+               final BinaryStringData view = new BinaryStringData(segments, offset, numBytes);
+               return view;
+       }
+
+       /**
+        * Creates a BinaryStringData from the given Java String.
+        *
+        * @return a new instance wrapping {@code str}, or {@code null} if {@code str} is {@code null}.
+        */
+       public static BinaryStringData fromString(String str) {
+               return str == null ? null : new BinaryStringData(str);
+       }
+
+       /**
+        * Creates a BinaryStringData from the given UTF-8 bytes, using the whole array.
+        */
+       public static BinaryStringData fromBytes(byte[] bytes) {
+               final int wholeLength = bytes.length;
+               return fromBytes(bytes, 0, wholeLength);
+       }
+
+       /**
+        * Creates a BinaryStringData from the given UTF-8 bytes with offset and number of bytes.
+        * The array is wrapped into a single {@link MemorySegment} rather than copied here.
+        */
+       public static BinaryStringData fromBytes(byte[] bytes, int offset, int numBytes) {
+               final MemorySegment wrapped = MemorySegmentFactory.wrap(bytes);
+               final MemorySegment[] singleSegment = new MemorySegment[] {wrapped};
+               return new BinaryStringData(singleSegment, offset, numBytes);
+       }
+
+       /**
+        * Creates a BinaryStringData that consists of {@code length} space characters (0x20).
+        */
+       public static BinaryStringData blankString(int length) {
+               final byte[] padding = new byte[length];
+               Arrays.fill(padding, (byte) 0x20);
+               return fromBytes(padding);
+       }
+
+       // 
------------------------------------------------------------------------------------------
+       // Public Interfaces
+       // 
------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the bytes of the binary section, materializing it first if necessary.
+        */
+       @Override
+       public byte[] toBytes() {
+               ensureMaterialized();
+               final BinarySection section = binarySection;
+               return BinarySegmentUtils.getBytes(section.segments, section.offset, section.sizeInBytes);
+       }
+
+       /**
+        * Two instances are equal if their Java strings are equal (when both sides hold one) or,
+        * otherwise, if their materialized binary sections are equal. Any object that is not a
+        * {@code BinaryStringData} is unequal.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (!(o instanceof BinaryStringData)) {
+                       return false;
+               }
+               final BinaryStringData that = (BinaryStringData) o;
+
+               if (javaObject == null || that.javaObject == null) {
+                       // at least one side has no Java string: compare the binary representations
+                       ensureMaterialized();
+                       that.ensureMaterialized();
+                       return binarySection.equals(that.binarySection);
+               }
+               return javaObject.equals(that.javaObject);
+       }
+
+       /**
+        * Returns the hash code of the binary section, materializing it first if necessary.
+        */
+       @Override
+       public int hashCode() {
+               ensureMaterialized();
+               final int sectionHash = binarySection.hashCode();
+               return sectionHash;
+       }
+
+       /**
+        * Returns the Java string. If none is present yet, the bytes of the binary section are
+        * decoded as UTF-8 and the result is stored in {@code javaObject} before it is returned.
+        */
+       @Override
+       public String toString() {
+               if (javaObject != null) {
+                       return javaObject;
+               }
+
+               final int numBytes = binarySection.sizeInBytes;
+               // buffer obtained from allocateReuseBytes; only the first numBytes bytes are filled and decoded
+               final byte[] buffer = BinarySegmentUtils.allocateReuseBytes(numBytes);
+               BinarySegmentUtils.copyToBytes(binarySection.segments, binarySection.offset, buffer, 0, numBytes);
+               javaObject = StringUtf8Utils.decodeUTF8(buffer, 0, numBytes);
+               return javaObject;
+       }
+
+       /**
+        * Compares this string with another {@link StringData}.
+        *
+        * <p>When both sides already hold a Java {@link String}, the result of
+        * {@link String#compareTo(String)} is returned. Otherwise both sides are materialized and
+        * their bytes are compared one by one as unsigned values; when one side is a prefix of the
+        * other, the difference of the byte sizes is returned.
+        */
+       @Override
+       public int compareTo(@Nonnull StringData o) {
+               // BinaryStringData is the only implementation of StringData
+               final BinaryStringData that = (BinaryStringData) o;
+               if (javaObject != null && that.javaObject != null) {
+                       return javaObject.compareTo(that.javaObject);
+               }
+
+               ensureMaterialized();
+               that.ensureMaterialized();
+
+               final boolean bothSingleSegment =
+                       binarySection.segments.length == 1 && that.binarySection.segments.length == 1;
+               if (!bothSingleSegment) {
+                       // at least one side is not backed by exactly one segment
+                       return compareMultiSegments(that);
+               }
+
+               final MemorySegment mine = binarySection.segments[0];
+               final MemorySegment theirs = that.binarySection.segments[0];
+               final int myBase = binarySection.offset;
+               final int theirBase = that.binarySection.offset;
+               final int mySize = binarySection.sizeInBytes;
+               final int theirSize = that.binarySection.sizeInBytes;
+               final int common = Math.min(mySize, theirSize);
+
+               for (int pos = 0; pos < common; pos++) {
+                       final int left = mine.get(myBase + pos) & 0xFF;
+                       final int right = theirs.get(theirBase + pos) & 0xFF;
+                       if (left != right) {
+                               return left - right;
+                       }
+               }
+               return mySize - theirSize;
+       }
+
+       /**
+        * Compares the bytes of this string with the bytes of {@code other} when the data may span
+        * several segments: finds the segment boundaries, then compares the bytes (as unsigned values)
+        * run by run, where a run ends at the next segment boundary of either side.
+        *
+        * <p>The size of {@code segments[0]} is used as the size of every segment of the same string.
+        *
+        * @param other the string to compare with; its binary section is read directly.
+        * @return the difference of the first pair of differing bytes, or the difference of the byte
+        *     sizes if all compared bytes are equal.
+        */
+       private int compareMultiSegments(BinaryStringData other) {
+
+               // an empty side has no bytes to compare: the sizes alone decide
+               if (binarySection.sizeInBytes == 0 || 
other.binarySection.sizeInBytes == 0) {
+                       return binarySection.sizeInBytes - 
other.binarySection.sizeInBytes;
+               }
+
+               // number of bytes present on both sides that still have to be compared
+               int len = Math.min(binarySection.sizeInBytes, 
other.binarySection.sizeInBytes);
+
+               MemorySegment seg1 = binarySection.segments[0];
+               MemorySegment seg2 = other.binarySection.segments[0];
+
+               int segmentSize = binarySection.segments[0].size();
+               int otherSegmentSize = other.binarySection.segments[0].size();
+
+               // bytes between the start offset and the end of segment 0; <= 0 when the offset
+               // lies at or beyond the end of segment 0
+               int sizeOfFirst1 = segmentSize - binarySection.offset;
+               int sizeOfFirst2 = otherSegmentSize - 
other.binarySection.offset;
+
+               // index of the next segment to switch to on each side
+               int varSegIndex1 = 1;
+               int varSegIndex2 = 1;
+
+               // find the first segment of this string.
+               while (sizeOfFirst1 <= 0) {
+                       sizeOfFirst1 += segmentSize;
+                       seg1 = binarySection.segments[varSegIndex1++];
+               }
+
+               // find the first segment of the other string.
+               while (sizeOfFirst2 <= 0) {
+                       sizeOfFirst2 += otherSegmentSize;
+                       seg2 = other.binarySection.segments[varSegIndex2++];
+               }
+
+               // start positions inside the segments located above
+               int offset1 = segmentSize - sizeOfFirst1;
+               int offset2 = otherSegmentSize - sizeOfFirst2;
+
+               // length of the next run: limited by what is left in each current segment and by len
+               int needCompare = Math.min(Math.min(sizeOfFirst1, 
sizeOfFirst2), len);
+
+               while (needCompare > 0) {
+                       // compare in one segment.
+                       for (int i = 0; i < needCompare; i++) {
+                               int res = (seg1.get(offset1 + i) & 0xFF) - 
(seg2.get(offset2 + i) & 0xFF);
+                               if (res != 0) {
+                                       return res;
+                               }
+                       }
+                       // this run covered all remaining common bytes
+                       if (needCompare == len) {
+                               break;
+                       }
+                       len -= needCompare;
+                       // next segment
+                       if (sizeOfFirst1 < sizeOfFirst2) { //I am smaller
+                               seg1 = binarySection.segments[varSegIndex1++];
+                               offset1 = 0;
+                               offset2 += needCompare;
+                               sizeOfFirst1 = segmentSize;
+                               sizeOfFirst2 -= needCompare;
+                       } else if (sizeOfFirst1 > sizeOfFirst2) { //other is 
smaller
+                               seg2 = 
other.binarySection.segments[varSegIndex2++];
+                               offset2 = 0;
+                               offset1 += needCompare;
+                               sizeOfFirst2 = otherSegmentSize;
+                               sizeOfFirst1 -= needCompare;
+                       } else { // same, should go ahead both.
+                               seg1 = binarySection.segments[varSegIndex1++];
+                               seg2 = 
other.binarySection.segments[varSegIndex2++];
+                               offset1 = 0;
+                               offset2 = 0;
+                               sizeOfFirst1 = segmentSize;
+                               sizeOfFirst2 = otherSegmentSize;
+                       }
+                       needCompare = Math.min(Math.min(sizeOfFirst1, 
sizeOfFirst2), len);
+               }
+
+               // sanity check: the loop is expected to end via the break above
+               checkArgument(needCompare == len);
+
+               // all common bytes are equal: the sizes decide
+               return binarySection.sizeInBytes - 
other.binarySection.sizeInBytes;
+       }
+
+       // 
------------------------------------------------------------------------------------------
+       // Public methods on BinaryStringData
+       // 
------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the number of UTF-8 code points in the string, counted by stepping from one
+        * leading byte to the next.
+        */
+       public int numChars() {
+               ensureMaterialized();
+               if (!inFirstSegment()) {
+                       return numCharsMultiSegs();
+               }
+
+               int count = 0;
+               int bytePos = 0;
+               while (bytePos < binarySection.sizeInBytes) {
+                       // advance by the byte length announced by the leading byte at bytePos
+                       bytePos += numBytesForFirstByte(getByteOneSegment(bytePos));
+                       count++;
+               }
+               return count;
+       }
+
+       private int numCharsMultiSegs() {
+               int len = 0;
+               int segSize = binarySection.segments[0].size();
+               BinaryStringData.SegmentAndOffset index = 
firstSegmentAndOffset(segSize);
+               int i = 0;
+               while (i < binarySection.sizeInBytes) {
+                       int charBytes = numBytesForFirstByte(index.value());
+                       i += charBytes;
+                       len++;
+                       index.skipBytes(charBytes, segSize);
+               }
+               return len;
+       }
+
+       /**
+        * Returns the {@code byte} value at the specified index. An index ranges from {@code 0} to
+        * {@code binarySection.sizeInBytes - 1}.
+        *
+        * <p>The index is added to the section offset and resolved to a segment using the size of
+        * {@code segments[0]}. This method performs no range check against the string size itself.
+        *
+        * @param      index   the index of the {@code byte} value.
+        * @return     the {@code byte} value at the specified index of this UTF-8 bytes.
+        */
+       public byte byteAt(int index) {
+               ensureMaterialized();
+               final MemorySegment[] segs = binarySection.segments;
+               final int absolute = binarySection.offset + index;
+               final int segSize = segs[0].size();
+               if (absolute >= segSize) {
+                       // located in a later segment
+                       return segs[absolute / segSize].get(absolute % segSize);
+               }
+               return segs[0].get(absolute);
+       }
+
+       /**
+        * Returns the segments reported by the superclass after the binary form has been
+        * materialized.
+        */
+       @Override
+       public MemorySegment[] getSegments() {
+               ensureMaterialized();
+               final MemorySegment[] backing = super.getSegments();
+               return backing;
+       }
+
+       /**
+        * Returns the offset reported by the superclass after the binary form has been materialized.
+        */
+       @Override
+       public int getOffset() {
+               ensureMaterialized();
+               final int startOffset = super.getOffset();
+               return startOffset;
+       }
+
+       /**
+        * Returns the size in bytes reported by the superclass after the binary form has been
+        * materialized.
+        */
+       @Override
+       public int getSizeInBytes() {
+               ensureMaterialized();
+               final int numBytes = super.getSizeInBytes();
+               return numBytes;
+       }
+
+       /**
+        * Convenience overload that delegates to {@code ensureMaterialized(null)}, i.e. without a
+        * serializer.
+        */
+       public void ensureMaterialized() {
+               ensureMaterialized(null);
+       }
+
+       @Override
+       protected BinarySection materialize(TypeSerializer<String> serializer) {
+               if (serializer != null) {
+                       throw new IllegalArgumentException("BinaryStringData 
does not support custom serializers");
+               }
+
+               byte[] bytes = StringUtf8Utils.encodeUTF8(javaObject);
+               return new BinarySection(
+                       new MemorySegment[]{MemorySegmentFactory.wrap(bytes)},
+                       0,
+                       bytes.length
+               );
+       }
+
+       /**
+        * Copy a new {@code BinaryStringData}: the bytes are copied into a fresh array that backs the
+        * new instance, and the current {@code javaObject} reference is handed over as well.
+        */
+       public BinaryStringData copy() {
+               ensureMaterialized();
+               final byte[] duplicate = BinarySegmentUtils.copyToBytes(
+                       binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
+               final MemorySegment[] singleSegment = new MemorySegment[] {MemorySegmentFactory.wrap(duplicate)};
+               return new BinaryStringData(singleSegment, 0, binarySection.sizeInBytes, javaObject);
+       }
+
+       /**
+        * Returns a binary string that is a substring of this binary string. The substring begins at
+        * the specified {@code beginIndex} and extends to the character at index {@code endIndex - 1}.
+        * Both indexes count characters, not bytes.
+        *
+        * <p>Examples:
+        * <blockquote><pre>
+        * fromString("hamburger").substring(4, 8) returns binary string "urge"
+        * fromString("smiles").substring(1, 5) returns binary string "mile"
+        * </pre></blockquote>
+        *
+        * @param beginIndex   the beginning index, inclusive.
+        * @param endIndex     the ending index, exclusive.
+        * @return the specified substring, return EMPTY_UTF8 when index out of bounds
+        * instead of StringIndexOutOfBoundsException.
+        */
+       public BinaryStringData substring(int beginIndex, int endIndex) {
+               ensureMaterialized();
+               if (endIndex <= beginIndex || beginIndex >= binarySection.sizeInBytes) {
+                       return EMPTY_UTF8;
+               }
+               if (!inFirstSegment()) {
+                       return substringMultiSegs(beginIndex, endIndex);
+               }
+
+               final MemorySegment segment = binarySection.segments[0];
+               int bytePos = 0;
+               int charCount = 0;
+
+               // skip the characters in front of beginIndex
+               for (; bytePos < binarySection.sizeInBytes && charCount < beginIndex; charCount++) {
+                       bytePos += numBytesForFirstByte(segment.get(bytePos + binarySection.offset));
+               }
+               final int startByte = bytePos;
+
+               // walk over the characters up to (excluding) endIndex
+               for (; bytePos < binarySection.sizeInBytes && charCount < endIndex; charCount++) {
+                       bytePos += numBytesForFirstByte(segment.get(bytePos + binarySection.offset));
+               }
+
+               if (bytePos <= startByte) {
+                       return EMPTY_UTF8;
+               }
+               final byte[] slice = new byte[bytePos - startByte];
+               segment.get(binarySection.offset + startByte, slice, 0, bytePos - startByte);
+               return fromBytes(slice);
+       }
+
+       private BinaryStringData substringMultiSegs(final int start, final int 
until) {
+               int segSize = binarySection.segments[0].size();
+               BinaryStringData.SegmentAndOffset index = 
firstSegmentAndOffset(segSize);
+               int i = 0;
+               int c = 0;
+               while (i < binarySection.sizeInBytes && c < start) {
+                       int charSize = numBytesForFirstByte(index.value());
+                       i += charSize;
+                       index.skipBytes(charSize, segSize);
+                       c += 1;
+               }
+
+               int j = i;
+               while (i < binarySection.sizeInBytes && c < until) {
+                       int charSize = numBytesForFirstByte(index.value());
+                       i += charSize;
+                       index.skipBytes(charSize, segSize);
+                       c += 1;
+               }
+
+               if (i > j) {
+                       return 
fromBytes(BinarySegmentUtils.copyToBytes(binarySection.segments, 
binarySection.offset + j, i - j));
+               } else {
+                       return EMPTY_UTF8;
+               }
+       }
+
+       /**
+        * Returns true if and only if this BinaryStringData contains the specified
+        * sequence of bytes values. An empty {@code s} is always contained.
+        *
+        * @param s the sequence to search for
+        * @return true if this BinaryStringData contains {@code s}, false otherwise
+        */
+       public boolean contains(final BinaryStringData s) {
+               ensureMaterialized();
+               s.ensureMaterialized();
+
+               final BinarySection needle = s.binarySection;
+               if (needle.sizeInBytes == 0) {
+                       return true;
+               }
+               final BinarySection haystack = binarySection;
+               final int position = BinarySegmentUtils.find(
+                       haystack.segments, haystack.offset, haystack.sizeInBytes,
+                       needle.segments, needle.offset, needle.sizeInBytes);
+               return position != -1;
+       }
+
+       /**
+        * Tests if this BinaryStringData starts with the specified prefix.
+        *
+        * @param   prefix   the prefix.
+        * @return  {@code true} if the bytes represented by the argument is a 
prefix of the bytes
+        *          represented by this string; {@code false} otherwise. Note 
also that {@code true}
+        *          will be returned if the argument is an empty 
BinaryStringData or is equal to this
+        *          {@code BinaryStringData} object as determined by the {@link 
#equals(Object)} method.
+        */
+       public boolean startsWith(final BinaryStringData prefix) {
+               // The comparison works on bytes, so both strings need their binary form.
+               this.ensureMaterialized();
+               prefix.ensureMaterialized();
+               // A prefix is a match anchored at byte position 0.
+               final int anchor = 0;
+               return matchAt(prefix, anchor);
+       }
+
+       /**
+        * Tests if this BinaryStringData ends with the specified suffix.
+        *
+        * @param   suffix   the suffix.
+        * @return  {@code true} if the bytes represented by the argument is a 
suffix of the bytes
+        *          represented by this object; {@code false} otherwise. Note 
that the result will
+        *          be {@code true} if the argument is the empty string or is 
equal to this
+        *          {@code BinaryStringData} object as determined by the {@link 
#equals(Object)} method.
+        */
+       public boolean endsWith(final BinaryStringData suffix) {
+               // The comparison works on bytes, so both strings need their binary form.
+               this.ensureMaterialized();
+               suffix.ensureMaterialized();
+               // Anchor the comparison so that the suffix would end on the last byte of this
+               // string. The anchor is negative when the suffix is longer than this string;
+               // it is handed to matchAt unchanged.
+               final int anchor = binarySection.sizeInBytes - suffix.binarySection.sizeInBytes;
+               return matchAt(suffix, anchor);
+       }
+
+       /**
+        * Returns a string whose value is this string, with any leading and 
trailing
+        * whitespace removed.
+        *
+        * @return  A string whose value is this string, with any leading and 
trailing white
+        *          space removed, or this string if it has no leading or
+        *          trailing white space.
+        */
+       public BinaryStringData trim() {
+               ensureMaterialized();
+               if (!inFirstSegment()) {
+                       // The data is not confined to the first segment: use the cursor-based variant.
+                       return trimMultiSegs();
+               }
+
+               final int size = this.binarySection.sizeInBytes;
+
+               // Move right past every leading space (0x20).
+               int begin = 0;
+               while (begin < size && getByteOneSegment(begin) == 0x20) {
+                       begin++;
+               }
+
+               // Move left past every trailing space (0x20), never crossing begin.
+               int last = size - 1;
+               while (last >= begin && getByteOneSegment(last) == 0x20) {
+                       last--;
+               }
+
+               // begin > last means no non-space byte remained.
+               return begin > last ? EMPTY_UTF8 : copyBinaryStringInOneSeg(begin, last - begin + 1);
+       }
+
+       private BinaryStringData trimMultiSegs() {
+               int s = 0;
+               int e = this.binarySection.sizeInBytes - 1;
+               int segSize = binarySection.segments[0].size();
+               BinaryStringData.SegmentAndOffset front = 
firstSegmentAndOffset(segSize);
+               // skip all of the space (0x20) in the left side
+               while (s < this.binarySection.sizeInBytes && front.value() == 
0x20) {
+                       s++;
+                       front.nextByte(segSize);
+               }
+               BinaryStringData.SegmentAndOffset behind = 
lastSegmentAndOffset(segSize);
+               // skip all of the space (0x20) in the right side
+               while (e >= s && behind.value() == 0x20) {
+                       e--;
+                       behind.previousByte(segSize);
+               }
+               if (s > e) {
+                       // empty string
+                       return EMPTY_UTF8;
+               } else {
+                       return copyBinaryString(s, e);
+               }
+       }
+
+       /**
+        * Returns the index within this string of the first occurrence of the
+        * specified substring, starting at the specified index.
+        *
+        * @param   str         the substring to search for.
+        * @param   fromIndex   the index from which to start the search.
+        * @return  the index of the first occurrence of the specified 
substring,
+        *          starting at the specified index,
+        *          or {@code -1} if there is no such occurrence.
+        */
+       public int indexOf(BinaryStringData str, int fromIndex) {
+               ensureMaterialized();
+               str.ensureMaterialized();
+
+               final int needleSize = str.binarySection.sizeInBytes;
+               if (needleSize == 0) {
+                       // A zero-length needle is reported at index 0, whatever fromIndex is.
+                       return 0;
+               }
+               if (!inFirstSegment()) {
+                       return indexOfMultiSegs(str, fromIndex);
+               }
+
+               final int size = binarySection.sizeInBytes;
+               // bytePos is the position in bytes, charPos the same position in characters.
+               int bytePos = 0;
+               int charPos = 0;
+
+               // Skip the first fromIndex characters, one whole character at a time.
+               for (; bytePos < size && charPos < fromIndex; charPos++) {
+                       bytePos += numBytesForFirstByte(getByteOneSegment(bytePos));
+               }
+
+               // Try a byte-wise comparison at each following character start.
+               while (true) {
+                       if (bytePos + needleSize > size) {
+                               // Fewer bytes remain than the needle has.
+                               return -1;
+                       }
+                       final boolean matches = BinarySegmentUtils.equals(
+                               binarySection.segments,
+                               binarySection.offset + bytePos,
+                               str.binarySection.segments,
+                               str.binarySection.offset,
+                               needleSize);
+                       if (matches) {
+                               return charPos;
+                       }
+                       bytePos += numBytesForFirstByte(getByteOneSegment(bytePos));
+                       charPos++;
+                       if (bytePos >= size) {
+                               return -1;
+                       }
+               }
+       }
+
+       private int indexOfMultiSegs(BinaryStringData str, int fromIndex) {
+               // position in byte
+               int byteIdx = 0;
+               // position is char
+               int charIdx = 0;
+               int segSize = binarySection.segments[0].size();
+               BinaryStringData.SegmentAndOffset index = 
firstSegmentAndOffset(segSize);
+               while (byteIdx < binarySection.sizeInBytes && charIdx < 
fromIndex) {
+                       int charBytes = numBytesForFirstByte(index.value());
+                       byteIdx += charBytes;
+                       charIdx++;
+                       index.skipBytes(charBytes, segSize);
+               }
+               do {
+                       if (byteIdx + str.binarySection.sizeInBytes > 
binarySection.sizeInBytes) {
+                               return -1;
+                       }
+                       if (BinarySegmentUtils.equals(binarySection.segments, 
binarySection.offset + byteIdx,
+                               str.binarySection.segments, 
str.binarySection.offset, str.binarySection.sizeInBytes)) {
+                               return charIdx;
+                       }
+                       int charBytes = 
numBytesForFirstByte(index.segment.get(index.offset));
+                       byteIdx += charBytes;
+                       charIdx++;
+                       index.skipBytes(charBytes, segSize);
+               } while (byteIdx < binarySection.sizeInBytes);
+
+               return -1;
+       }
+
+       /**
+        * Converts all of the characters in this {@code BinaryStringData} to 
upper case.
+        *
+        * @return the {@code BinaryStringData}, converted to uppercase.
+        */
+       public BinaryStringData toUpperCase() {
+               if (javaObject != null) {
+                       // A Java object is already present: convert through it.
+                       return javaToUpperCase();
+               }
+               if (binarySection.sizeInBytes == 0) {
+                       return EMPTY_UTF8;
+               }
+               int size = binarySection.segments[0].size();
+               BinaryStringData.SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
+               byte[] bytes = new byte[binarySection.sizeInBytes];
+               // Byte-wise fast path. The loop writes every slot of bytes, index 0 included,
+               // so the first byte needs no separate initialisation.
+               for (int i = 0; i < binarySection.sizeInBytes; i++) {
+                       byte b = segmentAndOffset.value();
+                       if (numBytesForFirstByte(b) != 1) {
+                               // The byte starts a multi-byte character: fall back to the Java String path.
+                               return javaToUpperCase();
+                       }
+                       int upper = Character.toUpperCase((int) b);
+                       if (upper > 127) {
+                               // The result does not fit in a single byte: fall back.
+                               return javaToUpperCase();
+                       }
+                       bytes[i] = (byte) upper;
+                       segmentAndOffset.nextByte(size);
+               }
+               return fromBytes(bytes);
+       }
+
+       /**
+        * Upper-cases this string by going through its {@link String} representation.
+        *
+        * <p>{@code Locale.ROOT} is passed explicitly so that the result does not depend on the
+        * JVM default locale. The byte-wise path in {@link #toUpperCase()} is locale-independent,
+        * and both paths have to agree.
+        */
+       private BinaryStringData javaToUpperCase() {
+               return fromString(toString().toUpperCase(java.util.Locale.ROOT));
+       }
+
+       /**
+        * Converts all of the characters in this {@code BinaryStringData} to 
lower case.
+        *
+        * @return the {@code BinaryStringData}, converted to lowercase.
+        */
+       public BinaryStringData toLowerCase() {
+               if (javaObject != null) {
+                       // A Java object is already present: convert through it.
+                       return javaToLowerCase();
+               }
+               if (binarySection.sizeInBytes == 0) {
+                       return EMPTY_UTF8;
+               }
+               int size = binarySection.segments[0].size();
+               BinaryStringData.SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
+               byte[] bytes = new byte[binarySection.sizeInBytes];
+               // Byte-wise fast path. The loop writes every slot of bytes, index 0 included,
+               // so the first byte needs no separate initialisation.
+               for (int i = 0; i < binarySection.sizeInBytes; i++) {
+                       byte b = segmentAndOffset.value();
+                       if (numBytesForFirstByte(b) != 1) {
+                               // The byte starts a multi-byte character: fall back to the Java String path.
+                               return javaToLowerCase();
+                       }
+                       int lower = Character.toLowerCase((int) b);
+                       if (lower > 127) {
+                               // The result does not fit in a single byte: fall back.
+                               return javaToLowerCase();
+                       }
+                       bytes[i] = (byte) lower;
+                       segmentAndOffset.nextByte(size);
+               }
+               return fromBytes(bytes);
+       }
+
+       /**
+        * Lower-cases this string by going through its {@link String} representation.
+        *
+        * <p>{@code Locale.ROOT} is passed explicitly so that the result does not depend on the
+        * JVM default locale. The byte-wise path in {@link #toLowerCase()} is locale-independent,
+        * and both paths have to agree.
+        */
+       private BinaryStringData javaToLowerCase() {
+               return fromString(toString().toLowerCase(java.util.Locale.ROOT));
+       }
+
+       // 
------------------------------------------------------------------------------------------
+       // Internal methods on BinaryStringData
+       // 
------------------------------------------------------------------------------------------
+
+       byte getByteOneSegment(int i) {
+               return binarySection.segments[0].get(binarySection.offset + i);
+       }
+
+       boolean inFirstSegment() {
+               return binarySection.sizeInBytes + binarySection.offset <= 
binarySection.segments[0].size();
+       }
+
+       /**
+        * Tests whether the bytes of {@code s} occur in this string at byte position {@code pos}.
+        *
+        * <p>Uses the single-segment comparison when both strings report
+        * {@link #inFirstSegment()}, and the multi-segment comparison otherwise.
+        */
+       private boolean matchAt(final BinaryStringData s, int pos) {
+               if (inFirstSegment() && s.inFirstSegment()) {
+                       return matchAtOneSeg(s, pos);
+               }
+               return matchAtVarSeg(s, pos);
+       }
+
+       private boolean matchAtOneSeg(final BinaryStringData s, int pos) {
+               return s.binarySection.sizeInBytes + pos <= 
binarySection.sizeInBytes && pos >= 0 &&
+                       binarySection.segments[0].equalTo(
+                               s.binarySection.segments[0],
+                               binarySection.offset + pos,
+                               s.binarySection.offset,
+                               s.binarySection.sizeInBytes);
+       }
+
+       private boolean matchAtVarSeg(final BinaryStringData s, int pos) {
+               return s.binarySection.sizeInBytes + pos <= 
binarySection.sizeInBytes && pos >= 0 &&
+                       BinarySegmentUtils.equals(
+                               binarySection.segments,
+                               binarySection.offset + pos,
+                               s.binarySection.segments,
+                               s.binarySection.offset,
+                               s.binarySection.sizeInBytes);
+       }
+
+       BinaryStringData copyBinaryStringInOneSeg(int start, int len) {
+               byte[] newBytes = new byte[len];
+               binarySection.segments[0].get(binarySection.offset + start, 
newBytes, 0, len);
+               return fromBytes(newBytes);
+       }
+
+       BinaryStringData copyBinaryString(int start, int end) {
+               int len = end - start + 1;
+               byte[] newBytes = new byte[len];
+               BinarySegmentUtils.copyToBytes(binarySection.segments, 
binarySection.offset + start, newBytes, 0, len);
+               return fromBytes(newBytes);
+       }
+
+       BinaryStringData.SegmentAndOffset firstSegmentAndOffset(int segSize) {
+               int segIndex = binarySection.offset / segSize;
+               return new BinaryStringData.SegmentAndOffset(segIndex, 
binarySection.offset % segSize);
+       }
+
+       BinaryStringData.SegmentAndOffset lastSegmentAndOffset(int segSize) {
+               int lastOffset = binarySection.offset + 
binarySection.sizeInBytes - 1;
+               int segIndex = lastOffset / segSize;
+               return new BinaryStringData.SegmentAndOffset(segIndex, 
lastOffset % segSize);
+       }
+
+       /**
+        * Creates a cursor positioned on the first byte of this string. When
+        * {@link #inFirstSegment()} holds, segment 0 and the raw offset are used directly;
+        * otherwise the position is computed by {@link #firstSegmentAndOffset(int)}.
+        */
+       private BinaryStringData.SegmentAndOffset startSegmentAndOffset(int segSize) {
+               if (inFirstSegment()) {
+                       return new BinaryStringData.SegmentAndOffset(0, binarySection.offset);
+               }
+               return firstSegmentAndOffset(segSize);
+       }
+
+       /**
+        * CurrentSegment and positionInSegment.
+        */
+       class SegmentAndOffset {
+               /** Index of the current segment within {@code binarySection.segments}. */
+               int segIndex;
+               /** The current segment; {@code null} once the index has left the segment array. */
+               MemorySegment segment;
+               /** Position inside the current segment. */
+               int offset;
+
+               private SegmentAndOffset(int segIndex, int offset) {
+                       this.segIndex = segIndex;
+                       this.offset = offset;
+                       this.segment = binarySection.segments[segIndex];
+               }
+
+               /** Re-reads {@link #segment} for the current index, using {@code null} when out of range. */
+               private void assignSegment() {
+                       final boolean inRange = segIndex >= 0 && segIndex < binarySection.segments.length;
+                       segment = inRange ? binarySection.segments[segIndex] : null;
+               }
+
+               /** Moves one byte backwards, stepping to the end of the previous segment if needed. */
+               void previousByte(int segSize) {
+                       offset -= 1;
+                       if (offset != -1) {
+                               return;
+                       }
+                       // Stepped off the front of the current segment.
+                       segIndex -= 1;
+                       assignSegment();
+                       offset = segSize - 1;
+               }
+
+               /** Moves one byte forwards, stepping to the start of the next segment if needed. */
+               void nextByte(int segSize) {
+                       offset += 1;
+                       checkAdvance(segSize);
+               }
+
+               /** Switches to the next segment when the position sits exactly at {@code segSize}. */
+               private void checkAdvance(int segSize) {
+                       if (offset != segSize) {
+                               return;
+                       }
+                       advance();
+               }
+
+               /** Switches to the start of the next segment. */
+               private void advance() {
+                       segIndex += 1;
+                       assignSegment();
+                       offset = 0;
+               }
+
+               /** Moves {@code n} bytes forwards, crossing as many segment boundaries as required. */
+               void skipBytes(int n, int segSize) {
+                       int available = segSize - this.offset;
+                       if (n < available) {
+                               // The target lies strictly inside the current segment.
+                               this.offset += n;
+                               return;
+                       }
+
+                       // Consume what the current segment offers, then whole segments, until n is used up.
+                       int step = Math.min(available, n);
+                       n -= step;
+                       while (n > 0) {
+                               advance();
+                               available = segSize - this.offset;
+                               step = Math.min(available, n);
+                               n -= step;
+                       }
+                       this.offset += step;
+                       // The last step may have ended exactly on a segment boundary.
+                       checkAdvance(segSize);
+               }
+
+               /** Returns the byte at the current position. */
+               byte value() {
+                       return this.segment.get(this.offset);
+               }
+       }
+
+       /**
+        * Returns the number of bytes for a code point with the first byte as 
`b`.
+        * @param b The first byte of a code point
+        */
+       static int numBytesForFirstByte(final byte b) {
+               if (b >= 0) {
+                       // 1 byte, 7 bits: 0xxxxxxx
+                       return 1;
+               }
+               if ((b & 0xE0) == 0xC0 && (b & 0x1E) != 0) {
+                       // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
+                       // (lead bytes whose bits 1-4 are all zero are not accepted here)
+                       return 2;
+               }
+               if ((b & 0xF0) == 0xE0) {
+                       // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
+                       return 3;
+               }
+               if ((b & 0xF8) == 0xF0) {
+                       // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+                       return 4;
+               }
+               // Skip the first byte disallowed in UTF-8.
+               // Handling errors quietly, same semantics to java String.
+               return 1;
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/LazyBinaryFormat.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/LazyBinaryFormat.java
new file mode 100644
index 0000000..0b65f81
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/LazyBinaryFormat.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import java.io.IOException;
+
+/**
+ * An abstract implementation of {@link BinaryFormat} which is lazily serialized into binary
+ * or lazily deserialized into Java object.
+ *
+ * <p>The reason why we introduce this data structure is in order to save 
(de)serialization
+ * in nested function calls. Consider the following function call chain:
+ *
+ * <pre>UDF0(input) -> UDF1(result0) -> UDF2(result1) -> UDF3(result2)</pre>
+ *
+ * <p>With such nested calls, if the return values of the UDFs are in Java object format,
+ * this results in multiple conversions between Java object and binary format:
+ *
+ * <pre>
+ * converterToBinary(UDF0(converterToJavaObject(input))) ->
+ *   converterToBinary(UDF1(converterToJavaObject(result0))) ->
+ *     converterToBinary(UDF2(converterToJavaObject(result1))) ->
+ *       ...
+ * </pre>
+ *
+ * <p>So we introduced {@link LazyBinaryFormat} to avoid the redundant cost, 
it has three forms:
+ * <ul>
+ *     <li>Binary form</li>
+ *     <li>Java object form</li>
+ *     <li>Binary and Java object both exist</li>
+ * </ul>
+ *
+ * <p>It defers the conversions as much as possible: the value is converted into the
+ * required form only when that form is needed.
+ */
+@Internal
+public abstract class LazyBinaryFormat<T> implements BinaryFormat {
+
+       /** Java object form of the value; may be {@code null}. */
+       T javaObject;
+
+       /** Binary form of the value; {@code null} while no binary form has been set or materialized. */
+       BinarySection binarySection;
+
+       public LazyBinaryFormat() {
+               // NOTE(review): this chain installs a BinarySection built from (null, -1, -1), so
+               // binarySection is non-null afterwards and ensureMaterialized will not call
+               // materialize for such an instance - confirm this is intended.
+               this(null, -1, -1, null);
+       }
+
+       public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) {
+               this(javaObject, new BinarySection(segments, offset, sizeInBytes));
+       }
+
+       public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) {
+               this(null, new BinarySection(segments, offset, sizeInBytes));
+       }
+
+       public LazyBinaryFormat(T javaObject) {
+               this(javaObject, null);
+       }
+
+       public LazyBinaryFormat(T javaObject, BinarySection binarySection) {
+               this.javaObject = javaObject;
+               this.binarySection = binarySection;
+       }
+
+       /**
+        * Returns the Java object form as currently stored; no conversion is triggered.
+        */
+       public T getJavaObject() {
+               return this.javaObject;
+       }
+
+       /**
+        * Returns the binary form as currently stored; no materialization is triggered.
+        */
+       public BinarySection getBinarySection() {
+               return this.binarySection;
+       }
+
+       /**
+        * Must be public as it is used during code generation.
+        */
+       public void setJavaObject(T javaObject) {
+               this.javaObject = javaObject;
+       }
+
+       @Override
+       public MemorySegment[] getSegments() {
+               return requireBinarySection().segments;
+       }
+
+       @Override
+       public int getOffset() {
+               return requireBinarySection().offset;
+       }
+
+       @Override
+       public int getSizeInBytes() {
+               return requireBinarySection().sizeInBytes;
+       }
+
+       /**
+        * Returns the binary section, failing when there is none yet.
+        *
+        * @throws IllegalStateException if {@link #binarySection} is {@code null}.
+        */
+       private BinarySection requireBinarySection() {
+               if (binarySection == null) {
+                       throw new IllegalStateException("Lazy Binary Format was not materialized");
+               }
+               return binarySection;
+       }
+
+       /**
+        * Ensure we have materialized binary format.
+        *
+        * <p>Does nothing when a binary section is already present. An {@link IOException}
+        * raised by {@link #materialize(TypeSerializer)} is rethrown wrapped in a
+        * {@link WrappingRuntimeException}.
+        */
+       public final void ensureMaterialized(TypeSerializer<T> serializer) {
+               if (binarySection != null) {
+                       return;
+               }
+               try {
+                       this.binarySection = materialize(serializer);
+               } catch (IOException e) {
+                       throw new WrappingRuntimeException(e);
+               }
+       }
+
+       /**
+        * Materialize java object to binary format.
+        * Inherited classes need to hold the information they need.
+        * (For example, {@link RawValueData} needs javaObjectSerializer).
+        */
+       protected abstract BinarySection materialize(TypeSerializer<T> serializer) throws IOException;
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/MurmurHashUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/MurmurHashUtils.java
new file mode 100644
index 0000000..6635c66
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/MurmurHashUtils.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
+/**
+ * Murmur Hash. This is inspired by Guava's Murmur3_32HashFunction.
+ */
+@Internal
+final class MurmurHashUtils {
+
+       /** Seed used by all public entry points. */
+       public static final int DEFAULT_SEED = 42;
+
+       // Multipliers applied to each input block in mixK1.
+       private static final int C1 = 0xcc9e2d51;
+       private static final int C2 = 0x1b873593;
+
+       private MurmurHashUtils() {
+               // do not instantiate
+       }
+
+       /**
+        * Hash unsafe bytes, length must be aligned to 4 bytes.
+        * @param base base unsafe object
+        * @param offset offset for unsafe object
+        * @param lengthInBytes length in bytes
+        * @return hash code
+        */
+       public static int hashUnsafeBytesByWords(Object base, long offset, int lengthInBytes) {
+               return hashUnsafeBytesByWords(base, offset, lengthInBytes, DEFAULT_SEED);
+       }
+
+       /**
+        * Hash unsafe bytes.
+        * @param base base unsafe object
+        * @param offset offset for unsafe object
+        * @param lengthInBytes length in bytes
+        * @return hash code
+        */
+       public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+               return hashUnsafeBytes(base, offset, lengthInBytes, DEFAULT_SEED);
+       }
+
+       /**
+        * Hash bytes in MemorySegment, length must be aligned to 4 bytes.
+        * @param segment segment.
+        * @param offset offset for MemorySegment
+        * @param lengthInBytes length in MemorySegment
+        * @return hash code
+        */
+       public static int hashBytesByWords(MemorySegment segment, int offset, int lengthInBytes) {
+               return hashBytesByWords(segment, offset, lengthInBytes, DEFAULT_SEED);
+       }
+
+       /**
+        * Hash bytes in MemorySegment.
+        * @param segment segment.
+        * @param offset offset for MemorySegment
+        * @param lengthInBytes length in MemorySegment
+        * @return hash code
+        */
+       public static int hashBytes(MemorySegment segment, int offset, int lengthInBytes) {
+               return hashBytes(segment, offset, lengthInBytes, DEFAULT_SEED);
+       }
+
+       // Seeded form of the word-aligned unsafe hash: mix all 4-byte words, then finalize.
+       private static int hashUnsafeBytesByWords(Object base, long offset, int lengthInBytes, int seed) {
+               final int mixed = hashUnsafeBytesByInt(base, offset, lengthInBytes, seed);
+               return fmix(mixed, lengthInBytes);
+       }
+
+       // Seeded form of the word-aligned segment hash: mix all 4-byte words, then finalize.
+       private static int hashBytesByWords(MemorySegment segment, int offset, int lengthInBytes, int seed) {
+               final int mixed = hashBytesByInt(segment, offset, lengthInBytes, seed);
+               return fmix(mixed, lengthInBytes);
+       }
+
+       // Seeded segment hash for any length: whole words first, then the remaining bytes one by one.
+       private static int hashBytes(MemorySegment segment, int offset, int lengthInBytes, int seed) {
+               final int wordBytes = lengthInBytes - lengthInBytes % 4;
+               int hash = hashBytesByInt(segment, offset, wordBytes, seed);
+               for (int pos = wordBytes; pos < lengthInBytes; pos++) {
+                       hash = mixH1(hash, mixK1(segment.get(offset + pos)));
+               }
+               return fmix(hash, lengthInBytes);
+       }
+
+       // Seeded unsafe hash for any length: whole words first, then the remaining bytes one by one.
+       private static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+               assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
+               final int wordBytes = lengthInBytes - lengthInBytes % 4;
+               int hash = hashUnsafeBytesByInt(base, offset, wordBytes, seed);
+               for (int pos = wordBytes; pos < lengthInBytes; pos++) {
+                       final int single = UNSAFE.getByte(base, offset + pos);
+                       hash = mixH1(hash, mixK1(single));
+               }
+               return fmix(hash, lengthInBytes);
+       }
+
+       // Mixes the input 4 bytes at a time, read through UNSAFE; no finalization.
+       private static int hashUnsafeBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
+               assert (lengthInBytes % 4 == 0);
+               int hash = seed;
+               int pos = 0;
+               while (pos < lengthInBytes) {
+                       final int word = UNSAFE.getInt(base, offset + pos);
+                       hash = mixH1(hash, mixK1(word));
+                       pos += 4;
+               }
+               return hash;
+       }
+
+       // Mixes the input 4 bytes at a time, read from the segment; no finalization.
+       private static int hashBytesByInt(MemorySegment segment, int offset, int lengthInBytes, int seed) {
+               assert (lengthInBytes % 4 == 0);
+               int hash = seed;
+               int pos = 0;
+               while (pos < lengthInBytes) {
+                       final int word = segment.getInt(offset + pos);
+                       hash = mixH1(hash, mixK1(word));
+                       pos += 4;
+               }
+               return hash;
+       }
+
+       // Scrambles one input block: multiply by C1, rotate left by 15, multiply by C2.
+       private static int mixK1(int k1) {
+               return Integer.rotateLeft(k1 * C1, 15) * C2;
+       }
+
+       // Folds a scrambled block into the running hash.
+       private static int mixH1(int h1, int k1) {
+               return Integer.rotateLeft(h1 ^ k1, 13) * 5 + 0xe6546b64;
+       }
+
+       // Finalization mix - force all bits of a hash block to avalanche
+       private static int fmix(int h1, int length) {
+               return fmix(h1 ^ length);
+       }
+
+       public static int fmix(int h) {
+               int x = h;
+               x ^= x >>> 16;
+               x *= 0x85ebca6b;
+               x ^= x >>> 13;
+               x *= 0xc2b2ae35;
+               x ^= x >>> 16;
+               return x;
+       }
+
+       public static long fmix(long h) {
+               long x = h;
+               x ^= (x >>> 33);
+               x *= 0xff51afd7ed558ccdL;
+               x ^= (x >>> 33);
+               x *= 0xc4ceb9fe1a85ec53L;
+               x ^= (x >>> 33);
+               return x;
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java
new file mode 100644
index 0000000..66bb953
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import static 
org.apache.flink.table.data.binary.BinaryRowData.calculateBitSetWidthInBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link RowData} that is stored inside the variable-length part of a {@link BinaryRowData}.
+ *
+ * <p>Its memory storage structure is exactly the same as that of {@link BinaryRowData}.
+ * The only difference is that, as {@link NestedRowData} is used
+ * to store a row value in the variable-length part of {@link BinaryRowData},
+ * every field (including both the fixed-length part and the variable-length part) of {@link NestedRowData}
+ * may cross the boundary of a segment, while the fixed-length part of {@link BinaryRowData}
+ * must fit into its first memory segment.
+ */
+@Internal
+public final class NestedRowData extends BinarySection implements RowData, 
TypedSetters {
+
+       /** Number of fields in this row. */
+       private final int arity;
+       /** Size in bytes of the header that precedes the 8-byte field slots (see {@code getFieldOffset}). */
+       private final int nullBitsSizeInBytes;
+
+       /**
+        * Creates a row with {@code arity} fields and derives the header size from it.
+        * No memory is attached by this constructor.
+        *
+        * @param arity number of fields; negative values are rejected by {@code checkArgument}
+        */
+       public NestedRowData(int arity) {
+               checkArgument(arity >= 0);
+               this.arity = arity;
+               this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+       }
+
+       /**
+        * Returns the position of the 8-byte slot of field {@code pos}: the row's base offset, plus
+        * the header, plus 8 bytes for every preceding field.
+        */
+       private int getFieldOffset(int pos) {
+               return offset + nullBitsSizeInBytes + pos * 8;
+       }
+
+       /**
+        * Bounds check for a field index. Implemented with {@code assert}, so it is only evaluated
+        * when assertions are enabled.
+        */
+       private void assertIndexIsValid(int index) {
+               assert index >= 0 : "index (" + index + ") should >= 0";
+               assert index < arity : "index (" + index + ") should < " + 
arity;
+       }
+
+       /** Returns the number of fields given at construction time. */
+       @Override
+       public int getArity() {
+               return arity;
+       }
+
+       /** Decodes the row kind from the byte stored at the row's base offset. */
+       @Override
+       public RowKind getRowKind() {
+               byte kindValue = BinarySegmentUtils.getByte(segments, offset);
+               return RowKind.fromByteValue(kindValue);
+       }
+
+       /** Stores the byte value of {@code kind} at the row's base offset. */
+       @Override
+       public void setRowKind(RowKind kind) {
+               BinarySegmentUtils.setByte(segments, offset, 
kind.toByteValue());
+       }
+
+       /**
+        * Clears the header bit with index {@code i + 8}, marking field {@code i} as not null.
+        * The {@code + 8} presumably skips the first header byte, which
+        * {@code getRowKind}/{@code setRowKind} use for the row kind.
+        */
+       private void setNotNullAt(int i) {
+               assertIndexIsValid(i);
+               BinarySegmentUtils.bitUnSet(segments, offset, i + 8);
+       }
+
+       /**
+        * Marks field {@code i} as null: sets its header bit and zeroes its 8-byte slot.
+        *
+        * <p>See {@link BinaryRowData#setNullAt(int)}.
+        */
+       @Override
+       public void setNullAt(int i) {
+               assertIndexIsValid(i);
+               BinarySegmentUtils.bitSet(segments, offset, i + 8);
+               BinarySegmentUtils.setLong(segments, getFieldOffset(i), 0);
+       }
+
+       /** Marks field {@code pos} as not null and writes {@code value} into its slot. */
+       @Override
+       public void setInt(int pos, int value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setInt(segments, getFieldOffset(pos), value);
+       }
+
+       /** Marks field {@code pos} as not null and writes {@code value} into its slot. */
+       @Override
+       public void setLong(int pos, long value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setLong(segments, getFieldOffset(pos), 
value);
+       }
+
+       /** Marks field {@code pos} as not null and writes {@code value} into its slot. */
+       @Override
+       public void setDouble(int pos, double value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setDouble(segments, getFieldOffset(pos), 
value);
+       }
+
+       /**
+        * Updates a decimal field in place.
+        *
+        * <p>Compact precision: the unscaled long is written into the 8-byte slot; {@code value} is
+        * dereferenced unconditionally in this case.
+        *
+        * <p>Non-compact precision: the upper 32 bits of the slot hold a cursor, relative to the
+        * row's base offset, to a 16-byte region. That region is zeroed first. For a {@code null}
+        * value the field is marked null and the cursor is written back into the slot; otherwise the
+        * unscaled bytes are copied into the region and the slot becomes
+        * {@code (cursor << 32) | bytes.length}.
+        */
+       @Override
+       public void setDecimal(int pos, DecimalData value, int precision) {
+               assertIndexIsValid(pos);
+
+               if (DecimalData.isCompact(precision)) {
+                       // compact format
+                       setLong(pos, value.toUnscaledLong());
+               } else {
+                       int fieldOffset = getFieldOffset(pos);
+                       // the upper 32 bits of the slot locate the variable-length region
+                       int cursor = (int) 
(BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
+                       assert cursor > 0 : "invalid cursor " + cursor;
+                       // zero-out the bytes
+                       BinarySegmentUtils.setLong(segments, offset + cursor, 
0L);
+                       BinarySegmentUtils.setLong(segments, offset + cursor + 
8, 0L);
+
+                       if (value == null) {
+                               // setNullAt zeroes the whole slot, so the cursor has to be restored afterwards
+                               setNullAt(pos);
+                               // keep the offset for future update
+                               BinarySegmentUtils.setLong(segments, 
fieldOffset, ((long) cursor) << 32);
+                       } else {
+
+                               byte[] bytes = value.toUnscaledBytes();
+                               assert (bytes.length <= 16);
+
+                               // Write the bytes to the variable length 
portion.
+                               BinarySegmentUtils.copyFromBytes(segments, 
offset + cursor, bytes, 0, bytes.length);
+                               setLong(pos, ((long) cursor << 32) | ((long) 
bytes.length));
+                       }
+               }
+       }
+
+       /**
+        * Updates a timestamp field in place.
+        *
+        * <p>Compact precision: the millisecond value is written into the 8-byte slot;
+        * {@code value} is dereferenced unconditionally in this case.
+        *
+        * <p>Non-compact precision: the upper 32 bits of the slot hold a cursor, relative to the
+        * row's base offset, to an 8-byte region holding the milliseconds. For a {@code null} value
+        * the field is marked null, the region is zeroed and the cursor is written back into the
+        * slot; otherwise the milliseconds go to the region and the slot becomes
+        * {@code (cursor << 32) | nanoOfMillisecond}.
+        */
+       @Override
+       public void setTimestamp(int pos, TimestampData value, int precision) {
+               assertIndexIsValid(pos);
+
+               if (TimestampData.isCompact(precision)) {
+                       setLong(pos, value.getMillisecond());
+               } else {
+                       int fieldOffset = getFieldOffset(pos);
+                       // the upper 32 bits of the slot locate the variable-length region
+                       int cursor = (int) 
(BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
+                       assert cursor > 0 : "invalid cursor " + cursor;
+
+                       if (value == null) {
+                               // setNullAt zeroes the whole slot, so the cursor is restored below
+                               setNullAt(pos);
+                               // zero-out the bytes
+                               BinarySegmentUtils.setLong(segments, offset + 
cursor, 0L);
+                               BinarySegmentUtils.setLong(segments, 
fieldOffset, ((long) cursor) << 32);
+                       } else {
+                               // write millisecond to variable length portion.
+                               BinarySegmentUtils.setLong(segments, offset + 
cursor, value.getMillisecond());
+                               // write nanoOfMillisecond to fixed-length 
portion.
+                               setLong(pos, ((long) cursor << 32) | (long) 
value.getNanoOfMillisecond());
+                       }
+               }
+       }
+
+       /** Marks field {@code pos} as not null and writes {@code value} into its slot. */
+       @Override
+       public void setBoolean(int pos, boolean value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setBoolean(segments, getFieldOffset(pos), 
value);
+       }
+
+       /** Marks field {@code pos} as not null and writes {@code value} into its slot. */
+       @Override
+       public void setShort(int pos, short value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setShort(segments, getFieldOffset(pos), 
value);
+       }
+
+       /** Marks field {@code pos} as not null and writes {@code value} into its slot. */
+       @Override
+       public void setByte(int pos, byte value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setByte(segments, getFieldOffset(pos), 
value);
+       }
+
+       /** Marks field {@code pos} as not null and writes {@code value} into its slot. */
+       @Override
+       public void setFloat(int pos, float value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               BinarySegmentUtils.setFloat(segments, getFieldOffset(pos), 
value);
+       }
+
+       /** Returns whether the header bit of field {@code pos} (bit index {@code pos + 8}) is set. */
+       @Override
+       public boolean isNullAt(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.bitGet(segments, offset, pos + 8);
+       }
+
+       // The getters below read the field slot directly; none of them consults the null bit.
+
+       /** Reads the slot of field {@code pos} as a boolean. */
+       @Override
+       public boolean getBoolean(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getBoolean(segments, 
getFieldOffset(pos));
+       }
+
+       /** Reads the slot of field {@code pos} as a byte. */
+       @Override
+       public byte getByte(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getByte(segments, 
getFieldOffset(pos));
+       }
+
+       /** Reads the slot of field {@code pos} as a short. */
+       @Override
+       public short getShort(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getShort(segments, 
getFieldOffset(pos));
+       }
+
+       /** Reads the slot of field {@code pos} as an int. */
+       @Override
+       public int getInt(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getInt(segments, getFieldOffset(pos));
+       }
+
+       /** Reads the slot of field {@code pos} as a long. */
+       @Override
+       public long getLong(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getLong(segments, 
getFieldOffset(pos));
+       }
+
+       /** Reads the slot of field {@code pos} as a float. */
+       @Override
+       public float getFloat(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getFloat(segments, 
getFieldOffset(pos));
+       }
+
+       /** Reads the slot of field {@code pos} as a double. */
+       @Override
+       public double getDouble(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.getDouble(segments, 
getFieldOffset(pos));
+       }
+
+       /**
+        * Reads a string field. The 8-byte slot value is handed to
+        * {@code BinarySegmentUtils.readStringData} together with the row's base offset and the
+        * slot's own offset.
+        */
+       @Override
+       public StringData getString(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndLen = BinarySegmentUtils.getLong(segments, 
fieldOffset);
+               return BinarySegmentUtils.readStringData(segments, offset, 
fieldOffset, offsetAndLen);
+       }
+
+       /**
+        * Reads a decimal field. Compact precisions are stored as an unscaled long in the slot;
+        * otherwise the slot value is passed to {@code BinarySegmentUtils.readDecimalData} together
+        * with the row's base offset.
+        */
+       @Override
+       public DecimalData getDecimal(int pos, int precision, int scale) {
+               assertIndexIsValid(pos);
+
+               if (DecimalData.isCompact(precision)) {
+                       return DecimalData.fromUnscaledLong(
+                               BinarySegmentUtils.getLong(segments, 
getFieldOffset(pos)),
+                               precision,
+                               scale);
+               }
+
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndSize = BinarySegmentUtils.getLong(segments, 
fieldOffset);
+               return BinarySegmentUtils.readDecimalData(segments, offset, 
offsetAndSize, precision, scale);
+       }
+
+       /**
+        * Reads a timestamp field. Compact precisions are stored as epoch milliseconds in the slot;
+        * otherwise the slot value is passed to {@code BinarySegmentUtils.readTimestampData}
+        * together with the row's base offset.
+        */
+       @Override
+       public TimestampData getTimestamp(int pos, int precision) {
+               assertIndexIsValid(pos);
+
+               if (TimestampData.isCompact(precision)) {
+                       return 
TimestampData.fromEpochMillis(BinarySegmentUtils.getLong(segments, 
getFieldOffset(pos)));
+               }
+
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndNanoOfMilli = 
BinarySegmentUtils.getLong(segments, fieldOffset);
+               return BinarySegmentUtils.readTimestampData(segments, offset, 
offsetAndNanoOfMilli);
+       }
+
+       /** Reads a raw value field from the slot value and the row's base offset. */
+       @Override
+       public <T> RawValueData<T> getRawValue(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readRawValueData(segments, offset, 
getLong(pos));
+       }
+
+       /**
+        * Reads a binary field. The 8-byte slot value is handed to
+        * {@code BinarySegmentUtils.readBinary} together with the row's base offset and the slot's
+        * own offset.
+        */
+       @Override
+       public byte[] getBinary(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndLen = BinarySegmentUtils.getLong(segments, 
fieldOffset);
+               return BinarySegmentUtils.readBinary(segments, offset, 
fieldOffset, offsetAndLen);
+       }
+
+       /** Reads a nested row with {@code numFields} fields from the slot value and the row's base offset. */
+       @Override
+       public RowData getRow(int pos, int numFields) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readRowData(segments, numFields, 
offset, getLong(pos));
+       }
+
+       /** Reads an array field from the slot value and the row's base offset. */
+       @Override
+       public ArrayData getArray(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readArrayData(segments, offset, 
getLong(pos));
+       }
+
+       /** Reads a map field from the slot value and the row's base offset. */
+       @Override
+       public MapData getMap(int pos) {
+               assertIndexIsValid(pos);
+               return BinarySegmentUtils.readMapData(segments, offset, 
getLong(pos));
+       }
+
+       /** Returns a copy of this row held by a new {@link NestedRowData} of the same arity. */
+       public NestedRowData copy() {
+               return copy(new NestedRowData(arity));
+       }
+
+       /**
+        * Copies this row into {@code reuse} and returns it.
+        *
+        * @param reuse target of the copy; it is cast to {@link NestedRowData} without a prior type check
+        */
+       public NestedRowData copy(RowData reuse) {
+               return copyInternal((NestedRowData) reuse);
+       }
+
+       /**
+        * Copies the {@code sizeInBytes} bytes of this row into a new byte array, wraps the array in
+        * a memory segment and points {@code reuse} at it, starting at offset 0.
+        */
+       private NestedRowData copyInternal(NestedRowData reuse) {
+               byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, 
sizeInBytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+               return reuse;
+       }
+
+       /** Hashes the {@code sizeInBytes} bytes of this row via {@code BinarySegmentUtils.hashByWords}. */
+       @Override
+       public int hashCode() {
+               return BinarySegmentUtils.hashByWords(segments, offset, 
sizeInBytes);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
new file mode 100644
index 0000000..8e6094b
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.data.binary.BinarySegmentUtils.allocateReuseBytes;
+import static 
org.apache.flink.table.data.binary.BinarySegmentUtils.allocateReuseChars;
+
+/**
+ * Utilities for encoding and decoding UTF-8 strings.
+ *
+ * <p>The hand-written encoder and decoders handle the common cases themselves and hand the whole
+ * input over to the JDK whenever they meet something they do not handle (unpaired surrogates when
+ * encoding, malformed byte sequences when decoding), so that the result matches the JDK.
+ */
+@Internal
+final class StringUtf8Utils {
+
+       // A single UTF-16 char needs at most 3 bytes in UTF-8; a surrogate pair is 2 chars and 4 bytes.
+       private static final int MAX_BYTES_PER_CHAR = 3;
+
+       private StringUtf8Utils() {
+               // do not instantiate
+       }
+
+       /**
+        * Encodes {@code str} as UTF-8. This method must produce the same result as the JDK's
+        * {@code String.getBytes} for UTF-8.
+        *
+        * @param str the string to encode
+        * @return a new array holding exactly the encoded bytes
+        */
+       public static byte[] encodeUTF8(String str) {
+               // worst-case sized scratch buffer; only the encoded prefix is copied out
+               byte[] bytes = allocateReuseBytes(str.length() * MAX_BYTES_PER_CHAR);
+               int len = encodeUTF8(str, bytes);
+               return Arrays.copyOf(bytes, len);
+       }
+
+       /**
+        * Encodes {@code str} as UTF-8 into {@code bytes}, starting at index 0.
+        *
+        * <p>If an unpaired surrogate is found in the middle of the string, the whole string is
+        * re-encoded by {@link #defaultEncodeUTF8(String, byte[])}. A high surrogate that is the very
+        * last char is encoded as {@code '?'}.
+        *
+        * @param str the string to encode
+        * @param bytes destination buffer; the main loop does not check its capacity, so it must be
+        *     large enough for the encoded form
+        * @return the number of bytes written
+        */
+       public static int encodeUTF8(String str, byte[] bytes) {
+               final int sl = str.length();
+               int offset = 0;
+               int dp = 0;
+               final int dlASCII = Math.min(sl, bytes.length);
+
+               // ASCII only optimized loop
+               while (dp < dlASCII && str.charAt(offset) < '\u0080') {
+                       bytes[dp++] = (byte) str.charAt(offset++);
+               }
+
+               while (offset < sl) {
+                       char c = str.charAt(offset++);
+                       if (c < 0x80) {
+                               // Have at most seven bits
+                               bytes[dp++] = (byte) c;
+                       } else if (c < 0x800) {
+                               // 2 bytes, 11 bits
+                               bytes[dp++] = (byte) (0xc0 | (c >> 6));
+                               bytes[dp++] = (byte) (0x80 | (c & 0x3f));
+                       } else if (Character.isSurrogate(c)) {
+                               if (!Character.isHighSurrogate(c)) {
+                                       // A surrogate that is not a high surrogate is a lone low surrogate.
+                                       // The JDK replaces such an illegal character with '?'; delegate to
+                                       // the JDK so that the result is the same.
+                                       return defaultEncodeUTF8(str, bytes);
+                               }
+                               if (offset >= sl) {
+                                       // high surrogate is the last char of the string: no partner available
+                                       bytes[dp++] = (byte) '?';
+                               } else {
+                                       char d = str.charAt(offset);
+                                       if (!Character.isLowSurrogate(d)) {
+                                               // high surrogate without a following low surrogate: same
+                                               // treatment as a lone low surrogate
+                                               return defaultEncodeUTF8(str, bytes);
+                                       }
+                                       // 4 bytes, 21 bits
+                                       final int uc = Character.toCodePoint(c, d);
+                                       bytes[dp++] = (byte) (0xf0 | ((uc >> 18)));
+                                       bytes[dp++] = (byte) (0x80 | ((uc >> 12) & 0x3f));
+                                       bytes[dp++] = (byte) (0x80 | ((uc >> 6) & 0x3f));
+                                       bytes[dp++] = (byte) (0x80 | (uc & 0x3f));
+                                       offset++; // 2 chars
+                               }
+                       } else {
+                               // 3 bytes, 16 bits
+                               bytes[dp++] = (byte) (0xe0 | ((c >> 12)));
+                               bytes[dp++] = (byte) (0x80 | ((c >> 6) & 0x3f));
+                               bytes[dp++] = (byte) (0x80 | (c & 0x3f));
+                       }
+               }
+               return dp;
+       }
+
+       /**
+        * Encodes {@code str} with the JDK's UTF-8 encoder and copies the result to the start of
+        * {@code bytes}.
+        *
+        * @param str the string to encode
+        * @param bytes destination buffer, must be able to hold the encoded form
+        * @return the number of bytes written
+        */
+       public static int defaultEncodeUTF8(String str, byte[] bytes) {
+               // NOTE(review): str.getBytes(StandardCharsets.UTF_8) would avoid the charset lookup by name
+               // and the checked exception; kept as is so that the file's UnsupportedEncodingException
+               // import stays in use.
+               try {
+                       byte[] buffer = str.getBytes("UTF-8");
+                       System.arraycopy(buffer, 0, bytes, 0, buffer.length);
+                       return buffer.length;
+               } catch (UnsupportedEncodingException e) {
+                       throw new RuntimeException("encodeUTF8 error", e);
+               }
+       }
+
+       /**
+        * Decodes {@code byteLen} UTF-8 bytes of {@code input}, starting at {@code offset}.
+        *
+        * <p>The strict decoder is tried first; if it rejects the input, the JDK decoder is used.
+        */
+       public static String decodeUTF8(byte[] input, int offset, int byteLen) {
+               // a UTF-8 sequence never decodes to more chars than it has bytes
+               char[] chars = allocateReuseChars(byteLen);
+               int len = decodeUTF8Strict(input, offset, byteLen, chars);
+               if (len < 0) {
+                       return defaultDecodeUTF8(input, offset, byteLen);
+               }
+               return new String(chars, 0, len);
+       }
+
+       /**
+        * Decodes UTF-8 bytes into {@code da}, rejecting anything that is not well-formed.
+        *
+        * @param sa source bytes
+        * @param sp index of the first byte to decode
+        * @param len number of bytes to decode
+        * @param da destination chars, written from index 0
+        * @return the number of chars written, or {@code -1} if the input is truncated, uses an
+        *     invalid lead or continuation byte, is not in shortest form, or encodes a surrogate
+        */
+       public static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
+               final int sl = sp + len;
+               int dp = 0;
+               int dlASCII = Math.min(len, da.length);
+
+               // ASCII only optimized loop
+               while (dp < dlASCII && sa[sp] >= 0) {
+                       da[dp++] = (char) sa[sp++];
+               }
+
+               while (sp < sl) {
+                       int b1 = sa[sp++];
+                       if (b1 >= 0) {
+                               // 1 byte, 7 bits: 0xxxxxxx
+                               da[dp++] = (char) b1;
+                       } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) {
+                               // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
+                               // ((b1 & 0x1e) != 0 excludes the lead bytes 0xC0 and 0xC1)
+                               if (sp < sl) {
+                                       int b2 = sa[sp++];
+                                       if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2)
+                                               return -1;
+                                       } else {
+                                               da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80)));
+                                       }
+                                       continue;
+                               }
+                               return -1;
+                       } else if ((b1 >> 4) == -2) {
+                               // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
+                               if (sp + 1 < sl) {
+                                       int b2 = sa[sp++];
+                                       int b3 = sa[sp++];
+                                       if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80)
+                                               || (b2 & 0xc0) != 0x80
+                                               || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3)
+                                               return -1;
+                                       } else {
+                                               char c = (char) ((b1 << 12) ^ (b2 << 6) ^ (b3 ^
+                                                       (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80))));
+                                               if (Character.isSurrogate(c)) {
+                                                       return -1;
+                                               } else {
+                                                       da[dp++] = c;
+                                               }
+                                       }
+                                       continue;
+                               }
+                               return -1;
+                       } else if ((b1 >> 3) == -2) {
+                               // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+                               if (sp + 2 < sl) {
+                                       int b2 = sa[sp++];
+                                       int b3 = sa[sp++];
+                                       int b4 = sa[sp++];
+                                       int uc = ((b1 << 18) ^
+                                               (b2 << 12) ^
+                                               (b3 << 6) ^
+                                               (b4 ^ (((byte) 0xF0 << 18) ^ ((byte) 0x80 << 12) ^
+                                                       ((byte) 0x80 << 6) ^ ((byte) 0x80))));
+                                       // isMalformed4 and shortest form check
+                                       if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80)
+                                               || !Character.isSupplementaryCodePoint(uc)) {
+                                               return -1;
+                                       } else {
+                                               da[dp++] = Character.highSurrogate(uc);
+                                               da[dp++] = Character.lowSurrogate(uc);
+                                       }
+                                       continue;
+                               }
+                               return -1;
+                       } else {
+                               return -1;
+                       }
+               }
+               return dp;
+       }
+
+       /**
+        * Decodes {@code byteLen} UTF-8 bytes of {@code input}, starting at {@code offset}.
+        *
+        * <p>The strict decoder is tried first; if it rejects the input, the bytes are copied out
+        * of the segment and decoded by the JDK decoder.
+        */
+       public static String decodeUTF8(MemorySegment input, int offset, int byteLen) {
+               // a UTF-8 sequence never decodes to more chars than it has bytes
+               char[] chars = allocateReuseChars(byteLen);
+               int len = decodeUTF8Strict(input, offset, byteLen, chars);
+               if (len < 0) {
+                       byte[] bytes = allocateReuseBytes(byteLen);
+                       input.get(offset, bytes, 0, byteLen);
+                       return defaultDecodeUTF8(bytes, 0, byteLen);
+               }
+               return new String(chars, 0, len);
+       }
+
+       /**
+        * Same as {@link #decodeUTF8Strict(byte[], int, int, char[])}, but reads the bytes from a
+        * {@link MemorySegment}.
+        *
+        * @param segment source segment
+        * @param sp position of the first byte to decode
+        * @param len number of bytes to decode
+        * @param da destination chars, written from index 0
+        * @return the number of chars written, or {@code -1} if the input is truncated, uses an
+        *     invalid lead or continuation byte, is not in shortest form, or encodes a surrogate
+        */
+       public static int decodeUTF8Strict(MemorySegment segment, int sp, int len, char[] da) {
+               final int sl = sp + len;
+               int dp = 0;
+               int dlASCII = Math.min(len, da.length);
+
+               // ASCII only optimized loop
+               while (dp < dlASCII && segment.get(sp) >= 0) {
+                       da[dp++] = (char) segment.get(sp++);
+               }
+
+               while (sp < sl) {
+                       int b1 = segment.get(sp++);
+                       if (b1 >= 0) {
+                               // 1 byte, 7 bits: 0xxxxxxx
+                               da[dp++] = (char) b1;
+                       } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) {
+                               // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
+                               // ((b1 & 0x1e) != 0 excludes the lead bytes 0xC0 and 0xC1)
+                               if (sp < sl) {
+                                       int b2 = segment.get(sp++);
+                                       if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2)
+                                               return -1;
+                                       } else {
+                                               da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80)));
+                                       }
+                                       continue;
+                               }
+                               return -1;
+                       } else if ((b1 >> 4) == -2) {
+                               // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
+                               if (sp + 1 < sl) {
+                                       int b2 = segment.get(sp++);
+                                       int b3 = segment.get(sp++);
+                                       if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80)
+                                               || (b2 & 0xc0) != 0x80
+                                               || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3)
+                                               return -1;
+                                       } else {
+                                               char c = (char) ((b1 << 12) ^ (b2 << 6) ^ (b3 ^
+                                                       (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80))));
+                                               if (Character.isSurrogate(c)) {
+                                                       return -1;
+                                               } else {
+                                                       da[dp++] = c;
+                                               }
+                                       }
+                                       continue;
+                               }
+                               return -1;
+                       } else if ((b1 >> 3) == -2) {
+                               // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+                               if (sp + 2 < sl) {
+                                       int b2 = segment.get(sp++);
+                                       int b3 = segment.get(sp++);
+                                       int b4 = segment.get(sp++);
+                                       int uc = ((b1 << 18) ^
+                                               (b2 << 12) ^
+                                               (b3 << 6) ^
+                                               (b4 ^ (((byte) 0xF0 << 18) ^ ((byte) 0x80 << 12) ^
+                                                       ((byte) 0x80 << 6) ^ ((byte) 0x80))));
+                                       // isMalformed4 and shortest form check
+                                       if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80)
+                                               || !Character.isSupplementaryCodePoint(uc)) {
+                                               return -1;
+                                       } else {
+                                               da[dp++] = Character.highSurrogate(uc);
+                                               da[dp++] = Character.lowSurrogate(uc);
+                                       }
+                                       continue;
+                               }
+                               return -1;
+                       } else {
+                               return -1;
+                       }
+               }
+               return dp;
+       }
+
+       /**
+        * Decodes {@code len} bytes of {@code bytes}, starting at {@code offset}, with the JDK's
+        * UTF-8 decoder.
+        */
+       public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
+               return new String(bytes, offset, len, StandardCharsets.UTF_8);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/TypedSetters.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/TypedSetters.java
new file mode 100644
index 0000000..3555170
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/TypedSetters.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+
+/**
+ * Provide type specialized setters to reduce if/else and eliminate box and unbox. This is mainly
+ * used on the binary format such as {@link BinaryRowData}.
+ *
+ * <p>Every setter takes the position {@code pos} of the target value as its first argument. The
+ * primitive setters take the new value as a primitive, so no boxed object has to be passed.
+ * {@link #setDecimal} and {@link #setTimestamp} additionally take the precision of the value;
+ * see their documentation for how null values have to be written.
+ */
+@Internal
+public interface TypedSetters {
+
+       void setNullAt(int pos);
+
+       void setBoolean(int pos, boolean value);
+
+       void setByte(int pos, byte value);
+
+       void setShort(int pos, short value);
+
+       void setInt(int pos, int value);
+
+       void setLong(int pos, long value);
+
+       void setFloat(int pos, float value);
+
+       void setDouble(int pos, double value);
+
+       /**
+        * Set the decimal column value.
+        *
+        * <p>Note:
+        * <ul>
+        *   <li>If the precision is compact: {@link #setNullAt} can be called when the decimal is
+        *       null.
+        *   <li>If the precision is not compact: {@link #setNullAt} can not be called when the
+        *       decimal is null; {@code setDecimal(pos, null, precision)} must be called instead,
+        *       because the var-length-part needs to be updated.
+        * </ul>
+        *
+        * @param pos position of the value to set
+        * @param value the decimal to store; may be null, see the note above
+        * @param precision precision of the decimal, which selects between the two cases above
+        */
+       void setDecimal(int pos, DecimalData value, int precision);
+
+       /**
+        * Set the timestamp value.
+        *
+        * <p>Note:
+        * <ul>
+        *   <li>If the precision is compact: {@link #setNullAt} can be called when the
+        *       TimestampData value is null.
+        *   <li>Otherwise: {@link #setNullAt} can not be called when the TimestampData value is
+        *       null; {@code setTimestamp(pos, null, precision)} must be called instead, because
+        *       the var-length-part needs to be updated.
+        * </ul>
+        *
+        * @param pos position of the value to set
+        * @param value the timestamp to store; may be null, see the note above
+        * @param precision precision of the timestamp, which selects between the two cases above
+        */
+       void setTimestamp(int pos, TimestampData value, int precision);
+}

Reply via email to