This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 76f979b  [FLINK-11701][table-runtime-blink] Introduce an abstract set 
of data formats
76f979b is described below

commit 76f979b7d35125b161e5cd71a948b969aa712ef8
Author: JingsongLi <[email protected]>
AuthorDate: Fri Mar 1 11:14:38 2019 +0800

    [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats
    
    This closes #7816
---
 .../table/dataformat/AbstractBinaryWriter.java     | 178 +++++
 .../org/apache/flink/table/dataformat/BaseRow.java |  48 ++
 .../apache/flink/table/dataformat/BinaryArray.java | 410 +++++++++++
 .../flink/table/dataformat/BinaryArrayWriter.java  | 181 +++++
 .../flink/table/dataformat/BinaryFormat.java       |  64 ++
 .../apache/flink/table/dataformat/BinaryMap.java   |  96 +++
 .../apache/flink/table/dataformat/BinaryRow.java   | 299 ++++++++
 .../flink/table/dataformat/BinaryRowWriter.java    | 131 ++++
 .../flink/table/dataformat/BinaryString.java       |  93 +++
 .../flink/table/dataformat/BinaryWriter.java       |  64 ++
 .../apache/flink/table/dataformat/GenericRow.java  | 130 ++++
 .../flink/table/dataformat/ObjectArrayRow.java     | 104 +++
 .../flink/table/dataformat/TypeGetterSetters.java  | 138 ++++
 .../apache/flink/table/util/BinaryStringUtil.java  |  57 ++
 .../org/apache/flink/table/util/SegmentsUtil.java  | 811 +++++++++++++++++++++
 .../flink/table/dataformat/BinaryArrayTest.java    | 429 +++++++++++
 .../flink/table/dataformat/BinaryRowTest.java      | 245 +++++++
 .../apache/flink/table/util/SegmentsUtilTest.java  |  62 ++
 18 files changed, 3540 insertions(+)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
new file mode 100644
index 0000000..3deca69
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.BinaryStringUtil;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.util.Arrays;
+
+/**
+ * Use the special format to write data to a {@link MemorySegment} (its 
capacity grows
+ * automatically).
+ *
+ * <p>If write a format binary:
+ * 1. New a writer.
+ * 2. Write each field by writeXX or setNullAt. (Variable length fields can 
not be written
+ * repeatedly.)
+ * 3. Invoke {@link #complete()}.
+ *
+ * <p>If want to reuse this writer, please invoke {@link #reset()} first.
+ */
+public abstract class AbstractBinaryWriter implements BinaryWriter {
+
+       protected MemorySegment segment;
+
+       protected int cursor;
+
+       /**
+        * Set offset and size to fix len part.
+        */
+       protected abstract void setOffsetAndSize(int pos, int offset, long 
size);
+
+       /**
+        *  Get field offset.
+        */
+       protected abstract int getFieldOffset(int pos);
+
+       /**
+        * After grow, need point to new memory.
+        */
+       protected abstract void afterGrow();
+
+       /**
+        * See {@link BinaryString#readBinaryStringFieldFromSegments}.
+        */
+       @Override
+       public void writeString(int pos, BinaryString input) {
+               int len = input.getSizeInBytes();
+               if (len <= 7) {
+                       byte[] bytes = BinaryStringUtil.allocateReuseBytes(len);
+                       SegmentsUtil.copyToBytes(input.getSegments(), 
input.getOffset(), bytes, 0, len);
+                       writeBytesToFixLenPart(segment, getFieldOffset(pos), 
bytes, len);
+               } else {
+                       writeSegmentsToVarLenPart(pos, input.getSegments(), 
input.getOffset(), len);
+               }
+       }
+
+       @Override
+       public void writeArray(int pos, BinaryArray input) {
+               writeSegmentsToVarLenPart(pos, input.getSegments(), 
input.getOffset(), input.getSizeInBytes());
+       }
+
+       @Override
+       public void writeMap(int pos, BinaryMap input) {
+               writeSegmentsToVarLenPart(pos, input.getSegments(), 
input.getOffset(), input.getSizeInBytes());
+       }
+
+       private void zeroOutPaddingBytes(int numBytes) {
+               if ((numBytes & 0x07) > 0) {
+                       segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
+               }
+       }
+
+       private void ensureCapacity(int neededSize) {
+               final int length = cursor + neededSize;
+               if (segment.size() < length) {
+                       grow(length);
+               }
+       }
+
+       private void writeSegmentsToVarLenPart(int pos, MemorySegment[] 
segments, int offset, int size) {
+               final int roundedSize = roundNumberOfBytesToNearestWord(size);
+
+               // grow the global buffer before writing data.
+               ensureCapacity(roundedSize);
+
+               zeroOutPaddingBytes(size);
+
+               if (segments.length == 1) {
+                       segments[0].copyTo(offset, segment, cursor, size);
+               } else {
+                       writeMultiSegmentsToVarLenPart(segments, offset, size);
+               }
+
+               setOffsetAndSize(pos, cursor, size);
+
+               // move the cursor forward.
+               cursor += roundedSize;
+       }
+
+       private void writeMultiSegmentsToVarLenPart(MemorySegment[] segments, 
int offset, int size) {
+               // Write the bytes to the variable length portion.
+               int needCopy = size;
+               int fromOffset = offset;
+               int toOffset = cursor;
+               for (MemorySegment sourceSegment : segments) {
+                       int remain = sourceSegment.size() - fromOffset;
+                       if (remain > 0) {
+                               int copySize = remain > needCopy ? needCopy : 
remain;
+                               sourceSegment.copyTo(fromOffset, segment, 
toOffset, copySize);
+                               needCopy -= copySize;
+                               toOffset += copySize;
+                               fromOffset = 0;
+                       } else {
+                               fromOffset -= sourceSegment.size();
+                       }
+               }
+       }
+
+       /**
+        * Increases the capacity to ensure that it can hold at least the
+        * minimum capacity argument.
+        */
+       private void grow(int minCapacity) {
+               int oldCapacity = segment.size();
+               int newCapacity = oldCapacity + (oldCapacity >> 1);
+               if (newCapacity - minCapacity < 0) {
+                       newCapacity = minCapacity;
+               }
+               segment = 
MemorySegmentFactory.wrap(Arrays.copyOf(segment.getArray(), newCapacity));
+               afterGrow();
+       }
+
+       protected static int roundNumberOfBytesToNearestWord(int numBytes) {
+               int remainder = numBytes & 0x07;
+               if (remainder == 0) {
+                       return numBytes;
+               } else {
+                       return numBytes + (8 - remainder);
+               }
+       }
+
+       private static void writeBytesToFixLenPart(
+                       MemorySegment segment, int fieldOffset, byte[] bytes, 
int len) {
+               long firstByte = len | 0x80; // first bit is 1, other bits is 
len
+               long sevenBytes = 0L; // real data
+               if (BinaryRow.LITTLE_ENDIAN) {
+                       for (int i = 0; i < len; i++) {
+                               sevenBytes |= ((0x00000000000000FFL & bytes[i]) 
<< (i * 8L));
+                       }
+               } else {
+                       for (int i = 0; i < len; i++) {
+                               sevenBytes |= ((0x00000000000000FFL & bytes[i]) 
<< ((6 - i) * 8L));
+                       }
+               }
+
+               final long offsetAndSize = (firstByte << 56) | sevenBytes;
+
+               segment.putLong(fieldOffset, offsetAndSize);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseRow.java
new file mode 100644
index 0000000..8ee4836
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseRow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * An interface for row used internally in Flink Table/SQL.
+ *
+ * <p>There are different implementations depending on the scenario:
+ * After serialization, it becomes the BinaryRow format.
+ * Convenient updates use the GenericRow format.
+ *
+ * <p>{@code BaseRow}s are influenced by Apache Spark InternalRows.
+ */
+public interface BaseRow extends TypeGetterSetters {
+
+       /**
+        * Get the number of fields in the BaseRow.
+        *
+        * @return The number of fields in the BaseRow.
+        */
+       int getArity();
+
+       /**
+        * The header represents the type of this Row. Now just used in 
streaming.
+        * Now there are two message: ACCUMULATE_MSG and RETRACT_MSG.
+        */
+       byte getHeader();
+
+       /**
+        * Set the byte header.
+        */
+       void setHeader(byte header);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
new file mode 100644
index 0000000..e708710
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
+/**
+ * For fields that hold fixed-length primitive types, such as long, double, or 
int, we store the
+ * value directly in the field, just like the original java array.
+ *
+ * <p>[numElements(int)] + [null bits(4-byte word boundaries)] + [values or 
offset&length] + [variable length part].
+ *
+ * <p>{@code BinaryArray} are influenced by Apache Spark UnsafeArrayData.
+ */
+public class BinaryArray extends BinaryFormat implements TypeGetterSetters {
+
+       /**
+        * Offset for Arrays.
+        */
+       private static final int BYTE_ARRAY_BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
+       private static final int BOOLEAN_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(boolean[].class);
+       private static final int SHORT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(short[].class);
+       private static final int INT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(int[].class);
+       private static final int LONG_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(long[].class);
+       private static final int FLOAT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(float[].class);
+       private static final int DOUBLE_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(double[].class);
+
+       public static int calculateHeaderInBytes(int numFields) {
+               return 4 + ((numFields + 31) / 32) * 4;
+       }
+
+       // The number of elements in this array
+       private int numElements;
+
+       /** The position to start storing array elements. */
+       private int elementOffset;
+
+       public BinaryArray() {}
+
+       private void assertIndexIsValid(int ordinal) {
+               assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
+               assert ordinal < numElements : "ordinal (" + ordinal + ") 
should < " + numElements;
+       }
+
+       private int getElementOffset(int ordinal, int elementSize) {
+               return elementOffset + ordinal * elementSize;
+       }
+
+       public int numElements() {
+               return numElements;
+       }
+
+       public void pointTo(MemorySegment segment, int offset, int sizeInBytes) 
{
+               pointTo(new MemorySegment[]{segment}, offset, sizeInBytes);
+       }
+
+       public void pointTo(MemorySegment[] segments, int offset, int 
sizeInBytes) {
+               // Read the number of elements from the first 4 bytes.
+               final int numElements = SegmentsUtil.getInt(segments, offset);
+               assert numElements >= 0 : "numElements (" + numElements + ") 
should >= 0";
+
+               this.numElements = numElements;
+               this.segments = segments;
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+               this.elementOffset = offset + 
calculateHeaderInBytes(this.numElements);
+       }
+
+       @Override
+       public boolean isNullAt(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.bitGet(segments, offset + 4, pos);
+       }
+
+       @Override
+       public void setNullAt(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+       }
+
+       public void setNotNullAt(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitUnSet(segments, offset + 4, pos);
+       }
+
+       @Override
+       public long getLong(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.getLong(segments, getElementOffset(pos, 8));
+       }
+
+       @Override
+       public void setLong(int pos, long value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               SegmentsUtil.setLong(segments, getElementOffset(pos, 8), value);
+       }
+
+       public void setNullLong(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+               SegmentsUtil.setLong(segments, getElementOffset(pos, 8), 0L);
+       }
+
+       @Override
+       public int getInt(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.getInt(segments, getElementOffset(pos, 4));
+       }
+
+       @Override
+       public void setInt(int pos, int value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               SegmentsUtil.setInt(segments, getElementOffset(pos, 4), value);
+       }
+
+       public void setNullInt(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+               SegmentsUtil.setInt(segments, getElementOffset(pos, 4), 0);
+       }
+
+       @Override
+       public BinaryString getString(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getElementOffset(pos, 8);
+               final long offsetAndSize = SegmentsUtil.getLong(segments, 
fieldOffset);
+               return BinaryString.readBinaryStringFieldFromSegments(
+                               segments, offset, fieldOffset, offsetAndSize);
+       }
+
+       @Override
+       public BinaryArray getArray(int pos) {
+               assertIndexIsValid(pos);
+               return BinaryArray.readBinaryArrayFieldFromSegments(segments, 
offset, getLong(pos));
+       }
+
+       @Override
+       public BinaryMap getMap(int pos) {
+               assertIndexIsValid(pos);
+               return BinaryMap.readBinaryMapFieldFromSegments(segments, 
offset, getLong(pos));
+       }
+
+       @Override
+       public boolean getBoolean(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.getBoolean(segments, getElementOffset(pos, 
1));
+       }
+
+       @Override
+       public void setBoolean(int pos, boolean value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               SegmentsUtil.setBoolean(segments, getElementOffset(pos, 1), 
value);
+       }
+
+       public void setNullBoolean(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+               SegmentsUtil.setBoolean(segments, getElementOffset(pos, 1), 
false);
+       }
+
+       @Override
+       public byte getByte(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.getByte(segments, getElementOffset(pos, 1));
+       }
+
+       @Override
+       public void setByte(int pos, byte value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               SegmentsUtil.setByte(segments, getElementOffset(pos, 1), value);
+       }
+
+       public void setNullByte(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+               SegmentsUtil.setByte(segments, getElementOffset(pos, 1), (byte) 
0);
+       }
+
+       @Override
+       public short getShort(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.getShort(segments, getElementOffset(pos, 
2));
+       }
+
+       @Override
+       public void setShort(int pos, short value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               SegmentsUtil.setShort(segments, getElementOffset(pos, 2), 
value);
+       }
+
+       public void setNullShort(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+               SegmentsUtil.setShort(segments, getElementOffset(pos, 2), 
(short) 0);
+       }
+
+       @Override
+       public float getFloat(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.getFloat(segments, getElementOffset(pos, 
4));
+       }
+
+       @Override
+       public void setFloat(int pos, float value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               SegmentsUtil.setFloat(segments, getElementOffset(pos, 4), 
value);
+       }
+
+       public void setNullFloat(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+               SegmentsUtil.setFloat(segments, getElementOffset(pos, 4), 0F);
+       }
+
+       @Override
+       public double getDouble(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.getDouble(segments, getElementOffset(pos, 
8));
+       }
+
+       @Override
+       public void setDouble(int pos, double value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               SegmentsUtil.setDouble(segments, getElementOffset(pos, 8), 
value);
+       }
+
+       public void setNullDouble(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+               SegmentsUtil.setDouble(segments, getElementOffset(pos, 8), 0.0);
+       }
+
+       @Override
+       public char getChar(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.getChar(segments, getElementOffset(pos, 2));
+       }
+
+       @Override
+       public void setChar(int pos, char value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               SegmentsUtil.setChar(segments, getElementOffset(pos, 2), value);
+       }
+
+       public void setNullChar(int pos) {
+               assertIndexIsValid(pos);
+               SegmentsUtil.bitSet(segments, offset + 4, pos);
+               SegmentsUtil.setChar(segments, getElementOffset(pos, 2), '\0');
+       }
+
+       public boolean anyNull() {
+               for (int i = offset + 4; i < elementOffset; i += 4) {
+                       if (SegmentsUtil.getInt(segments, i) != 0) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       private void checkNoNull() {
+               if (anyNull()) {
+                       throw new RuntimeException("Array can not have null 
value!");
+               }
+       }
+
+       public boolean[] toBooleanArray() {
+               checkNoNull();
+               boolean[] values = new boolean[numElements];
+               SegmentsUtil.copyToUnsafe(
+                               segments, elementOffset, values, 
BOOLEAN_ARRAY_OFFSET, numElements);
+               return values;
+       }
+
+       public byte[] toByteArray() {
+               checkNoNull();
+               byte[] values = new byte[numElements];
+               SegmentsUtil.copyToUnsafe(
+                               segments, elementOffset, values, 
BYTE_ARRAY_BASE_OFFSET, numElements);
+               return values;
+       }
+
+       public short[] toShortArray() {
+               checkNoNull();
+               short[] values = new short[numElements];
+               SegmentsUtil.copyToUnsafe(
+                               segments, elementOffset, values, 
SHORT_ARRAY_OFFSET, numElements * 2);
+               return values;
+       }
+
+       public int[] toIntArray() {
+               checkNoNull();
+               int[] values = new int[numElements];
+               SegmentsUtil.copyToUnsafe(
+                               segments, elementOffset, values, 
INT_ARRAY_OFFSET, numElements * 4);
+               return values;
+       }
+
+       public long[] toLongArray() {
+               checkNoNull();
+               long[] values = new long[numElements];
+               SegmentsUtil.copyToUnsafe(
+                               segments, elementOffset, values, 
LONG_ARRAY_OFFSET, numElements * 8);
+               return values;
+       }
+
+       public float[] toFloatArray() {
+               checkNoNull();
+               float[] values = new float[numElements];
+               SegmentsUtil.copyToUnsafe(
+                               segments, elementOffset, values, 
FLOAT_ARRAY_OFFSET, numElements * 4);
+               return values;
+       }
+
+       public double[] toDoubleArray() {
+               checkNoNull();
+               double[] values = new double[numElements];
+               SegmentsUtil.copyToUnsafe(
+                               segments, elementOffset, values, 
DOUBLE_ARRAY_OFFSET, numElements * 8);
+               return values;
+       }
+
+       private static BinaryArray fromPrimitiveArray(
+                       Object arr, int offset, int length, int elementSize) {
+               final long headerInBytes = calculateHeaderInBytes(length);
+               final long valueRegionInBytes = elementSize * length;
+
+               // must align by 8 bytes
+               long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 
7) / 8;
+               if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
+                       throw new UnsupportedOperationException("Cannot convert 
this array to unsafe format as " +
+                                       "it's too big.");
+               }
+               long totalSize = totalSizeInLongs * 8;
+
+               final byte[] data = new byte[(int) totalSize];
+
+               UNSAFE.putInt(data, (long) BYTE_ARRAY_BASE_OFFSET, length);
+               UNSAFE.copyMemory(
+                               arr, offset, data, BYTE_ARRAY_BASE_OFFSET + 
headerInBytes, valueRegionInBytes);
+
+               BinaryArray result = new BinaryArray();
+               result.pointTo(MemorySegmentFactory.wrap(data), 0, (int) 
totalSize);
+               return result;
+       }
+
+       public static BinaryArray fromPrimitiveArray(boolean[] arr) {
+               return fromPrimitiveArray(arr, BOOLEAN_ARRAY_OFFSET, 
arr.length, 1);
+       }
+
+       public static BinaryArray fromPrimitiveArray(byte[] arr) {
+               return fromPrimitiveArray(arr, BYTE_ARRAY_BASE_OFFSET, 
arr.length, 1);
+       }
+
+       public static BinaryArray fromPrimitiveArray(short[] arr) {
+               return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 
2);
+       }
+
+       public static BinaryArray fromPrimitiveArray(int[] arr) {
+               return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
+       }
+
+       public static BinaryArray fromPrimitiveArray(long[] arr) {
+               return fromPrimitiveArray(arr, LONG_ARRAY_OFFSET, arr.length, 
8);
+       }
+
+       public static BinaryArray fromPrimitiveArray(float[] arr) {
+               return fromPrimitiveArray(arr, FLOAT_ARRAY_OFFSET, arr.length, 
4);
+       }
+
+       public static BinaryArray fromPrimitiveArray(double[] arr) {
+               return fromPrimitiveArray(arr, DOUBLE_ARRAY_OFFSET, arr.length, 
8);
+       }
+
+       static BinaryArray readBinaryArrayFieldFromSegments(
+                       MemorySegment[] segments, int baseOffset, long 
offsetAndSize) {
+               final int size = ((int) offsetAndSize);
+               int offset = (int) (offsetAndSize >> 32);
+               BinaryArray array = new BinaryArray();
+               array.pointTo(segments, offset + baseOffset, size);
+               return array;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
new file mode 100644
index 0000000..11bcc46
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+/**
+ * Writer for binary array. See {@link BinaryArray}.
+ */
+public class BinaryArrayWriter extends AbstractBinaryWriter {
+
+       private final int nullBitsSizeInBytes;
+       private final BinaryArray array;
+       private final int numElements;
+       private int fixedSize;
+
+       public BinaryArrayWriter(BinaryArray array, int numElements, int 
elementSize) {
+               this.nullBitsSizeInBytes = 
BinaryArray.calculateHeaderInBytes(numElements);
+               this.fixedSize = roundNumberOfBytesToNearestWord(
+                               nullBitsSizeInBytes + elementSize * 
numElements);
+               this.cursor = fixedSize;
+               this.numElements = numElements;
+
+               this.segment = MemorySegmentFactory.wrap(new byte[fixedSize]);
+               this.segment.putInt(0, numElements);
+               this.array = array;
+       }
+
+       /**
+        * First, reset.
+        */
+       @Override
+       public void reset() {
+               this.cursor = fixedSize;
+               for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
+                       segment.putLong(i, 0L);
+               }
+               this.segment.putInt(0, numElements);
+       }
+
+       private void setNullBit(int ordinal) {
+               SegmentsUtil.bitSet(segment, 4, ordinal);
+       }
+
+       public void setNullBoolean(int ordinal) {
+               setNullBit(ordinal);
+               // put zero into the corresponding field when set null
+               segment.putBoolean(getElementOffset(ordinal, 1), false);
+       }
+
+       public void setNullByte(int ordinal) {
+               setNullBit(ordinal);
+               // put zero into the corresponding field when set null
+               segment.put(getElementOffset(ordinal, 1), (byte) 0);
+       }
+
+       public void setNullShort(int ordinal) {
+               setNullBit(ordinal);
+               // put zero into the corresponding field when set null
+               segment.putShort(getElementOffset(ordinal, 2), (short) 0);
+       }
+
+       public void setNullInt(int ordinal) {
+               setNullBit(ordinal);
+               // put zero into the corresponding field when set null
+               segment.putInt(getElementOffset(ordinal, 4), 0);
+       }
+
+       public void setNullLong(int ordinal) {
+               setNullBit(ordinal);
+               // put zero into the corresponding field when set null
+               segment.putLong(getElementOffset(ordinal, 8), (long) 0);
+       }
+
+       public void setNullFloat(int ordinal) {
+               setNullBit(ordinal);
+               // put zero into the corresponding field when set null
+               segment.putFloat(getElementOffset(ordinal, 4), (float) 0);
+       }
+
+       public void setNullDouble(int ordinal) {
+               setNullBit(ordinal);
+               // put zero into the corresponding field when set null
+               segment.putDouble(getElementOffset(ordinal, 8), (double) 0);
+       }
+
+       @Override
+       public void setNullAt(int ordinal) {
+               setNullLong(ordinal);
+       }
+
+       private int getElementOffset(int pos, int elementSize) {
+               return nullBitsSizeInBytes + elementSize * pos;
+       }
+
+       @Override
+       public int getFieldOffset(int pos) {
+               return getElementOffset(pos, 8);
+       }
+
+       @Override
+       public void setOffsetAndSize(int pos, int offset, long size) {
+               final long offsetAndSize = ((long) offset << 32) | size;
+               segment.putLong(getElementOffset(pos, 8), offsetAndSize);
+       }
+
+       @Override
+       public void writeBoolean(int pos, boolean value) {
+               segment.putBoolean(getElementOffset(pos, 1), value);
+       }
+
+       @Override
+       public void writeByte(int pos, byte value) {
+               segment.put(getElementOffset(pos, 1), value);
+       }
+
+       @Override
+       public void writeShort(int pos, short value) {
+               segment.putShort(getElementOffset(pos, 2), value);
+       }
+
+       @Override
+       public void writeInt(int pos, int value) {
+               segment.putInt(getElementOffset(pos, 4), value);
+       }
+
+       @Override
+       public void writeLong(int pos, long value) {
+               segment.putLong(getElementOffset(pos, 8), value);
+       }
+
+       @Override
+       public void writeFloat(int pos, float value) {
+               if (Float.isNaN(value)) {
+                       value = Float.NaN;
+               }
+               segment.putFloat(getElementOffset(pos, 4), value);
+       }
+
+       @Override
+       public void writeDouble(int pos, double value) {
+               if (Double.isNaN(value)) {
+                       value = Double.NaN;
+               }
+               segment.putDouble(getElementOffset(pos, 8), value);
+       }
+
+       @Override
+       public void writeChar(int pos, char value) {
+               segment.putChar(getElementOffset(pos, 2), value);
+       }
+
+       @Override
+       public void afterGrow() {
+               array.pointTo(segment, 0, segment.size());
+       }
+
+       /**
+        * Finally, complete write to set real size to row.
+        */
+       @Override
+       public void complete() {
+               array.pointTo(segment, 0, cursor);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
new file mode 100644
index 0000000..b3ac09d
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.util.SegmentsUtil;
+
+/**
+ * Binary format that in {@link MemorySegment}s.
+ */
+public abstract class BinaryFormat<T> {
+
+       protected MemorySegment[] segments;
+       protected int offset;
+       protected int sizeInBytes;
+
+       public BinaryFormat() {}
+
+       public BinaryFormat(MemorySegment[] segments, int offset, int 
sizeInBytes) {
+               this.segments = segments;
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       public MemorySegment[] getSegments() {
+               return segments;
+       }
+
+       public int getOffset() {
+               return offset;
+       }
+
+       public int getSizeInBytes() {
+               return sizeInBytes;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               return this == o || o != null &&
+                               getClass() == o.getClass() &&
+                               binaryEquals((BinaryFormat) o);
+       }
+
+       public boolean binaryEquals(BinaryFormat that) {
+               return sizeInBytes == that.sizeInBytes &&
+                               SegmentsUtil.equals(segments, offset, 
that.segments, that.offset, sizeInBytes);
+       }
+
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
new file mode 100644
index 0000000..fefdc27
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ *
+ * <p>{@code BinaryMap} are influenced by Apache Spark UnsafeMapData.
+ */
+public class BinaryMap extends BinaryFormat {
+
+       private final BinaryArray keys;
+       private final BinaryArray values;
+
+       public BinaryMap() {
+               keys = new BinaryArray();
+               values = new BinaryArray();
+       }
+
+       public int numElements() {
+               return keys.numElements();
+       }
+
+       public void pointTo(MemorySegment segment, int baseOffset, int 
sizeInBytes) {
+               pointTo(new MemorySegment[]{segment}, baseOffset, sizeInBytes);
+       }
+
+       public void pointTo(MemorySegment[] segments, int offset, int 
sizeInBytes) {
+               // Read the numBytes of key array from the first 4 bytes.
+               final int keyArrayBytes = SegmentsUtil.getInt(segments, offset);
+               assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + 
") should >= 0";
+               final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
+               assert valueArrayBytes >= 0 : "valueArraySize (" + 
valueArrayBytes + ") should >= 0";
+
+               keys.pointTo(segments, offset + 4, keyArrayBytes);
+               values.pointTo(segments, offset + 4 + keyArrayBytes, 
valueArrayBytes);
+
+               assert keys.numElements() == values.numElements();
+
+               this.segments = segments;
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       public BinaryArray keyArray() {
+               return keys;
+       }
+
+       public BinaryArray valueArray() {
+               return values;
+       }
+
+       public static BinaryMap valueOf(BinaryArray key, BinaryArray value) {
+               checkArgument(key.getSegments().length == 1 && 
value.getSegments().length == 1);
+               byte[] bytes = new byte[4 + key.getSizeInBytes() + 
value.getSizeInBytes()];
+               MemorySegment segment = MemorySegmentFactory.wrap(bytes);
+               segment.putInt(0, key.getSizeInBytes());
+               key.getSegments()[0].copyTo(key.getOffset(), segment, 4, 
key.getSizeInBytes());
+               value.getSegments()[0].copyTo(
+                               value.getOffset(), segment, 4 + 
key.getSizeInBytes(), value.getSizeInBytes());
+               BinaryMap map = new BinaryMap();
+               map.pointTo(segment, 0, bytes.length);
+               return map;
+       }
+
+       public static BinaryMap readBinaryMapFieldFromSegments(
+                       MemorySegment[] segments, int baseOffset, long 
offsetAndSize) {
+               final int size = ((int) offsetAndSize);
+               int offset = (int) (offsetAndSize >> 32);
+               BinaryMap map = new BinaryMap();
+               map.pointTo(segments, offset + baseOffset, size);
+               return map;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
new file mode 100644
index 0000000..472f7b22
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A special row which is backed by {@link MemorySegment} instead of Object. 
It can significantly
+ * reduce the serialization/deserialization of Java objects.
+ *
+ * <p>A Row has two part: Fixed-length part and variable-length part.
+ *
+ * <p>Fixed-length part contains 1 byte header and null bit set and field 
values. Null bit set is
+ * used for null tracking and is aligned to 8-byte word boundaries. `Field 
values` holds
+ * fixed-length primitive types and variable-length values which can be stored 
in 8 bytes inside.
+ * If it do not fit the variable-length field, then store the length and 
offset of variable-length
+ * part.
+ *
+ * <p>Fixed-length part will certainly fall into a MemorySegment, which will 
speed up the read
+ * and write of field. During the write phase, if the target memory segment 
has less space than
+ * fixed length part size, we will skip the space. So the number of fields in 
a single Row cannot
+ * exceed the capacity of a single MemorySegment, if there are too many 
fields, we suggest that
+ * user set a bigger pageSize of MemorySegment.
+ *
+ * <p>Variable-length part may fall into multiple MemorySegments.
+ *
+ * <p>{@code BinaryRow} are influenced by Apache Spark UnsafeRow in project 
tungsten.
+ * The difference is that BinaryRow is placed on a discontinuous memory, and 
the variable length
+ * type can also be placed on a fixed length area (If it's short enough).
+ */
+public final class BinaryRow extends BinaryFormat<Object> implements BaseRow {
+
+       public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN);
+       private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? 0xFFF0 : 
0x0FFF;
+       private static final int HEADER_SIZE_IN_BITS = 8;
+
+       public static int calculateBitSetWidthInBytes(int arity) {
+               return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
+       }
+
+       private final int arity;
+       private final int nullBitsSizeInBytes;
+
+       public BinaryRow(int arity) {
+               checkArgument(arity >= 0);
+               this.arity = arity;
+               this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+       }
+
+       private int getFieldOffset(int pos) {
+               return offset + nullBitsSizeInBytes + pos * 8;
+       }
+
+       private void assertIndexIsValid(int index) {
+               assert index >= 0 : "index (" + index + ") should >= 0";
+               assert index < arity : "index (" + index + ") should < " + 
arity;
+       }
+
+       public int getFixedLengthPartSize() {
+               return nullBitsSizeInBytes + 8 * arity;
+       }
+
+       @Override
+       public int getArity() {
+               return arity;
+       }
+
+       @Override
+       public byte getHeader() {
+               // first nullBitsSizeInBytes byte is header.
+               return segments[0].get(offset);
+       }
+
+       @Override
+       public void setHeader(byte header) {
+               segments[0].put(offset, header);
+       }
+
+       public void pointTo(MemorySegment segment, int offset, int sizeInBytes) 
{
+               this.segments = new MemorySegment[] {segment};
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       public void pointTo(MemorySegment[] segments, int offset, int 
sizeInBytes) {
+               this.segments = segments;
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       public void setTotalSize(int sizeInBytes) {
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       @Override
+       public boolean isNullAt(int pos) {
+               assertIndexIsValid(pos);
+               return SegmentsUtil.bitGet(segments[0], offset, pos + 
HEADER_SIZE_IN_BITS);
+       }
+
+       private void setNotNullAt(int i) {
+               assertIndexIsValid(i);
+               SegmentsUtil.bitUnSet(segments[0], offset, i + 
HEADER_SIZE_IN_BITS);
+       }
+
+       @Override
+       public void setNullAt(int i) {
+               assertIndexIsValid(i);
+               SegmentsUtil.bitSet(segments[0], offset, i + 
HEADER_SIZE_IN_BITS);
+               // We must set the fixed length part zero.
+               // 1.Only int/long/boolean...(Fix length type) will invoke this 
setNullAt.
+               // 2.Set to zero in order to equals and hash operation bytes 
calculation.
+               segments[0].putLong(getFieldOffset(i), 0);
+       }
+
+       @Override
+       public void setInt(int pos, int value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putInt(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setLong(int pos, long value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putLong(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setDouble(int pos, double value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putDouble(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setChar(int pos, char value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putChar(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setBoolean(int pos, boolean value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putBoolean(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setShort(int pos, short value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putShort(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setByte(int pos, byte value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].put(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setFloat(int pos, float value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putFloat(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public boolean getBoolean(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getBoolean(getFieldOffset(pos));
+       }
+
+       @Override
+       public byte getByte(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].get(getFieldOffset(pos));
+       }
+
+       @Override
+       public short getShort(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getShort(getFieldOffset(pos));
+       }
+
+       @Override
+       public int getInt(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getInt(getFieldOffset(pos));
+       }
+
+       @Override
+       public long getLong(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getLong(getFieldOffset(pos));
+       }
+
+       @Override
+       public float getFloat(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getFloat(getFieldOffset(pos));
+       }
+
+       @Override
+       public double getDouble(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getDouble(getFieldOffset(pos));
+       }
+
+       @Override
+       public char getChar(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getChar(getFieldOffset(pos));
+       }
+
+       @Override
+       public BinaryString getString(int pos) {
+               assertIndexIsValid(pos);
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndLen = segments[0].getLong(fieldOffset);
+               return BinaryString.readBinaryStringFieldFromSegments(segments, 
offset, fieldOffset, offsetAndLen);
+       }
+
+       @Override
+       public BinaryArray getArray(int pos) {
+               assertIndexIsValid(pos);
+               return BinaryArray.readBinaryArrayFieldFromSegments(segments, 
offset, getLong(pos));
+       }
+
+       @Override
+       public BinaryMap getMap(int pos) {
+               assertIndexIsValid(pos);
+               return BinaryMap.readBinaryMapFieldFromSegments(segments, 
offset, getLong(pos));
+       }
+
+       /**
+        * The bit is 1 when the field is null. Default is 0.
+        */
+       public boolean anyNull() {
+               // Skip the header.
+               if ((segments[0].getLong(0) & FIRST_BYTE_ZERO) != 0) {
+                       return true;
+               }
+               for (int i = 8; i < nullBitsSizeInBytes; i += 8) {
+                       if (segments[0].getLong(i) != 0) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       public boolean anyNull(int[] fields) {
+               for (int field : fields) {
+                       if (isNullAt(field)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       public BinaryRow copy() {
+               return copy(new BinaryRow(arity));
+       }
+
+       public BinaryRow copy(BaseRow reuse) {
+               return copyInternal((BinaryRow) reuse);
+       }
+
+       private BinaryRow copyInternal(BinaryRow reuse) {
+               byte[] bytes = SegmentsUtil.copyToBytes(segments, offset, 
sizeInBytes);
+               reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+               return reuse;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
new file mode 100644
index 0000000..906d073
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+/**
+ * Writer for {@link BinaryRow}.
+ */
+public class BinaryRowWriter extends AbstractBinaryWriter {
+
+       private final int nullBitsSizeInBytes;
+       private final BinaryRow row;
+       private final int fixedSize;
+
+       public BinaryRowWriter(BinaryRow row) {
+               this(row, 0);
+       }
+
+       public BinaryRowWriter(BinaryRow row, int initialSize) {
+               this.nullBitsSizeInBytes = 
BinaryRow.calculateBitSetWidthInBytes(row.getArity());
+               this.fixedSize = row.getFixedLengthPartSize();
+               this.cursor = fixedSize;
+
+               this.segment = MemorySegmentFactory.wrap(new byte[fixedSize + 
initialSize]);
+               this.row = row;
+               this.row.pointTo(segment, 0, segment.size());
+       }
+
+       /**
+        * First, reset.
+        */
+       @Override
+       public void reset() {
+               this.cursor = fixedSize;
+               for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
+                       segment.putLong(i, 0L);
+               }
+       }
+
+       /**
+        * Default not null.
+        */
+       @Override
+       public void setNullAt(int pos) {
+               // need add header 8 bit.
+               SegmentsUtil.bitSet(segment, 0, pos + 8);
+               segment.putLong(getFieldOffset(pos), 0L);
+       }
+
+       public void writeHeader(byte header) {
+               segment.put(0, header);
+       }
+
+       @Override
+       public void writeBoolean(int pos, boolean value) {
+               segment.putBoolean(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void writeByte(int pos, byte value) {
+               segment.put(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void writeShort(int pos, short value) {
+               segment.putShort(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void writeInt(int pos, int value) {
+               segment.putInt(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void writeLong(int pos, long value) {
+               segment.putLong(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void writeFloat(int pos, float value) {
+               segment.putFloat(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void writeDouble(int pos, double value) {
+               segment.putDouble(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void writeChar(int pos, char value) {
+               segment.putChar(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void complete() {
+               row.setTotalSize(cursor);
+       }
+
+       @Override
+       public int getFieldOffset(int pos) {
+               return nullBitsSizeInBytes + 8 * pos;
+       }
+
+       @Override
+       public void setOffsetAndSize(int pos, int offset, long size) {
+               final long offsetAndSize = ((long) offset << 32) | size;
+               segment.putLong(getFieldOffset(pos), offsetAndSize);
+       }
+
+       @Override
+       public void afterGrow() {
+               row.pointTo(segment, 0, segment.size());
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
new file mode 100644
index 0000000..4b388f6
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.BinaryStringUtil;
+import org.apache.flink.table.util.SegmentsUtil;
+
+/**
+ * A utf8 string which is backed by {@link MemorySegment} instead of String. 
Its data may span
+ * multiple {@link MemorySegment}s.
+ *
+ * <p>Used for internal table-level implementation. The built-in operator will 
use it for comparison,
+ * search, and so on.
+ *
+ * <p>{@code BinaryString} are influenced by Apache Spark UTF8String.
+ */
+public class BinaryString extends BinaryFormat<String> {
+
+       private static final long HIGHEST_FIRST_BIT = Long.MIN_VALUE;
+       private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
+
+       public BinaryString(MemorySegment[] segments, int offset, int 
sizeInBytes) {
+               super(segments, offset, sizeInBytes);
+       }
+
+       public static BinaryString fromString(String str) {
+               if (str == null) {
+                       return null;
+               } else {
+                       byte[] bytes = str.getBytes();
+                       return new BinaryString(new MemorySegment[] 
{MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+               }
+       }
+
+       @Override
+       public String toString() {
+               byte[] bytes = BinaryStringUtil.allocateReuseBytes(sizeInBytes);
+               SegmentsUtil.copyToBytes(segments, offset, bytes, 0, 
sizeInBytes);
+               return new String(bytes, 0, sizeInBytes);
+       }
+
+       /**
+        * Get binary string, if len less than 8, will be include in 
variablePartOffsetAndLen.
+        *
+        * <p>If len is less than 8, its binary format is:
+        * 1bit mark(1) = 1, 7bits len, and 7bytes data.
+        *
+        * <p>If len is greater or equal to 8, its binary format is:
+        * 1bit mark(1) = 0, 31bits offset, and 4bytes len.
+        * Data is stored in variable-length part.
+        *
+        * <p>Note: Need to consider the ByteOrder.
+        *
+        * @param baseOffset base offset of composite binary format.
+        * @param fieldOffset absolute start offset of 
'variablePartOffsetAndLen'.
+        * @param variablePartOffsetAndLen a long value, real data or offset 
and len.
+        */
+       static BinaryString readBinaryStringFieldFromSegments(
+                       MemorySegment[] segments, int baseOffset, int 
fieldOffset,
+                       long variablePartOffsetAndLen) {
+               long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+               if (mark == 0) {
+                       final int subOffset = (int) (variablePartOffsetAndLen 
>> 32);
+                       final int len = (int) variablePartOffsetAndLen;
+                       return new BinaryString(segments, baseOffset + 
subOffset, len);
+               } else {
+                       int len = (int) ((variablePartOffsetAndLen & 
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+                       if (SegmentsUtil.LITTLE_ENDIAN) {
+                               return new BinaryString(segments, fieldOffset, 
len);
+                       } else {
+                               // fieldOffset + 1 to skip header.
+                               return new BinaryString(segments, fieldOffset + 
1, len);
+                       }
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
new file mode 100644
index 0000000..435b064
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Writer to write a composite data format, like row, array.
+ * 1. Invoke {@link #reset()}.
+ * 2. Write each field by writeXX or setNullAt. (Same field can not be written 
repeatedly.)
+ * 3. Invoke {@link #complete()}.
+ */
+public interface BinaryWriter {
+
+       /**
+        * Reset writer to prepare next write.
+        */
+       void reset();
+
+       /**
+        * Set null to this field.
+        */
+       void setNullAt(int pos);
+
+       void writeBoolean(int pos, boolean value);
+
+       void writeByte(int pos, byte value);
+
+       void writeShort(int pos, short value);
+
+       void writeInt(int pos, int value);
+
+       void writeLong(int pos, long value);
+
+       void writeFloat(int pos, float value);
+
+       void writeDouble(int pos, double value);
+
+       void writeChar(int pos, char value);
+
+       void writeString(int pos, BinaryString value);
+
+       void writeArray(int pos, BinaryArray value);
+
+       void writeMap(int pos, BinaryMap value);
+
+       /**
+        * Finally, complete write to set real size to binary.
+        */
+       void complete();
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
new file mode 100644
index 0000000..f0484f7
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * A GenericRow can have arbitrary number of fields and contain a set of 
fields, which may all be
+ * different types. The fields in GenericRow can be null.
+ *
+ * <p>The fields in the Row can be accessed by position (zero-based) {@link 
#getInt}.
+ * And can update fields by {@link #setField(int, Object)}.
+ *
+ * <p>GenericRow is in principle serializable. However, it may contain 
non-serializable fields,
+ * in which case serialization will fail.
+ */
+public final class GenericRow extends ObjectArrayRow {
+
+       public GenericRow(int arity) {
+               super(arity);
+       }
+
+       @Override
+       public boolean getBoolean(int ordinal) {
+               return (boolean) this.fields[ordinal];
+       }
+
+       @Override
+       public byte getByte(int ordinal) {
+               return (byte) this.fields[ordinal];
+       }
+
+       @Override
+       public short getShort(int ordinal) {
+               return (short) this.fields[ordinal];
+       }
+
+       @Override
+       public int getInt(int ordinal) {
+               return (int) this.fields[ordinal];
+       }
+
+       @Override
+       public long getLong(int ordinal) {
+               return (long) this.fields[ordinal];
+       }
+
+       @Override
+       public float getFloat(int ordinal) {
+               return (float) this.fields[ordinal];
+       }
+
+       @Override
+       public double getDouble(int ordinal) {
+               return (double) this.fields[ordinal];
+       }
+
+       @Override
+       public char getChar(int ordinal) {
+               return (char) this.fields[ordinal];
+       }
+
+       @Override
+       public void setBoolean(int ordinal, boolean value) {
+               this.fields[ordinal] = value;
+       }
+
+       @Override
+       public void setByte(int ordinal, byte value) {
+               this.fields[ordinal] = value;
+       }
+
+       @Override
+       public void setShort(int ordinal, short value) {
+               this.fields[ordinal] = value;
+       }
+
+       @Override
+       public void setInt(int ordinal, int value) {
+               this.fields[ordinal] = value;
+       }
+
+       @Override
+       public void setLong(int ordinal, long value) {
+               this.fields[ordinal] = value;
+       }
+
+       @Override
+       public void setFloat(int ordinal, float value) {
+               this.fields[ordinal] = value;
+       }
+
+       @Override
+       public void setDouble(int ordinal, double value) {
+               this.fields[ordinal] = value;
+       }
+
+       @Override
+       public void setChar(int ordinal, char value) {
+               this.fields[ordinal] = value;
+       }
+
+       public void setField(int ordinal, Object value) {
+               this.fields[ordinal] = value;
+       }
+
+       public static GenericRow of(Object... values) {
+               GenericRow row = new GenericRow(values.length);
+
+               for (int i = 0; i < values.length; ++i) {
+                       row.setField(i, values[i]);
+               }
+
+               return row;
+       }
+}
+
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
new file mode 100644
index 0000000..d5d47b0
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+
+/**
+ * An row implementation that uses an array of objects as the underlying 
storage.
+ */
+public abstract class ObjectArrayRow implements BaseRow {
+
+       private byte header;
+
+       protected final Object[] fields;
+
+       public ObjectArrayRow(int arity) {
+               this.fields = new Object[arity];
+       }
+
+       @Override
+       public int getArity() {
+               return fields.length;
+       }
+
+       @Override
+       public byte getHeader() {
+               return header;
+       }
+
+       @Override
+       public void setHeader(byte header) {
+               this.header = header;
+       }
+
+       @Override
+       public boolean isNullAt(int ordinal) {
+               return this.fields[ordinal] == null;
+       }
+
+       @Override
+       public void setNullAt(int ordinal) {
+               this.fields[ordinal] = null;
+       }
+
+       @Override
+       public BinaryString getString(int ordinal) {
+               return (BinaryString) this.fields[ordinal];
+       }
+
+       @Override
+       public BinaryArray getArray(int ordinal) {
+               return (BinaryArray) this.fields[ordinal];
+       }
+
+       @Override
+       public BinaryMap getMap(int ordinal) {
+               return (BinaryMap) this.fields[ordinal];
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder();
+               sb.append(getHeader()).append("|");
+               for (int i = 0; i < fields.length; i++) {
+                       if (i != 0) {
+                               sb.append(",");
+                       }
+                       sb.append(StringUtils.arrayAwareToString(fields[i]));
+               }
+               return sb.toString();
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * Byte.hashCode(getHeader()) + 
Arrays.hashCode(fields);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (o != null && o instanceof ObjectArrayRow) {
+                       ObjectArrayRow other = (ObjectArrayRow) o;
+                       return header == other.header && Arrays.equals(fields, 
other.fields);
+               } else {
+                       return false;
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
new file mode 100644
index 0000000..e567add
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Provide type specialized getters and setters to reduce if/else and 
eliminate box and unbox.
+ *
+ * <p>There is only setter for the fixed-length type field, because the 
variable-length type
+ * cannot be set to the binary format such as {@link BinaryFormat}.</p>
+ *
+ * <p>All the {@code getXxx(int)} methods do not guarantee new object returned 
when every
+ * time called.</p>
+ */
+public interface TypeGetterSetters {
+
+       /**
+        * Because the specific row implementation such as BinaryRow uses the 
binary format. We must
+        * first determine if it is null, and then make a specific get.
+        *
+        * @return true if this field is null.
+        */
+       boolean isNullAt(int ordinal);
+
+       /**
+        * Set null to this field.
+        */
+       void setNullAt(int ordinal);
+
+       /**
+        * Get boolean value.
+        */
+       boolean getBoolean(int ordinal);
+
+       /**
+        * Get byte value.
+        */
+       byte getByte(int ordinal);
+
+       /**
+        * Get short value.
+        */
+       short getShort(int ordinal);
+
+       /**
+        * Get int value.
+        */
+       int getInt(int ordinal);
+
+       /**
+        * Get long value.
+        */
+       long getLong(int ordinal);
+
+       /**
+        * Get float value.
+        */
+       float getFloat(int ordinal);
+
+       /**
+        * Get double value.
+        */
+       double getDouble(int ordinal);
+
+       /**
+        * Get char value.
+        */
+       char getChar(int ordinal);
+
+       /**
+        * Get string value, internal format is BinaryString.
+        */
+       BinaryString getString(int ordinal);
+
+       /**
+        * Get array value, internal format is BinaryArray.
+        */
+       BinaryArray getArray(int ordinal);
+
+       /**
+        * Get map value, internal format is BinaryMap.
+        */
+       BinaryMap getMap(int ordinal);
+
+       /**
+        * Set boolean value.
+        */
+       void setBoolean(int ordinal, boolean value);
+
+       /**
+        * Set byte value.
+        */
+       void setByte(int ordinal, byte value);
+
+       /**
+        * Set short value.
+        */
+       void setShort(int ordinal, short value);
+
+       /**
+        * Set int value.
+        */
+       void setInt(int ordinal, int value);
+
+       /**
+        * Set long value.
+        */
+       void setLong(int ordinal, long value);
+
+       /**
+        * Set float value.
+        */
+       void setFloat(int ordinal, float value);
+
+       /**
+        * Set double value.
+        */
+       void setDouble(int ordinal, double value);
+
+       /**
+        * Set char value.
+        */
+       void setChar(int ordinal, char value);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/BinaryStringUtil.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/BinaryStringUtil.java
new file mode 100644
index 0000000..78f660d
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/BinaryStringUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.table.dataformat.BinaryString;
+
+/**
+ * Util for {@link BinaryString}.
+ */
+public class BinaryStringUtil {
+
+       /**
+        * SQL execution threads is limited, not too many, so it can bear the 
overhead of 64K per thread.
+        */
+       private static final int MAX_BYTES_LENGTH = 1024 * 64;
+       private static final ThreadLocal<byte[]> BYTES_LOCAL = new 
ThreadLocal<>();
+
+       /**
+        * Allocate bytes that is only for temporary usage, it should not be 
stored in somewhere else.
+        * Use a {@link ThreadLocal} to reuse bytes to avoid overhead of byte[] 
new and gc.
+        *
+        * <p>If there are methods that can only accept a byte[], instead of a 
MemorySegment[]
+        * parameter, we can allocate a reuse bytes and copy the MemorySegment 
data to byte[],
+        * then call the method. Such as String deserialization.
+        */
+       public static byte[] allocateReuseBytes(int length) {
+               byte[] bytes = BYTES_LOCAL.get();
+
+               if (bytes == null) {
+                       if (length <= MAX_BYTES_LENGTH) {
+                               bytes = new byte[MAX_BYTES_LENGTH];
+                               BYTES_LOCAL.set(bytes);
+                       } else {
+                               bytes = new byte[length];
+                       }
+               } else if (bytes.length < length) {
+                       bytes = new byte[length];
+               }
+
+               return bytes;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
new file mode 100644
index 0000000..4ebb401
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.nio.ByteOrder;
+
+/**
+ * Util for data format segments calc.
+ */
+public class SegmentsUtil {
+
+       /**
+        * Constant that flags the byte order.
+        */
+       public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN;
+
+       private static final int BIT_BYTE_POSITION_MASK = 0xfffffff8;
+
+       private static final int BIT_BYTE_INDEX_MASK = 7;
+
+       /**
+        * Copy segments to a new byte[].
+        *
+        * @param segments Source segments.
+        * @param offset Source segments offset.
+        * @param numBytes the number bytes to copy.
+        */
+       public static byte[] copyToBytes(MemorySegment[] segments, int offset, 
int numBytes) {
+               return copyToBytes(segments, offset, new byte[numBytes], 0, 
numBytes);
+       }
+
+       /**
+        * Copy segments to target byte[].
+        *
+        * @param segments Source segments.
+        * @param offset Source segments offset.
+        * @param bytes target byte[].
+        * @param bytesOffset target byte[] offset.
+        * @param numBytes the number bytes to copy.
+        */
+       public static byte[] copyToBytes(MemorySegment[] segments, int offset, 
byte[] bytes,
+                       int bytesOffset, int numBytes) {
+               if (inFirstSegment(segments, offset, numBytes)) {
+                       segments[0].get(offset, bytes, bytesOffset, numBytes);
+               } else {
+                       copyMultiSegmentsToBytes(segments, offset, bytes, 
bytesOffset, numBytes);
+               }
+               return bytes;
+       }
+
+       private static void copyMultiSegmentsToBytes(MemorySegment[] segments, 
int offset, byte[] bytes,
+                       int bytesOffset, int numBytes) {
+               int remainSize = numBytes;
+               for (MemorySegment segment : segments) {
+                       int remain = segment.size() - offset;
+                       if (remain > 0) {
+                               int nCopy = Math.min(remain, remainSize);
+                               segment.get(offset, bytes, numBytes - 
remainSize + bytesOffset, nCopy);
+                               remainSize -= nCopy;
+                               // next new segment.
+                               offset = 0;
+                               if (remainSize == 0) {
+                                       return;
+                               }
+                       } else {
+                               // remain is negative, let's advance to next 
segment
+                               // now the offset = offset - segmentSize 
(-remain)
+                               offset = -remain;
+                       }
+               }
+       }
+
+       public static void copyToUnsafe(
+                       MemorySegment[] segments, int offset,
+                       Object target, int pointer, int numBytes) {
+               if (inFirstSegment(segments, offset, numBytes)) {
+                       segments[0].copyToUnsafe(offset, target, pointer, 
numBytes);
+               } else {
+                       copyMultiSegmentsToUnsafe(segments, offset, target, 
pointer, numBytes);
+               }
+       }
+
+       private static void copyMultiSegmentsToUnsafe(
+                       MemorySegment[] segments, int offset,
+                       Object target, int pointer, int numBytes) {
+               int remainSize = numBytes;
+               for (MemorySegment segment : segments) {
+                       int remain = segment.size() - offset;
+                       if (remain > 0) {
+                               int nCopy = Math.min(remain, remainSize);
+                               segment.copyToUnsafe(offset, target, numBytes - 
remainSize + pointer, nCopy);
+                               remainSize -= nCopy;
+                               // next new segment.
+                               offset = 0;
+                               if (remainSize == 0) {
+                                       return;
+                               }
+                       } else {
+                               // remain is negative, let's advance to next 
segment
+                               // now the offset = offset - segmentSize 
(-remain)
+                               offset = -remain;
+                       }
+               }
+       }
+
+       /**
+        * Equals two memory segments regions.
+        *
+        * @param segments1 Segments 1
+        * @param offset1 Offset of segments1 to start equaling
+        * @param segments2 Segments 2
+        * @param offset2 Offset of segments2 to start equaling
+        * @param len Length of the equaled memory region
+        *
+        * @return true if equal, false otherwise
+        */
+       public static boolean equals(
+                       MemorySegment[] segments1, int offset1,
+                       MemorySegment[] segments2, int offset2, int len) {
+               if (inFirstSegment(segments1, offset1, len) && 
inFirstSegment(segments2, offset2, len)) {
+                       return segments1[0].equalTo(segments2[0], offset1, 
offset2, len);
+               } else {
+                       return equalsMultiSegments(segments1, offset1, 
segments2, offset2, len);
+               }
+       }
+
+       @VisibleForTesting
+       static boolean equalsMultiSegments(
+                       MemorySegment[] segments1, int offset1,
+                       MemorySegment[] segments2, int offset2, int len) {
+               if (len == 0) {
+                       // quick way and avoid segSize is zero.
+                       return true;
+               }
+
+               int segSize1 = segments1[0].size();
+               int segSize2 = segments2[0].size();
+
+               // find first segIndex and segOffset of segments.
+               int segIndex1 = offset1 / segSize1;
+               int segIndex2 = offset2 / segSize2;
+               int segOffset1 = offset1 - segSize1 * segIndex1; // equal to %
+               int segOffset2 = offset2 - segSize2 * segIndex2; // equal to %
+
+               while (len > 0) {
+                       int equalLen = Math.min(Math.min(len, segSize1 - 
segOffset1), segSize2 - segOffset2);
+                       if (!segments1[segIndex1].equalTo(segments2[segIndex2], 
segOffset1, segOffset2, equalLen)) {
+                               return false;
+                       }
+                       len -= equalLen;
+                       segOffset1 += equalLen;
+                       if (segOffset1 == segSize1) {
+                               segOffset1 = 0;
+                               segIndex1++;
+                       }
+                       segOffset2 += equalLen;
+                       if (segOffset2 == segSize2) {
+                               segOffset2 = 0;
+                               segIndex2++;
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * Is it just in first MemorySegment, we use quick way to do something.
+        */
+       private static boolean inFirstSegment(MemorySegment[] segments, int 
offset, int numBytes) {
+               return numBytes + offset <= segments[0].size();
+       }
+
+       /**
+        * unset bit.
+        *
+        * @param segment target segment.
+        * @param baseOffset bits base offset.
+        * @param index bit index from base offset.
+        */
+       public static void bitUnSet(MemorySegment segment, int baseOffset, int 
index) {
+               int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+               byte current = segment.get(offset);
+               current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+               segment.put(offset, current);
+       }
+
+       /**
+        * set bit.
+        *
+        * @param segment target segment.
+        * @param baseOffset bits base offset.
+        * @param index bit index from base offset.
+        */
+       public static void bitSet(MemorySegment segment, int baseOffset, int 
index) {
+               int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+               byte current = segment.get(offset);
+               current |= (1 << (index & BIT_BYTE_INDEX_MASK));
+               segment.put(offset, current);
+       }
+
+       /**
+        * read bit.
+        *
+        * @param segment target segment.
+        * @param baseOffset bits base offset.
+        * @param index bit index from base offset.
+        */
+       public static boolean bitGet(MemorySegment segment, int baseOffset, int 
index) {
+               int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+               byte current = segment.get(offset);
+               return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
+       }
+
+       /**
+        * unset bit from segments.
+        *
+        * @param segments target segments.
+        * @param baseOffset bits base offset.
+        * @param index bit index from base offset.
+        */
+       public static void bitUnSet(MemorySegment[] segments, int baseOffset, 
int index) {
+               if (segments.length == 1) {
+                       MemorySegment segment = segments[0];
+                       int offset = baseOffset + ((index & 
BIT_BYTE_POSITION_MASK) >>> 3);
+                       byte current = segment.get(offset);
+                       current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+                       segment.put(offset, current);
+               } else {
+                       bitUnSetMultiSegments(segments, baseOffset, index);
+               }
+       }
+
+       private static void bitUnSetMultiSegments(MemorySegment[] segments, int 
baseOffset, int index) {
+               int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+               MemorySegment segment = segments[segIndex];
+
+               byte current = segment.get(segOffset);
+               current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+               segment.put(segOffset, current);
+       }
+
+       /**
+        * set bit from segments.
+        *
+        * @param segments target segments.
+        * @param baseOffset bits base offset.
+        * @param index bit index from base offset.
+        */
+       public static void bitSet(MemorySegment[] segments, int baseOffset, int 
index) {
+               if (segments.length == 1) {
+                       int offset = baseOffset + ((index & 
BIT_BYTE_POSITION_MASK) >>> 3);
+                       MemorySegment segment = segments[0];
+                       byte current = segment.get(offset);
+                       current |= (1 << (index & BIT_BYTE_INDEX_MASK));
+                       segment.put(offset, current);
+               } else {
+                       bitSetMultiSegments(segments, baseOffset, index);
+               }
+       }
+
+       private static void bitSetMultiSegments(MemorySegment[] segments, int 
baseOffset, int index) {
+               int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+               MemorySegment segment = segments[segIndex];
+
+               byte current = segment.get(segOffset);
+               current |= (1 << (index & BIT_BYTE_INDEX_MASK));
+               segment.put(segOffset, current);
+       }
+
+       /**
+        * read bit from segments.
+        *
+        * @param segments target segments.
+        * @param baseOffset bits base offset.
+        * @param index bit index from base offset.
+        */
+       public static boolean bitGet(MemorySegment[] segments, int baseOffset, 
int index) {
+               int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+               byte current = getByte(segments, offset);
+               return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
+       }
+
+       /**
+        * get boolean from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static boolean getBoolean(MemorySegment[] segments, int offset) {
+               if (inFirstSegment(segments, offset, 1)) {
+                       return segments[0].getBoolean(offset);
+               } else {
+                       return getBooleanMultiSegments(segments, offset);
+               }
+       }
+
+       private static boolean getBooleanMultiSegments(MemorySegment[] 
segments, int offset) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+               return segments[segIndex].getBoolean(segOffset);
+       }
+
+       /**
+        * set boolean from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static void setBoolean(MemorySegment[] segments, int offset, 
boolean value) {
+               if (inFirstSegment(segments, offset, 1)) {
+                       segments[0].putBoolean(offset, value);
+               } else {
+                       setBooleanMultiSegments(segments, offset, value);
+               }
+       }
+
+       private static void setBooleanMultiSegments(MemorySegment[] segments, 
int offset, boolean value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+               segments[segIndex].putBoolean(segOffset, value);
+       }
+
+       /**
+        * get byte from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static byte getByte(MemorySegment[] segments, int offset) {
+               if (inFirstSegment(segments, offset, 1)) {
+                       return segments[0].get(offset);
+               } else {
+                       return getByteMultiSegments(segments, offset);
+               }
+       }
+
+       private static byte getByteMultiSegments(MemorySegment[] segments, int 
offset) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+               return segments[segIndex].get(segOffset);
+       }
+
+       /**
+        * set byte from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static void setByte(MemorySegment[] segments, int offset, byte 
value) {
+               if (inFirstSegment(segments, offset, 1)) {
+                       segments[0].put(offset, value);
+               } else {
+                       setByteMultiSegments(segments, offset, value);
+               }
+       }
+
+       private static void setByteMultiSegments(MemorySegment[] segments, int 
offset, byte value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+               segments[segIndex].put(segOffset, value);
+       }
+
+       /**
+        * get int from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static int getInt(MemorySegment[] segments, int offset) {
+               if (inFirstSegment(segments, offset, 4)) {
+                       return segments[0].getInt(offset);
+               } else {
+                       return getIntMultiSegments(segments, offset);
+               }
+       }
+
+       private static int getIntMultiSegments(MemorySegment[] segments, int 
offset) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 3) {
+                       return segments[segIndex].getInt(segOffset);
+               } else {
+                       return getIntSlowly(segments, segSize, segIndex, 
segOffset);
+               }
+       }
+
+       private static int getIntSlowly(
+                       MemorySegment[] segments, int segSize, int segNum, int 
segOffset) {
+               MemorySegment segment = segments[segNum];
+               int ret = 0;
+               for (int i = 0; i < 4; i++) {
+                       if (segOffset == segSize) {
+                               segment = segments[++segNum];
+                               segOffset = 0;
+                       }
+                       int unsignedByte = segment.get(segOffset) & 0xff;
+                       if (LITTLE_ENDIAN) {
+                               ret |= (unsignedByte << (i * 8));
+                       } else {
+                               ret |= (unsignedByte << ((3 - i) * 8));
+                       }
+                       segOffset++;
+               }
+               return ret;
+       }
+
+       /**
+        * set int from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static void setInt(MemorySegment[] segments, int offset, int 
value) {
+               if (inFirstSegment(segments, offset, 4)) {
+                       segments[0].putInt(offset, value);
+               } else {
+                       setIntMultiSegments(segments, offset, value);
+               }
+       }
+
+       private static void setIntMultiSegments(MemorySegment[] segments, int 
offset, int value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 3) {
+                       segments[segIndex].putInt(segOffset, value);
+               } else {
+                       setIntSlowly(segments, segSize, segIndex, segOffset, 
value);
+               }
+       }
+
+       private static void setIntSlowly(
+                       MemorySegment[] segments, int segSize, int segNum, int 
segOffset, int value) {
+               MemorySegment segment = segments[segNum];
+               for (int i = 0; i < 4; i++) {
+                       if (segOffset == segSize) {
+                               segment = segments[++segNum];
+                               segOffset = 0;
+                       }
+                       int unsignedByte;
+                       if (LITTLE_ENDIAN) {
+                               unsignedByte = value >> (i * 8);
+                       } else {
+                               unsignedByte = value >> ((3 - i) * 8);
+                       }
+                       segment.put(segOffset, (byte) unsignedByte);
+                       segOffset++;
+               }
+       }
+
+       /**
+        * get long from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static long getLong(MemorySegment[] segments, int offset) {
+               if (inFirstSegment(segments, offset, 8)) {
+                       return segments[0].getLong(offset);
+               } else {
+                       return getLongMultiSegments(segments, offset);
+               }
+       }
+
+       private static long getLongMultiSegments(MemorySegment[] segments, int 
offset) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 7) {
+                       return segments[segIndex].getLong(segOffset);
+               } else {
+                       return getLongSlowly(segments, segSize, segIndex, 
segOffset);
+               }
+       }
+
+       private static long getLongSlowly(
+                       MemorySegment[] segments, int segSize, int segNum, int 
segOffset) {
+               MemorySegment segment = segments[segNum];
+               long ret = 0;
+               for (int i = 0; i < 8; i++) {
+                       if (segOffset == segSize) {
+                               segment = segments[++segNum];
+                               segOffset = 0;
+                       }
+                       long unsignedByte = segment.get(segOffset) & 0xff;
+                       if (LITTLE_ENDIAN) {
+                               ret |= (unsignedByte << (i * 8));
+                       } else {
+                               ret |= (unsignedByte << ((7 - i) * 8));
+                       }
+                       segOffset++;
+               }
+               return ret;
+       }
+
+       /**
+        * set long from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static void setLong(MemorySegment[] segments, int offset, long 
value) {
+               if (inFirstSegment(segments, offset, 8)) {
+                       segments[0].putLong(offset, value);
+               } else {
+                       setLongMultiSegments(segments, offset, value);
+               }
+       }
+
+       private static void setLongMultiSegments(MemorySegment[] segments, int 
offset, long value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 7) {
+                       segments[segIndex].putLong(segOffset, value);
+               } else {
+                       setLongSlowly(segments, segSize, segIndex, segOffset, 
value);
+               }
+       }
+
+       private static void setLongSlowly(
+                       MemorySegment[] segments, int segSize, int segNum, int 
segOffset, long value) {
+               MemorySegment segment = segments[segNum];
+               for (int i = 0; i < 8; i++) {
+                       if (segOffset == segSize) {
+                               segment = segments[++segNum];
+                               segOffset = 0;
+                       }
+                       long unsignedByte;
+                       if (LITTLE_ENDIAN) {
+                               unsignedByte = value >> (i * 8);
+                       } else {
+                               unsignedByte = value >> ((7 - i) * 8);
+                       }
+                       segment.put(segOffset, (byte) unsignedByte);
+                       segOffset++;
+               }
+       }
+
+       /**
+        * get short from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static short getShort(MemorySegment[] segments, int offset) {
+               if (inFirstSegment(segments, offset, 2)) {
+                       return segments[0].getShort(offset);
+               } else {
+                       return getShortMultiSegments(segments, offset);
+               }
+       }
+
+       private static short getShortMultiSegments(MemorySegment[] segments, 
int offset) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 1) {
+                       return segments[segIndex].getShort(segOffset);
+               } else {
+                       return (short) getTwoByteSlowly(segments, segSize, 
segIndex, segOffset);
+               }
+       }
+
+       /**
+        * set short from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static void setShort(MemorySegment[] segments, int offset, short 
value) {
+               if (inFirstSegment(segments, offset, 2)) {
+                       segments[0].putShort(offset, value);
+               } else {
+                       setShortMultiSegments(segments, offset, value);
+               }
+       }
+
+       private static void setShortMultiSegments(MemorySegment[] segments, int 
offset, short value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 1) {
+                       segments[segIndex].putShort(segOffset, value);
+               } else {
+                       setTwoByteSlowly(segments, segSize, segIndex, 
segOffset, value, value >> 8);
+               }
+       }
+
+       /**
+        * get float from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static float getFloat(MemorySegment[] segments, int offset) {
+               if (inFirstSegment(segments, offset, 4)) {
+                       return segments[0].getFloat(offset);
+               } else {
+                       return getFloatMultiSegments(segments, offset);
+               }
+       }
+
+       private static float getFloatMultiSegments(MemorySegment[] segments, 
int offset) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 3) {
+                       return segments[segIndex].getFloat(segOffset);
+               } else {
+                       return Float.intBitsToFloat(getIntSlowly(segments, 
segSize, segIndex, segOffset));
+               }
+       }
+
+       /**
+        * set float from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static void setFloat(MemorySegment[] segments, int offset, float 
value) {
+               if (inFirstSegment(segments, offset, 4)) {
+                       segments[0].putFloat(offset, value);
+               } else {
+                       setFloatMultiSegments(segments, offset, value);
+               }
+       }
+
+       private static void setFloatMultiSegments(MemorySegment[] segments, int 
offset, float value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 3) {
+                       segments[segIndex].putFloat(segOffset, value);
+               } else {
+                       setIntSlowly(segments, segSize, segIndex, segOffset, 
Float.floatToRawIntBits(value));
+               }
+       }
+
+       /**
+        * get double from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static double getDouble(MemorySegment[] segments, int offset) {
+               if (inFirstSegment(segments, offset, 8)) {
+                       return segments[0].getDouble(offset);
+               } else {
+                       return getDoubleMultiSegments(segments, offset);
+               }
+       }
+
+       private static double getDoubleMultiSegments(MemorySegment[] segments, 
int offset) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 7) {
+                       return segments[segIndex].getDouble(segOffset);
+               } else {
+                       return Double.longBitsToDouble(getLongSlowly(segments, 
segSize, segIndex, segOffset));
+               }
+       }
+
+       /**
+        * set double from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static void setDouble(MemorySegment[] segments, int offset, 
double value) {
+               if (inFirstSegment(segments, offset, 8)) {
+                       segments[0].putDouble(offset, value);
+               } else {
+                       setDoubleMultiSegments(segments, offset, value);
+               }
+       }
+
+       private static void setDoubleMultiSegments(MemorySegment[] segments, 
int offset, double value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 7) {
+                       segments[segIndex].putDouble(segOffset, value);
+               } else {
+                       setLongSlowly(segments, segSize, segIndex, segOffset, 
Double.doubleToRawLongBits(value));
+               }
+       }
+
+       /**
+        * get char from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static char getChar(MemorySegment[] segments, int offset) {
+               if (inFirstSegment(segments, offset, 2)) {
+                       return segments[0].getChar(offset);
+               } else {
+                       return getCharMultiSegments(segments, offset);
+               }
+       }
+
+       private static char getCharMultiSegments(MemorySegment[] segments, int 
offset) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 1) {
+                       return segments[segIndex].getChar(segOffset);
+               } else {
+                       return (char) getTwoByteSlowly(segments, segSize, 
segIndex, segOffset);
+               }
+       }
+
+       private static int getTwoByteSlowly(
+                       MemorySegment[] segments, int segSize, int segNum, int 
segOffset) {
+               MemorySegment segment = segments[segNum];
+               int ret = 0;
+               for (int i = 0; i < 2; i++) {
+                       if (segOffset == segSize) {
+                               segment = segments[++segNum];
+                               segOffset = 0;
+                       }
+                       int unsignedByte = segment.get(segOffset) & 0xff;
+                       if (LITTLE_ENDIAN) {
+                               ret |= (unsignedByte << (i * 8));
+                       } else {
+                               ret |= (unsignedByte << ((1 - i) * 8));
+                       }
+                       segOffset++;
+               }
+               return ret;
+       }
+
+       /**
+        * set char from segments.
+        *
+        * @param segments target segments.
+        * @param offset value offset.
+        */
+       public static void setChar(MemorySegment[] segments, int offset, char 
value) {
+               if (inFirstSegment(segments, offset, 2)) {
+                       segments[0].putChar(offset, value);
+               } else {
+                       setCharMultiSegments(segments, offset, value);
+               }
+       }
+
+       private static void setCharMultiSegments(MemorySegment[] segments, int 
offset, char value) {
+               int segSize = segments[0].size();
+               int segIndex = offset / segSize;
+               int segOffset = offset - segIndex * segSize; // equal to %
+
+               if (segOffset < segSize - 3) {
+                       segments[segIndex].putChar(segOffset, value);
+               } else {
+                       setTwoByteSlowly(segments, segSize, segIndex, 
segOffset, value, value >> 8);
+               }
+       }
+
+       private static void setTwoByteSlowly(
+                       MemorySegment[] segments, int segSize, int segNum, int 
segOffset, int b1, int b2) {
+               MemorySegment segment = segments[segNum];
+               segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b1 : b2));
+               segOffset++;
+               if (segOffset == segSize) {
+                       segment = segments[++segNum];
+                       segOffset = 0;
+               }
+               segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b2 : b1));
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
new file mode 100644
index 0000000..c90270b
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of {@link BinaryArray} and {@link BinaryArrayWriter}.
+ */
+public class BinaryArrayTest {
+
+       @Test
+       public void testArray() {
+               // 1.array test
+               BinaryArray array = new BinaryArray();
+               BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
+
+               writer.writeInt(0, 6);
+               writer.setNullInt(1);
+               writer.writeInt(2, 666);
+               writer.complete();
+
+               assertEquals(array.getInt(0), 6);
+               assertTrue(array.isNullAt(1));
+               assertEquals(array.getInt(2), 666);
+
+               //2.test write to binary row.
+               {
+                       BinaryRow row2 = new BinaryRow(1);
+                       BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+                       writer2.writeArray(0, array);
+                       writer2.complete();
+
+                       BinaryArray array2 = row2.getArray(0);
+                       assertEquals(array2, array);
+                       assertEquals(array2.getInt(0), 6);
+                       assertTrue(array2.isNullAt(1));
+                       assertEquals(array2.getInt(2), 666);
+               }
+
+               //3.test write var seg array to binary row.
+               {
+                       BinaryArray array3 = splitArray(array);
+
+                       BinaryRow row2 = new BinaryRow(1);
+                       BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+                       writer2.writeArray(0, array3);
+                       writer2.complete();
+
+                       BinaryArray array2 = row2.getArray(0);
+                       assertEquals(array2, array);
+                       assertEquals(array2.getInt(0), 6);
+                       assertTrue(array2.isNullAt(1));
+                       assertEquals(array2.getInt(2), 666);
+               }
+       }
+
+       @Test
+       public void testArrayTypes() {
+               {
+                       // test bool
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 1);
+                       writer.setNullBoolean(0);
+                       writer.writeBoolean(1, true);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(true, array.getBoolean(1));
+                       array.setBoolean(0, true);
+                       assertEquals(true, array.getBoolean(0));
+                       array.setNullBoolean(0);
+                       assertTrue(array.isNullAt(0));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(true, newArray.getBoolean(1));
+                       newArray.setBoolean(0, true);
+                       assertEquals(true, newArray.getBoolean(0));
+                       newArray.setNullBoolean(0);
+                       assertTrue(newArray.isNullAt(0));
+
+                       newArray.setBoolean(0, true);
+                       assertEquals(newArray, 
BinaryArray.fromPrimitiveArray(newArray.toBooleanArray()));
+               }
+
+               {
+                       // test byte
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 1);
+                       writer.setNullByte(0);
+                       writer.writeByte(1, (byte) 25);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(25, array.getByte(1));
+                       array.setByte(0, (byte) 5);
+                       assertEquals(5, array.getByte(0));
+                       array.setNullByte(0);
+                       assertTrue(array.isNullAt(0));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(25, newArray.getByte(1));
+                       newArray.setByte(0, (byte) 5);
+                       assertEquals(5, newArray.getByte(0));
+                       newArray.setNullByte(0);
+                       assertTrue(newArray.isNullAt(0));
+
+                       newArray.setByte(0, (byte) 3);
+                       assertEquals(newArray, 
BinaryArray.fromPrimitiveArray(newArray.toByteArray()));
+               }
+
+               {
+                       // test short
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 2);
+                       writer.setNullShort(0);
+                       writer.writeShort(1, (short) 25);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(25, array.getShort(1));
+                       array.setShort(0, (short) 5);
+                       assertEquals(5, array.getShort(0));
+                       array.setNullShort(0);
+                       assertTrue(array.isNullAt(0));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(25, newArray.getShort(1));
+                       newArray.setShort(0, (short) 5);
+                       assertEquals(5, newArray.getShort(0));
+                       newArray.setNullShort(0);
+                       assertTrue(newArray.isNullAt(0));
+
+                       newArray.setShort(0, (short) 3);
+                       assertEquals(newArray, 
BinaryArray.fromPrimitiveArray(newArray.toShortArray()));
+               }
+
+               {
+                       // test int
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 4);
+                       writer.setNullInt(0);
+                       writer.writeInt(1, 25);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(25, array.getInt(1));
+                       array.setInt(0, 5);
+                       assertEquals(5, array.getInt(0));
+                       array.setNullInt(0);
+                       assertTrue(array.isNullAt(0));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(25, newArray.getInt(1));
+                       newArray.setInt(0, 5);
+                       assertEquals(5, newArray.getInt(0));
+                       newArray.setNullInt(0);
+                       assertTrue(newArray.isNullAt(0));
+
+                       newArray.setInt(0, 3);
+                       assertEquals(newArray, 
BinaryArray.fromPrimitiveArray(newArray.toIntArray()));
+               }
+
+               {
+                       // test long
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 8);
+                       writer.setNullLong(0);
+                       writer.writeLong(1, 25);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(25, array.getLong(1));
+                       array.setLong(0, 5);
+                       assertEquals(5, array.getLong(0));
+                       array.setNullLong(0);
+                       assertTrue(array.isNullAt(0));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(25, newArray.getLong(1));
+                       newArray.setLong(0, 5);
+                       assertEquals(5, newArray.getLong(0));
+                       newArray.setNullLong(0);
+                       assertTrue(newArray.isNullAt(0));
+
+                       newArray.setLong(0, 3);
+                       assertEquals(newArray, 
BinaryArray.fromPrimitiveArray(newArray.toLongArray()));
+               }
+
+               {
+                       // test float
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 4);
+                       writer.setNullFloat(0);
+                       writer.writeFloat(1, 25);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertTrue(25 == array.getFloat(1));
+                       array.setFloat(0, 5);
+                       assertTrue(5 == array.getFloat(0));
+                       array.setNullFloat(0);
+                       assertTrue(array.isNullAt(0));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertTrue(25 == newArray.getFloat(1));
+                       newArray.setFloat(0, 5);
+                       assertTrue(5 == newArray.getFloat(0));
+                       newArray.setNullFloat(0);
+                       assertTrue(newArray.isNullAt(0));
+
+                       newArray.setFloat(0, 3);
+                       assertEquals(newArray, 
BinaryArray.fromPrimitiveArray(newArray.toFloatArray()));
+               }
+
+               {
+                       // test double
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 8);
+                       writer.setNullDouble(0);
+                       writer.writeDouble(1, 25);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertTrue(25 == array.getDouble(1));
+                       array.setDouble(0, 5);
+                       assertTrue(5 == array.getDouble(0));
+                       array.setNullDouble(0);
+                       assertTrue(array.isNullAt(0));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertTrue(25 == newArray.getDouble(1));
+                       newArray.setDouble(0, 5);
+                       assertTrue(5 == newArray.getDouble(0));
+                       newArray.setNullDouble(0);
+                       assertTrue(newArray.isNullAt(0));
+
+                       newArray.setDouble(0, 3);
+                       assertEquals(newArray, 
BinaryArray.fromPrimitiveArray(newArray.toDoubleArray()));
+               }
+
+               {
+                       // test char
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 2);
+                       writer.setNullShort(0);
+                       writer.writeChar(1, (char) 25);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(25, array.getChar(1));
+                       array.setChar(0, (char) 5);
+                       assertEquals(5, array.getChar(0));
+                       array.setNullChar(0);
+                       assertTrue(array.isNullAt(0));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(25, newArray.getChar(1));
+                       newArray.setChar(0, (char) 5);
+                       assertEquals(5, newArray.getChar(0));
+                       newArray.setNullChar(0);
+                       assertTrue(newArray.isNullAt(0));
+               }
+
+               {
+                       // test string
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 8);
+                       writer.setNullAt(0);
+                       writer.writeString(1, fromString("jaja"));
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(fromString("jaja"), array.getString(1));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(fromString("jaja"), newArray.getString(1));
+               }
+
+               BinaryArray subArray = new BinaryArray();
+               BinaryArrayWriter subWriter = new BinaryArrayWriter(subArray, 
2, 8);
+               subWriter.setNullAt(0);
+               subWriter.writeString(1, fromString("hehehe"));
+               subWriter.complete();
+
+               {
+                       // test array
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 8);
+                       writer.setNullAt(0);
+                       writer.writeArray(1, subArray);
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(subArray, array.getArray(1));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(subArray, newArray.getArray(1));
+               }
+
+               {
+                       // test map
+                       BinaryArray array = new BinaryArray();
+                       BinaryArrayWriter writer = new BinaryArrayWriter(array, 
2, 8);
+                       writer.setNullAt(0);
+                       writer.writeMap(1, BinaryMap.valueOf(subArray, 
subArray));
+                       writer.complete();
+
+                       assertTrue(array.isNullAt(0));
+                       assertEquals(BinaryMap.valueOf(subArray, subArray), 
array.getMap(1));
+
+                       BinaryArray newArray = splitArray(array);
+                       assertTrue(newArray.isNullAt(0));
+                       assertEquals(BinaryMap.valueOf(subArray, subArray), 
newArray.getMap(1));
+               }
+       }
+
+       @Test
+       public void testMap() {
+               BinaryArray array1 = new BinaryArray();
+               BinaryArrayWriter writer1 = new BinaryArrayWriter(array1, 3, 4);
+               writer1.writeInt(0, 6);
+               writer1.writeInt(1, 5);
+               writer1.writeInt(2, 666);
+               writer1.complete();
+
+               BinaryArray array2 = new BinaryArray();
+               BinaryArrayWriter writer2 = new BinaryArrayWriter(array2, 3, 8);
+               writer2.writeString(0, fromString("6"));
+               writer2.writeString(1, fromString("5"));
+               writer2.writeString(2, fromString("666"));
+               writer2.complete();
+
+               BinaryMap binaryMap = BinaryMap.valueOf(array1, array2);
+
+               BinaryRow row = new BinaryRow(1);
+               BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+               rowWriter.writeMap(0, binaryMap);
+               rowWriter.complete();
+
+               BinaryMap map = row.getMap(0);
+               BinaryArray key = map.keyArray();
+               BinaryArray value = map.valueArray();
+
+               assertEquals(binaryMap, map);
+               assertEquals(array1, key);
+               assertEquals(array2, value);
+
+               assertEquals(key.getInt(1), 5);
+               assertEquals(value.getString(1), fromString("5"));
+       }
+
+       private static BinaryArray splitArray(BinaryArray array) {
+               BinaryArray ret = new BinaryArray();
+               MemorySegment[] segments = 
splitBytes(SegmentsUtil.copyToBytes(array.segments, 0, array.sizeInBytes), 0);
+               ret.pointTo(segments, 0, array.sizeInBytes);
+               return ret;
+       }
+
+       private static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) 
{
+               int newSize = (bytes.length + 1) / 2 + baseOffset;
+               MemorySegment[] ret = new MemorySegment[2];
+               ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
+               ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
+
+               ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
+               ret[1].put(0, bytes, newSize - baseOffset, bytes.length - 
(newSize - baseOffset));
+               return ret;
+       }
+
+       @Test
+       public void testToArray() {
+               BinaryArray array = new BinaryArray();
+               BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 2);
+               writer.writeShort(0, (short) 5);
+               writer.writeShort(1, (short) 10);
+               writer.writeShort(2, (short) 15);
+               writer.complete();
+
+               short[] shorts = array.toShortArray();
+               assertEquals(5, shorts[0]);
+               assertEquals(10, shorts[1]);
+               assertEquals(15, shorts[2]);
+
+               MemorySegment[] segments = 
splitBytes(writer.segment.getArray(), 3);
+               array.pointTo(segments, 3, array.getSizeInBytes());
+               assertEquals(5, array.getShort(0));
+               assertEquals(10, array.getShort(1));
+               assertEquals(15, array.getShort(2));
+               short[] shorts2 = array.toShortArray();
+               assertEquals(5, shorts2[0]);
+               assertEquals(10, shorts2[1]);
+               assertEquals(15, shorts2[2]);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
new file mode 100644
index 0000000..a0e9892
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of {@link BinaryRow} and {@link BinaryRowWriter}.
+ */
+public class BinaryRowTest {
+
+       @Test
+       public void testBasic() {
+               // consider header 1 byte.
+               assertEquals(8, new BinaryRow(0).getFixedLengthPartSize());
+               assertEquals(16, new BinaryRow(1).getFixedLengthPartSize());
+               assertEquals(536, new BinaryRow(65).getFixedLengthPartSize());
+               assertEquals(1048, new BinaryRow(128).getFixedLengthPartSize());
+
+               MemorySegment segment = MemorySegmentFactory.wrap(new 
byte[100]);
+               BinaryRow row = new BinaryRow(2);
+               row.pointTo(segment, 10, 48);
+               assertTrue(row.getSegments()[0] == segment);
+               row.setInt(0, 5);
+               row.setDouble(1, 5.8D);
+       }
+
+       @Test
+       public void testSetAndGet() {
+               MemorySegment segment = MemorySegmentFactory.wrap(new byte[80]);
+               BinaryRow row = new BinaryRow(9);
+               row.pointTo(segment, 0, 80);
+               row.setNullAt(0);
+               row.setInt(1, 11);
+               row.setLong(2, 22);
+               row.setDouble(3, 33);
+               row.setBoolean(4, true);
+               row.setShort(5, (short) 55);
+               row.setByte(6, (byte) 66);
+               row.setFloat(7, 77f);
+               row.setChar(8, 'a');
+
+               assertEquals(33d, (long) row.getDouble(3), 0);
+               assertEquals(11, row.getInt(1));
+               assertTrue(row.isNullAt(0));
+               assertEquals(55, row.getShort(5));
+               assertEquals(22, row.getLong(2));
+               assertEquals(true, row.getBoolean(4));
+               assertEquals((byte) 66, row.getByte(6));
+               assertEquals(77f, row.getFloat(7), 0);
+               assertEquals('a', row.getChar(8));
+       }
+
+       @Test
+       public void testWriter() {
+
+               int arity = 13;
+               BinaryRow row = new BinaryRow(arity);
+               BinaryRowWriter writer = new BinaryRowWriter(row, 20);
+
+               writer.writeString(0, fromString("1"));
+               writer.writeString(3, fromString("1234567"));
+               writer.writeString(5, fromString("12345678"));
+               writer.writeString(9, fromString("啦啦啦啦啦我是快乐的粉刷匠"));
+
+               writer.writeBoolean(1, true);
+               writer.writeByte(2, (byte) 99);
+               writer.writeChar(4, 'x');
+               writer.writeDouble(6, 87.1d);
+               writer.writeFloat(7, 26.1f);
+               writer.writeInt(8, 88);
+               writer.writeLong(10, 284);
+               writer.writeShort(11, (short) 292);
+               writer.setNullAt(12);
+
+               writer.complete();
+
+               assertTestWriterRow(row);
+               assertTestWriterRow(row.copy());
+
+               // test copy from var segments.
+               int subSize = row.getFixedLengthPartSize() + 10;
+               MemorySegment subMs1 = MemorySegmentFactory.wrap(new 
byte[subSize]);
+               MemorySegment subMs2 = MemorySegmentFactory.wrap(new 
byte[subSize]);
+               row.getSegments()[0].copyTo(0, subMs1, 0, subSize);
+               row.getSegments()[0].copyTo(subSize, subMs2, 0, 
row.getSizeInBytes() - subSize);
+
+               BinaryRow toCopy = new BinaryRow(arity);
+               toCopy.pointTo(new MemorySegment[]{subMs1, subMs2}, 0, 
row.getSizeInBytes());
+               assertEquals(row, toCopy);
+               assertTestWriterRow(toCopy);
+               assertTestWriterRow(toCopy.copy(new BinaryRow(arity)));
+       }
+
+       @Test
+       public void testwriteString() throws IOException {
+               {
+                       // litter byte[]
+                       BinaryRow row = new BinaryRow(1);
+                       BinaryRowWriter writer = new BinaryRowWriter(row);
+                       char[] chars = new char[2];
+                       chars[0] = 0xFFFF;
+                       chars[1] = 0;
+                       writer.writeString(0, fromString(new String(chars)));
+                       writer.complete();
+
+                       String str = row.getString(0).toString();
+                       assertEquals(chars[0], str.charAt(0));
+                       assertEquals(chars[1], str.charAt(1));
+               }
+
+               {
+                       // big byte[]
+                       String str = "啦啦啦啦啦我是快乐的粉刷匠";
+                       BinaryRow row = new BinaryRow(1);
+                       BinaryRowWriter writer = new BinaryRowWriter(row);
+                       writer.writeString(0, fromString(str));
+                       writer.complete();
+
+                       assertEquals(str, row.getString(0).toString());
+               }
+       }
+
+       private void assertTestWriterRow(BinaryRow row) {
+               assertEquals("1", row.getString(0).toString());
+               assertEquals(88, row.getInt(8));
+               assertEquals((short) 292, row.getShort(11));
+               assertEquals(284, row.getLong(10));
+               assertEquals((byte) 99, row.getByte(2));
+               assertEquals('x', row.getChar(4));
+               assertEquals(87.1d, row.getDouble(6), 0);
+               assertEquals(26.1f, row.getFloat(7), 0);
+               assertEquals(true, row.getBoolean(1));
+               assertEquals("1234567", row.getString(3).toString());
+               assertEquals("12345678", row.getString(5).toString());
+               assertEquals("啦啦啦啦啦我是快乐的粉刷匠", row.getString(9).toString());
+               assertTrue(row.isNullAt(12));
+       }
+
+       @Test
+       public void testReuseWriter() {
+               BinaryRow row = new BinaryRow(2);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+               writer.writeString(0, fromString("01234567"));
+               writer.writeString(1, fromString("012345678"));
+               writer.complete();
+               assertEquals("01234567", row.getString(0).toString());
+               assertEquals("012345678", row.getString(1).toString());
+
+               writer.reset();
+               writer.writeString(0, fromString("1"));
+               writer.writeString(1, fromString("0123456789"));
+               writer.complete();
+               assertEquals("1", row.getString(0).toString());
+               assertEquals("0123456789", row.getString(1).toString());
+       }
+
+       @Test
+       public void anyNullTest() throws IOException {
+               {
+                       BinaryRow row = new BinaryRow(3);
+                       BinaryRowWriter writer = new BinaryRowWriter(row);
+                       assertFalse(row.anyNull());
+
+                       // test header should not compute by anyNull
+                       row.setHeader((byte) 1);
+                       assertFalse(row.anyNull());
+
+                       writer.setNullAt(2);
+                       assertTrue(row.anyNull());
+
+                       writer.setNullAt(0);
+                       assertTrue(row.anyNull(new int[]{0, 1, 2}));
+                       assertFalse(row.anyNull(new int[]{1}));
+
+                       writer.setNullAt(1);
+                       assertTrue(row.anyNull());
+               }
+
+               {
+                       BinaryRow row = new BinaryRow(80);
+                       BinaryRowWriter writer = new BinaryRowWriter(row);
+                       assertFalse(row.anyNull());
+
+                       writer.setNullAt(3);
+                       assertTrue(row.anyNull());
+
+                       writer = new BinaryRowWriter(row);
+                       writer.setNullAt(65);
+                       assertTrue(row.anyNull());
+               }
+       }
+
+       @Test
+       public void testHeaderSize() throws IOException {
+               assertEquals(8, BinaryRow.calculateBitSetWidthInBytes(56));
+               assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(57));
+               assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(120));
+               assertEquals(24, BinaryRow.calculateBitSetWidthInBytes(121));
+       }
+
+       @Test
+       public void testHeader() throws IOException {
+               BinaryRow row = new BinaryRow(2);
+               BinaryRowWriter writer = new BinaryRowWriter(row);
+
+               writer.writeInt(0, 10);
+               writer.setNullAt(1);
+               writer.writeHeader((byte) 29);
+               writer.complete();
+
+               BinaryRow newRow = row.copy();
+               assertEquals(row, newRow);
+               assertEquals((byte) 29, newRow.getHeader());
+
+               newRow.setHeader((byte) 19);
+               assertEquals((byte) 19, newRow.getHeader());
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
new file mode 100644
index 0000000..c1808bc
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryRowTest;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link SegmentsUtil}, most is covered by {@link BinaryRowTest},
+ * this just test some boundary scenarios testing.
+ */
+public class SegmentsUtilTest {
+
+       @Test
+       public void testCopy() {
+               // test copy the content of the latter Seg
+               MemorySegment[] segments = new MemorySegment[2];
+               segments[0] = MemorySegmentFactory.wrap(new byte[]{0, 2, 5});
+               segments[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15});
+
+               byte[] bytes = SegmentsUtil.copyToBytes(segments, 4, 2);
+               Assert.assertArrayEquals(new byte[] {12, 15}, bytes);
+       }
+
+       @Test
+       public void testEquals() {
+               // test copy the content of the latter Seg
+               MemorySegment[] segments1 = new MemorySegment[3];
+               segments1[0] = MemorySegmentFactory.wrap(new byte[]{0, 2, 5});
+               segments1[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15});
+               segments1[2] = MemorySegmentFactory.wrap(new byte[]{1, 1, 1});
+
+               MemorySegment[] segments2 = new MemorySegment[2];
+               segments2[0] = MemorySegmentFactory.wrap(new byte[]{6, 0, 2, 
5});
+               segments2[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15, 
18});
+
+               Assert.assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 
0, segments2, 0, 0));
+               Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 
1, 3));
+               Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 
1, 6));
+               Assert.assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 
1, 7));
+       }
+
+}

Reply via email to