This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 76f979b [FLINK-11701][table-runtime-blink] Introduce an abstract set
of data formats
76f979b is described below
commit 76f979b7d35125b161e5cd71a948b969aa712ef8
Author: JingsongLi <[email protected]>
AuthorDate: Fri Mar 1 11:14:38 2019 +0800
[FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats
This closes #7816
---
.../table/dataformat/AbstractBinaryWriter.java | 178 +++++
.../org/apache/flink/table/dataformat/BaseRow.java | 48 ++
.../apache/flink/table/dataformat/BinaryArray.java | 410 +++++++++++
.../flink/table/dataformat/BinaryArrayWriter.java | 181 +++++
.../flink/table/dataformat/BinaryFormat.java | 64 ++
.../apache/flink/table/dataformat/BinaryMap.java | 96 +++
.../apache/flink/table/dataformat/BinaryRow.java | 299 ++++++++
.../flink/table/dataformat/BinaryRowWriter.java | 131 ++++
.../flink/table/dataformat/BinaryString.java | 93 +++
.../flink/table/dataformat/BinaryWriter.java | 64 ++
.../apache/flink/table/dataformat/GenericRow.java | 130 ++++
.../flink/table/dataformat/ObjectArrayRow.java | 104 +++
.../flink/table/dataformat/TypeGetterSetters.java | 138 ++++
.../apache/flink/table/util/BinaryStringUtil.java | 57 ++
.../org/apache/flink/table/util/SegmentsUtil.java | 811 +++++++++++++++++++++
.../flink/table/dataformat/BinaryArrayTest.java | 429 +++++++++++
.../flink/table/dataformat/BinaryRowTest.java | 245 +++++++
.../apache/flink/table/util/SegmentsUtilTest.java | 62 ++
18 files changed, 3540 insertions(+)
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
new file mode 100644
index 0000000..3deca69
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.BinaryStringUtil;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.util.Arrays;
+
+/**
+ * Uses a special binary format to write data to a {@link MemorySegment} (its capacity grows
+ * automatically).
+ *
+ * <p>To write a value in the binary format:
+ * 1. Create a new writer.
+ * 2. Write each field with writeXX or setNullAt. (Variable-length fields cannot be written
+ * repeatedly.)
+ * 3. Invoke {@link #complete()}.
+ *
+ * <p>To reuse this writer, invoke {@link #reset()} first.
+ */
+public abstract class AbstractBinaryWriter implements BinaryWriter {
+
+ protected MemorySegment segment;
+
+ protected int cursor;
+
+ /**
+ * Set offset and size to fix len part.
+ */
+ protected abstract void setOffsetAndSize(int pos, int offset, long
size);
+
+ /**
+ * Get field offset.
+ */
+ protected abstract int getFieldOffset(int pos);
+
+ /**
+ * After grow, need point to new memory.
+ */
+ protected abstract void afterGrow();
+
+ /**
+ * Writes a string field. A string of at most 7 bytes is handed to
+ * {@code writeBytesToFixLenPart} and stored in the field's fixed-length slot; a longer
+ * string is copied into the variable-length part.
+ *
+ * <p>See {@link BinaryString#readBinaryStringFieldFromSegments}.
+ */
+ @Override
+ public void writeString(int pos, BinaryString input) {
+     final int numBytes = input.getSizeInBytes();
+     if (numBytes > 7) {
+         // Too long to be inlined: store it in the variable-length part.
+         writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), numBytes);
+         return;
+     }
+
+     // Short string: gather its bytes into a scratch array and inline them.
+     final byte[] inlined = BinaryStringUtil.allocateReuseBytes(numBytes);
+     SegmentsUtil.copyToBytes(input.getSegments(), input.getOffset(), inlined, 0, numBytes);
+     writeBytesToFixLenPart(segment, getFieldOffset(pos), inlined, numBytes);
+ }
+
+ @Override
+ public void writeArray(int pos, BinaryArray input) {
+ writeSegmentsToVarLenPart(pos, input.getSegments(),
input.getOffset(), input.getSizeInBytes());
+ }
+
+ @Override
+ public void writeMap(int pos, BinaryMap input) {
+ writeSegmentsToVarLenPart(pos, input.getSegments(),
input.getOffset(), input.getSizeInBytes());
+ }
+
+ /**
+ * Zeroes the last 8-byte word of a {@code numBytes}-long region starting at the cursor,
+ * unless the region already ends on an 8-byte boundary.
+ */
+ private void zeroOutPaddingBytes(int numBytes) {
+     final boolean endsOnWordBoundary = (numBytes & 0x07) == 0;
+     if (!endsOnWordBoundary) {
+         // Clearing the low three bits gives the start of the last, partially used word.
+         final int lastWordStart = cursor + (numBytes & ~0x07);
+         segment.putLong(lastWordStart, 0L);
+     }
+ }
+
+ /**
+ * Grows the backing segment if it cannot hold {@code neededSize} more bytes after the cursor.
+ */
+ private void ensureCapacity(int neededSize) {
+     final int requiredSize = cursor + neededSize;
+     if (requiredSize > segment.size()) {
+         grow(requiredSize);
+     }
+ }
+
+ private void writeSegmentsToVarLenPart(int pos, MemorySegment[] segments, int offset, int size) {
+     final int roundedSize = roundNumberOfBytesToNearestWord(size); // the payload occupies whole 8-byte words
+
+     // Grow the backing segment first, so that roundedSize bytes starting at the cursor are writable.
+     ensureCapacity(roundedSize);
+
+     zeroOutPaddingBytes(size); // clear the last word before copying, so the bytes after the payload are zero
+
+     if (segments.length == 1) {
+         segments[0].copyTo(offset, segment, cursor, size); // single source segment: one bulk copy
+     } else {
+         writeMultiSegmentsToVarLenPart(segments, offset, size);
+     }
+
+     setOffsetAndSize(pos, cursor, size); // record position and unpadded size in the field's fixed-length slot
+
+     // Advance the cursor past the payload and its padding.
+     cursor += roundedSize;
+ }
+
+ /**
+ * Copies {@code size} bytes, starting at the logical {@code offset} within the given source
+ * segments, into the backing segment at the current cursor position.
+ *
+ * <p>Source segments that lie entirely before {@code offset} are skipped. The cursor itself
+ * is not moved here.
+ *
+ * @param segments source segments, treated as one contiguous byte sequence
+ * @param offset offset of the first byte to copy, relative to the start of the first segment
+ * @param size number of bytes to copy
+ */
+ private void writeMultiSegmentsToVarLenPart(MemorySegment[] segments, int offset, int size) {
+     int needCopy = size;
+     int fromOffset = offset;
+     int toOffset = cursor;
+     for (MemorySegment sourceSegment : segments) {
+         if (needCopy == 0) {
+             // Everything has been copied; do not issue zero-length copies for the rest.
+             break;
+         }
+         int remain = sourceSegment.size() - fromOffset;
+         if (remain > 0) {
+             int copySize = Math.min(remain, needCopy);
+             sourceSegment.copyTo(fromOffset, segment, toOffset, copySize);
+             needCopy -= copySize;
+             toOffset += copySize;
+             // Every following segment is read from its beginning.
+             fromOffset = 0;
+         } else {
+             // The start offset lies beyond this segment: skip it entirely.
+             fromOffset -= sourceSegment.size();
+         }
+     }
+ }
+
+ /**
+ * Replaces the backing segment with a larger one that holds at least {@code minCapacity}
+ * bytes. The capacity grows by half, or to {@code minCapacity} if that is larger. Existing
+ * content is copied over and {@link #afterGrow()} is invoked afterwards.
+ */
+ private void grow(int minCapacity) {
+     final int currentCapacity = segment.size();
+     final int grownByHalf = currentCapacity + (currentCapacity >> 1);
+     // Compare via subtraction, as the original does, so an overflowed value falls back to minCapacity.
+     final int targetCapacity = (grownByHalf - minCapacity < 0) ? minCapacity : grownByHalf;
+
+     final byte[] enlarged = Arrays.copyOf(segment.getArray(), targetCapacity);
+     segment = MemorySegmentFactory.wrap(enlarged);
+     afterGrow();
+ }
+
+ /**
+ * Rounds {@code numBytes} up to the next multiple of 8. A value that is already a multiple
+ * of 8 is returned unchanged.
+ */
+ protected static int roundNumberOfBytesToNearestWord(int numBytes) {
+     // Adding 7 and clearing the low three bits rounds up to a multiple of 8.
+     return (numBytes + 7) & ~0x07;
+ }
+
+ /**
+ * Packs up to 7 bytes and their length into one long and writes it at {@code fieldOffset}.
+ *
+ * <p>The top byte of the long holds {@code len} with the highest bit set; the lower seven
+ * bytes hold the data, laid out according to {@code BinaryRow.LITTLE_ENDIAN}.
+ */
+ private static void writeBytesToFixLenPart(
+         MemorySegment segment, int fieldOffset, byte[] bytes, int len) {
+     final boolean littleEndian = BinaryRow.LITTLE_ENDIAN;
+
+     // Highest bit is the marker, the remaining bits of the top byte are the length.
+     final long marker = len | 0x80;
+     long packed = marker << 56;
+
+     for (int i = 0; i < len; i++) {
+         final long shift = (littleEndian ? i : (6 - i)) * 8L;
+         packed |= (0x00000000000000FFL & bytes[i]) << shift;
+     }
+
+     segment.putLong(fieldOffset, packed);
+ }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseRow.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseRow.java
new file mode 100644
index 0000000..8ee4836
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseRow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * An interface for rows used internally in Flink Table/SQL.
+ *
+ * <p>There are different implementations depending on the scenario:
+ * after serialization, a row is in the BinaryRow format;
+ * for convenient updates, the GenericRow format is used.
+ *
+ * <p>{@code BaseRow}s are influenced by Apache Spark InternalRows.
+ */
+public interface BaseRow extends TypeGetterSetters {
+
+ /**
+ * Get the number of fields in the BaseRow.
+ *
+ * @return The number of fields in the BaseRow.
+ */
+ int getArity();
+
+ /**
+ * The header represents the type of this row. It is currently only used in streaming,
+ * where there are two kinds of messages: ACCUMULATE_MSG and RETRACT_MSG.
+ *
+ * @return The header byte of this row.
+ */
+ byte getHeader();
+
+ /**
+ * Set the byte header.
+ *
+ * @param header The header byte to store in this row.
+ */
+ void setHeader(byte header);
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
new file mode 100644
index 0000000..e708710
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
+/**
+ * For fields that hold fixed-length primitive types, such as long, double, or int, we store the
+ * value directly in the field, just like the original Java array.
+ *
+ * <p>[numElements(int)] + [null bits(4-byte word boundaries)] + [values or offset&amp;length] +
+ * [variable length part].
+ *
+ * <p>{@code BinaryArray} is influenced by Apache Spark UnsafeArrayData.
+ */
+public class BinaryArray extends BinaryFormat implements TypeGetterSetters {
+
+ /**
+ * Offset for Arrays.
+ */
+ private static final int BYTE_ARRAY_BASE_OFFSET =
UNSAFE.arrayBaseOffset(byte[].class);
+ private static final int BOOLEAN_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(boolean[].class);
+ private static final int SHORT_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(short[].class);
+ private static final int INT_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(int[].class);
+ private static final int LONG_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(long[].class);
+ private static final int FLOAT_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(float[].class);
+ private static final int DOUBLE_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(double[].class);
+
+ /**
+ * Computes the header size in bytes for an array of {@code numFields} elements: 4 bytes
+ * plus one 4-byte word of null bits for every 32 elements (rounded up).
+ */
+ public static int calculateHeaderInBytes(int numFields) {
+     final int nullBitWords = (numFields + 31) / 32;
+     return 4 + nullBitWords * 4;
+ }
+
+ // The number of elements in this array
+ private int numElements;
+
+ /** The position to start storing array elements. */
+ private int elementOffset;
+
+ public BinaryArray() {}
+
+ private void assertIndexIsValid(int ordinal) {
+ assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
+ assert ordinal < numElements : "ordinal (" + ordinal + ")
should < " + numElements;
+ }
+
+ private int getElementOffset(int ordinal, int elementSize) {
+ return elementOffset + ordinal * elementSize;
+ }
+
+ public int numElements() {
+ return numElements;
+ }
+
+ public void pointTo(MemorySegment segment, int offset, int sizeInBytes)
{
+ pointTo(new MemorySegment[]{segment}, offset, sizeInBytes);
+ }
+
+ /**
+ * Points this array at {@code sizeInBytes} bytes starting at {@code offset} within the given
+ * segments. The element count is read from the first 4 bytes of that region.
+ */
+ public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+     // The first 4 bytes hold the number of elements.
+     final int count = SegmentsUtil.getInt(segments, offset);
+     assert count >= 0 : "numElements (" + count + ") should >= 0";
+
+     this.segments = segments;
+     this.offset = offset;
+     this.sizeInBytes = sizeInBytes;
+     this.numElements = count;
+     // Elements start right after the header (element count + null bits).
+     this.elementOffset = offset + calculateHeaderInBytes(count);
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.bitGet(segments, offset + 4, pos);
+ }
+
+ @Override
+ public void setNullAt(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ }
+
+ public void setNotNullAt(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitUnSet(segments, offset + 4, pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.getLong(segments, getElementOffset(pos, 8));
+ }
+
+ @Override
+ public void setLong(int pos, long value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ SegmentsUtil.setLong(segments, getElementOffset(pos, 8), value);
+ }
+
+ public void setNullLong(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ SegmentsUtil.setLong(segments, getElementOffset(pos, 8), 0L);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.getInt(segments, getElementOffset(pos, 4));
+ }
+
+ @Override
+ public void setInt(int pos, int value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ SegmentsUtil.setInt(segments, getElementOffset(pos, 4), value);
+ }
+
+ public void setNullInt(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ SegmentsUtil.setInt(segments, getElementOffset(pos, 4), 0);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ assertIndexIsValid(pos);
+ int fieldOffset = getElementOffset(pos, 8);
+ final long offsetAndSize = SegmentsUtil.getLong(segments,
fieldOffset);
+ return BinaryString.readBinaryStringFieldFromSegments(
+ segments, offset, fieldOffset, offsetAndSize);
+ }
+
+ @Override
+ public BinaryArray getArray(int pos) {
+ assertIndexIsValid(pos);
+ return BinaryArray.readBinaryArrayFieldFromSegments(segments,
offset, getLong(pos));
+ }
+
+ @Override
+ public BinaryMap getMap(int pos) {
+ assertIndexIsValid(pos);
+ return BinaryMap.readBinaryMapFieldFromSegments(segments,
offset, getLong(pos));
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.getBoolean(segments, getElementOffset(pos,
1));
+ }
+
+ @Override
+ public void setBoolean(int pos, boolean value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ SegmentsUtil.setBoolean(segments, getElementOffset(pos, 1),
value);
+ }
+
+ public void setNullBoolean(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ SegmentsUtil.setBoolean(segments, getElementOffset(pos, 1),
false);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.getByte(segments, getElementOffset(pos, 1));
+ }
+
+ @Override
+ public void setByte(int pos, byte value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ SegmentsUtil.setByte(segments, getElementOffset(pos, 1), value);
+ }
+
+ public void setNullByte(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ SegmentsUtil.setByte(segments, getElementOffset(pos, 1), (byte)
0);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.getShort(segments, getElementOffset(pos,
2));
+ }
+
+ @Override
+ public void setShort(int pos, short value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ SegmentsUtil.setShort(segments, getElementOffset(pos, 2),
value);
+ }
+
+ public void setNullShort(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ SegmentsUtil.setShort(segments, getElementOffset(pos, 2),
(short) 0);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.getFloat(segments, getElementOffset(pos,
4));
+ }
+
+ @Override
+ public void setFloat(int pos, float value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ SegmentsUtil.setFloat(segments, getElementOffset(pos, 4),
value);
+ }
+
+ public void setNullFloat(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ SegmentsUtil.setFloat(segments, getElementOffset(pos, 4), 0F);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.getDouble(segments, getElementOffset(pos,
8));
+ }
+
+ @Override
+ public void setDouble(int pos, double value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ SegmentsUtil.setDouble(segments, getElementOffset(pos, 8),
value);
+ }
+
+ public void setNullDouble(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ SegmentsUtil.setDouble(segments, getElementOffset(pos, 8), 0.0);
+ }
+
+ @Override
+ public char getChar(int pos) {
+ assertIndexIsValid(pos);
+ return SegmentsUtil.getChar(segments, getElementOffset(pos, 2));
+ }
+
+ @Override
+ public void setChar(int pos, char value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ SegmentsUtil.setChar(segments, getElementOffset(pos, 2), value);
+ }
+
+ public void setNullChar(int pos) {
+ assertIndexIsValid(pos);
+ SegmentsUtil.bitSet(segments, offset + 4, pos);
+ SegmentsUtil.setChar(segments, getElementOffset(pos, 2), '\0');
+ }
+
+ /**
+ * Returns whether any 4-byte word between the element count and the first element is
+ * non-zero, i.e. whether any null bit is set.
+ */
+ public boolean anyNull() {
+     int wordOffset = offset + 4;
+     while (wordOffset < elementOffset) {
+         if (SegmentsUtil.getInt(segments, wordOffset) != 0) {
+             return true;
+         }
+         wordOffset += 4;
+     }
+     return false;
+ }
+
+ private void checkNoNull() {
+ if (anyNull()) {
+ throw new RuntimeException("Array can not have null
value!");
+ }
+ }
+
+ public boolean[] toBooleanArray() {
+ checkNoNull();
+ boolean[] values = new boolean[numElements];
+ SegmentsUtil.copyToUnsafe(
+ segments, elementOffset, values,
BOOLEAN_ARRAY_OFFSET, numElements);
+ return values;
+ }
+
+ public byte[] toByteArray() {
+ checkNoNull();
+ byte[] values = new byte[numElements];
+ SegmentsUtil.copyToUnsafe(
+ segments, elementOffset, values,
BYTE_ARRAY_BASE_OFFSET, numElements);
+ return values;
+ }
+
+ public short[] toShortArray() {
+ checkNoNull();
+ short[] values = new short[numElements];
+ SegmentsUtil.copyToUnsafe(
+ segments, elementOffset, values,
SHORT_ARRAY_OFFSET, numElements * 2);
+ return values;
+ }
+
+ public int[] toIntArray() {
+ checkNoNull();
+ int[] values = new int[numElements];
+ SegmentsUtil.copyToUnsafe(
+ segments, elementOffset, values,
INT_ARRAY_OFFSET, numElements * 4);
+ return values;
+ }
+
+ public long[] toLongArray() {
+ checkNoNull();
+ long[] values = new long[numElements];
+ SegmentsUtil.copyToUnsafe(
+ segments, elementOffset, values,
LONG_ARRAY_OFFSET, numElements * 8);
+ return values;
+ }
+
+ public float[] toFloatArray() {
+ checkNoNull();
+ float[] values = new float[numElements];
+ SegmentsUtil.copyToUnsafe(
+ segments, elementOffset, values,
FLOAT_ARRAY_OFFSET, numElements * 4);
+ return values;
+ }
+
+ public double[] toDoubleArray() {
+ checkNoNull();
+ double[] values = new double[numElements];
+ SegmentsUtil.copyToUnsafe(
+ segments, elementOffset, values,
DOUBLE_ARRAY_OFFSET, numElements * 8);
+ return values;
+ }
+
+ /**
+ * Builds a {@code BinaryArray} by copying the raw contents of a primitive array into a
+ * freshly allocated, 8-byte aligned byte array. The null bits stay zero, so no element is
+ * marked null.
+ *
+ * @param arr the primitive array to copy from
+ * @param offset base offset of the first element inside {@code arr}
+ * @param length number of elements to copy
+ * @param elementSize size of a single element in bytes
+ * @return a new {@code BinaryArray} backed by its own memory
+ * @throws UnsupportedOperationException if the resulting array would be too big
+ */
+ private static BinaryArray fromPrimitiveArray(
+         Object arr, int offset, int length, int elementSize) {
+     final long headerInBytes = calculateHeaderInBytes(length);
+     // Widen before multiplying: an int product can overflow for large arrays and would
+     // then slip past the size check below.
+     final long valueRegionInBytes = (long) elementSize * length;
+
+     // must align by 8 bytes
+     long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
+     if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
+         throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +
+                 "it's too big.");
+     }
+     long totalSize = totalSizeInLongs * 8;
+
+     final byte[] data = new byte[(int) totalSize];
+
+     // Header: the element count goes into the first 4 bytes.
+     UNSAFE.putInt(data, (long) BYTE_ARRAY_BASE_OFFSET, length);
+     // Values: raw copy of the primitive elements right behind the header.
+     UNSAFE.copyMemory(
+             arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);
+
+     BinaryArray result = new BinaryArray();
+     result.pointTo(MemorySegmentFactory.wrap(data), 0, (int) totalSize);
+     return result;
+ }
+
+ public static BinaryArray fromPrimitiveArray(boolean[] arr) {
+ return fromPrimitiveArray(arr, BOOLEAN_ARRAY_OFFSET,
arr.length, 1);
+ }
+
+ public static BinaryArray fromPrimitiveArray(byte[] arr) {
+ return fromPrimitiveArray(arr, BYTE_ARRAY_BASE_OFFSET,
arr.length, 1);
+ }
+
+ public static BinaryArray fromPrimitiveArray(short[] arr) {
+ return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length,
2);
+ }
+
+ public static BinaryArray fromPrimitiveArray(int[] arr) {
+ return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
+ }
+
+ public static BinaryArray fromPrimitiveArray(long[] arr) {
+ return fromPrimitiveArray(arr, LONG_ARRAY_OFFSET, arr.length,
8);
+ }
+
+ public static BinaryArray fromPrimitiveArray(float[] arr) {
+ return fromPrimitiveArray(arr, FLOAT_ARRAY_OFFSET, arr.length,
4);
+ }
+
+ public static BinaryArray fromPrimitiveArray(double[] arr) {
+ return fromPrimitiveArray(arr, DOUBLE_ARRAY_OFFSET, arr.length,
8);
+ }
+
+ /**
+ * Creates a {@code BinaryArray} view from a packed {@code offsetAndSize} value: the high 32
+ * bits are the offset relative to {@code baseOffset}, the low 32 bits are the size in bytes.
+ */
+ static BinaryArray readBinaryArrayFieldFromSegments(
+         MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+     final int relativeOffset = (int) (offsetAndSize >> 32);
+     final int numBytes = (int) offsetAndSize;
+
+     final BinaryArray result = new BinaryArray();
+     result.pointTo(segments, baseOffset + relativeOffset, numBytes);
+     return result;
+ }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
new file mode 100644
index 0000000..11bcc46
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+/**
+ * Writer for binary array. See {@link BinaryArray}.
+ */
+public class BinaryArrayWriter extends AbstractBinaryWriter {
+
+ private final int nullBitsSizeInBytes;
+ private final BinaryArray array;
+ private final int numElements;
+ private int fixedSize;
+
+ public BinaryArrayWriter(BinaryArray array, int numElements, int
elementSize) {
+ this.nullBitsSizeInBytes =
BinaryArray.calculateHeaderInBytes(numElements);
+ this.fixedSize = roundNumberOfBytesToNearestWord(
+ nullBitsSizeInBytes + elementSize *
numElements);
+ this.cursor = fixedSize;
+ this.numElements = numElements;
+
+ this.segment = MemorySegmentFactory.wrap(new byte[fixedSize]);
+ this.segment.putInt(0, numElements);
+ this.array = array;
+ }
+
+ /**
+ * First, reset.
+ *
+ * <p>Moves the cursor back to the end of the fixed-length part, zeroes the header (element
+ * count and null bits) and then writes the element count again, so the writer can be reused.
+ *
+ * <p>NOTE(review): the header is cleared in 8-byte steps, but its length
+ * ({@code nullBitsSizeInBytes}) is a multiple of 4, not necessarily of 8. When it is not a
+ * multiple of 8, the last {@code putLong} also zeroes the 4 bytes that follow the header.
+ */
+ @Override
+ public void reset() {
+     this.cursor = fixedSize; // variable-length data is appended after the fixed-length part again
+     for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
+         segment.putLong(i, 0L);
+     }
+     this.segment.putInt(0, numElements); // the loop above also cleared the element count; restore it
+ }
+
+ private void setNullBit(int ordinal) {
+ SegmentsUtil.bitSet(segment, 4, ordinal);
+ }
+
+ public void setNullBoolean(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putBoolean(getElementOffset(ordinal, 1), false);
+ }
+
+ public void setNullByte(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.put(getElementOffset(ordinal, 1), (byte) 0);
+ }
+
+ public void setNullShort(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putShort(getElementOffset(ordinal, 2), (short) 0);
+ }
+
+ public void setNullInt(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putInt(getElementOffset(ordinal, 4), 0);
+ }
+
+ public void setNullLong(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putLong(getElementOffset(ordinal, 8), (long) 0);
+ }
+
+ public void setNullFloat(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putFloat(getElementOffset(ordinal, 4), (float) 0);
+ }
+
+ public void setNullDouble(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putDouble(getElementOffset(ordinal, 8), (double) 0);
+ }
+
+ @Override
+ public void setNullAt(int ordinal) {
+ setNullLong(ordinal);
+ }
+
+ private int getElementOffset(int pos, int elementSize) {
+ return nullBitsSizeInBytes + elementSize * pos;
+ }
+
+ @Override
+ public int getFieldOffset(int pos) {
+ return getElementOffset(pos, 8);
+ }
+
+ /**
+ * Stores {@code offset} in the high 32 bits and {@code size} in the low bits of the 8-byte
+ * slot of element {@code pos}.
+ */
+ @Override
+ public void setOffsetAndSize(int pos, int offset, long size) {
+     final long highBits = ((long) offset) << 32;
+     segment.putLong(getElementOffset(pos, 8), highBits | size);
+ }
+
+ @Override
+ public void writeBoolean(int pos, boolean value) {
+ segment.putBoolean(getElementOffset(pos, 1), value);
+ }
+
+ @Override
+ public void writeByte(int pos, byte value) {
+ segment.put(getElementOffset(pos, 1), value);
+ }
+
+ @Override
+ public void writeShort(int pos, short value) {
+ segment.putShort(getElementOffset(pos, 2), value);
+ }
+
+ @Override
+ public void writeInt(int pos, int value) {
+ segment.putInt(getElementOffset(pos, 4), value);
+ }
+
+ @Override
+ public void writeLong(int pos, long value) {
+ segment.putLong(getElementOffset(pos, 8), value);
+ }
+
+ /**
+ * Writes a float element. Every NaN input is stored as {@code Float.NaN}.
+ */
+ @Override
+ public void writeFloat(int pos, float value) {
+     final float normalized = Float.isNaN(value) ? Float.NaN : value;
+     segment.putFloat(getElementOffset(pos, 4), normalized);
+ }
+
+ /**
+ * Writes a double element. Every NaN input is stored as {@code Double.NaN}.
+ */
+ @Override
+ public void writeDouble(int pos, double value) {
+     final double normalized = Double.isNaN(value) ? Double.NaN : value;
+     segment.putDouble(getElementOffset(pos, 8), normalized);
+ }
+
+ @Override
+ public void writeChar(int pos, char value) {
+ segment.putChar(getElementOffset(pos, 2), value);
+ }
+
+ @Override
+ public void afterGrow() {
+ array.pointTo(segment, 0, segment.size());
+ }
+
+ /**
+ * Finally, complete write to set real size to row.
+ */
+ @Override
+ public void complete() {
+ array.pointTo(segment, 0, cursor);
+ }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
new file mode 100644
index 0000000..b3ac09d
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.util.SegmentsUtil;
+
+/**
+ * Base class for binary formats whose data is a region of {@code sizeInBytes} bytes that
+ * starts at {@code offset} within one or more {@link MemorySegment}s.
+ *
+ * <p>Equality and hash code are based on the bytes of that region.
+ */
+public abstract class BinaryFormat<T> {
+
+ protected MemorySegment[] segments;
+ protected int offset;
+ protected int sizeInBytes;
+
+ /** Creates a format that does not point to any memory yet. */
+ public BinaryFormat() {}
+
+ /**
+ * Creates a format over the given region.
+ *
+ * @param segments segments that hold the data
+ * @param offset offset of the first byte of the region
+ * @param sizeInBytes length of the region in bytes
+ */
+ public BinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) {
+     this.segments = segments;
+     this.offset = offset;
+     this.sizeInBytes = sizeInBytes;
+ }
+
+ public MemorySegment[] getSegments() {
+     return segments;
+ }
+
+ public int getOffset() {
+     return offset;
+ }
+
+ public int getSizeInBytes() {
+     return sizeInBytes;
+ }
+
+ /**
+ * Two formats are equal if they are the same object, or if they have the same class and
+ * their regions hold the same bytes.
+ */
+ @Override
+ public boolean equals(Object o) {
+     return this == o || o != null &&
+             getClass() == o.getClass() &&
+             binaryEquals((BinaryFormat<?>) o);
+ }
+
+ /**
+ * Hashes the bytes of the region, so that objects that are equal according to
+ * {@link #equals(Object)} also have equal hash codes. Subclasses may override this with
+ * a faster content-based hash.
+ */
+ @Override
+ public int hashCode() {
+     if (segments == null) {
+         // Not pointing to any memory yet: there is nothing to hash.
+         return 0;
+     }
+     int result = 1;
+     for (int i = 0; i < sizeInBytes; i++) {
+         result = 31 * result + SegmentsUtil.getByte(segments, offset + i);
+     }
+     return result;
+ }
+
+ /**
+ * Compares the bytes of this region with those of {@code that}, ignoring the class.
+ *
+ * @param that the format to compare with
+ * @return true if both regions have the same size and the same content
+ */
+ public boolean binaryEquals(BinaryFormat that) {
+     return sizeInBytes == that.sizeInBytes &&
+             SegmentsUtil.equals(segments, offset, that.segments, that.offset, sizeInBytes);
+ }
+
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
new file mode 100644
index 0000000..fefdc27
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Binary layout: [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ *
+ * <p>{@code BinaryMap} is influenced by Apache Spark UnsafeMapData.
+ */
+public class BinaryMap extends BinaryFormat {
+
+	private final BinaryArray keys = new BinaryArray();
+	private final BinaryArray values = new BinaryArray();
+
+	public BinaryMap() {
+	}
+
+	/**
+	 * Number of entries, taken from the key array.
+	 */
+	public int numElements() {
+		return keys.numElements();
+	}
+
+	/**
+	 * Points this map to data held in a single segment.
+	 */
+	public void pointTo(MemorySegment segment, int baseOffset, int sizeInBytes) {
+		final MemorySegment[] single = new MemorySegment[]{segment};
+		pointTo(single, baseOffset, sizeInBytes);
+	}
+
+	/**
+	 * Points this map, and its key and value arrays, to the given bytes.
+	 *
+	 * @param segments segments holding the map.
+	 * @param offset start of the map (the 4 byte key array size) inside {@code segments}.
+	 * @param sizeInBytes total size of the map, including the 4 byte size prefix.
+	 */
+	public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+		// The leading 4 bytes store how many bytes the key array occupies.
+		final int keyBytes = SegmentsUtil.getInt(segments, offset);
+		assert keyBytes >= 0 : "keyArraySize (" + keyBytes + ") should >= 0";
+		// Whatever remains after the prefix and the keys belongs to the value array.
+		final int valueBytes = sizeInBytes - keyBytes - 4;
+		assert valueBytes >= 0 : "valueArraySize (" + valueBytes + ") should >= 0";
+
+		final int keysStart = offset + 4;
+		keys.pointTo(segments, keysStart, keyBytes);
+		values.pointTo(segments, keysStart + keyBytes, valueBytes);
+
+		assert keys.numElements() == values.numElements();
+
+		this.segments = segments;
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	public BinaryArray keyArray() {
+		return keys;
+	}
+
+	public BinaryArray valueArray() {
+		return values;
+	}
+
+	/**
+	 * Builds a new map by copying a key array and a value array into one freshly allocated
+	 * heap segment. Both arrays must be backed by exactly one segment.
+	 */
+	public static BinaryMap valueOf(BinaryArray key, BinaryArray value) {
+		checkArgument(key.getSegments().length == 1 && value.getSegments().length == 1);
+		final int keySize = key.getSizeInBytes();
+		final int valueSize = value.getSizeInBytes();
+
+		final byte[] bytes = new byte[4 + keySize + valueSize];
+		final MemorySegment target = MemorySegmentFactory.wrap(bytes);
+
+		// Layout: size prefix, then the keys, then the values.
+		target.putInt(0, keySize);
+		key.getSegments()[0].copyTo(key.getOffset(), target, 4, keySize);
+		value.getSegments()[0].copyTo(value.getOffset(), target, 4 + keySize, valueSize);
+
+		final BinaryMap result = new BinaryMap();
+		result.pointTo(target, 0, bytes.length);
+		return result;
+	}
+
+	/**
+	 * Creates a map view from a packed long: the upper 32 bits are the offset relative to
+	 * {@code baseOffset}, the lower 32 bits are the size in bytes.
+	 */
+	public static BinaryMap readBinaryMapFieldFromSegments(
+			MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+		final int mapSize = (int) offsetAndSize;
+		final int relativeOffset = (int) (offsetAndSize >> 32);
+		final BinaryMap result = new BinaryMap();
+		result.pointTo(segments, relativeOffset + baseOffset, mapSize);
+		return result;
+	}
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
new file mode 100644
index 0000000..472f7b22
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A special row which is backed by {@link MemorySegment} instead of Object. It can significantly
+ * reduce the serialization/deserialization of Java objects.
+ *
+ * <p>A Row has two parts: a fixed-length part and a variable-length part.
+ *
+ * <p>The fixed-length part contains a 1 byte header, the null bit set and the field values. The
+ * null bit set is used for null tracking and is aligned to 8-byte word boundaries. `Field values`
+ * holds fixed-length primitive types and variable-length values which can be stored in 8 bytes
+ * inside. If a variable-length value does not fit, the field stores the length and offset of the
+ * variable-length part instead.
+ *
+ * <p>The fixed-length part will certainly fall into a single MemorySegment, which speeds up the
+ * read and write of fields. During the write phase, if the target memory segment has less space
+ * than the fixed-length part size, we will skip the space. So the number of fields in a single Row
+ * cannot exceed the capacity of a single MemorySegment; if there are too many fields, we suggest
+ * that the user set a bigger pageSize of MemorySegment.
+ *
+ * <p>The variable-length part may fall into multiple MemorySegments.
+ *
+ * <p>{@code BinaryRow} is influenced by Apache Spark UnsafeRow in project tungsten.
+ * The difference is that BinaryRow is placed on a discontinuous memory, and the variable length
+ * type can also be placed on a fixed length area (if it's short enough).
+ */
+public final class BinaryRow extends BinaryFormat<Object> implements BaseRow {
+
+	public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+
+	/**
+	 * Mask that clears the header byte from the first 8-byte word of the row and keeps every
+	 * null bit. The header is the byte at the lowest address of that word, so it is the least
+	 * significant byte on little-endian platforms and the most significant byte on big-endian ones.
+	 */
+	private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : ~(0xFFL << 56L);
+	private static final int HEADER_SIZE_IN_BITS = 8;
+
+	/**
+	 * Size in bytes of the header plus the null bit set for a row with {@code arity} fields,
+	 * rounded up to a multiple of 8 bytes.
+	 */
+	public static int calculateBitSetWidthInBytes(int arity) {
+		return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
+	}
+
+	private final int arity;
+	private final int nullBitsSizeInBytes;
+
+	/**
+	 * Creates a row with the given number of fields. The row does not point to any data until
+	 * one of the {@code pointTo} methods is called.
+	 *
+	 * @param arity number of fields, must be {@code >= 0}.
+	 */
+	public BinaryRow(int arity) {
+		checkArgument(arity >= 0);
+		this.arity = arity;
+		this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+	}
+
+	/** Absolute offset, inside the first segment, of the 8-byte slot of field {@code pos}. */
+	private int getFieldOffset(int pos) {
+		return offset + nullBitsSizeInBytes + pos * 8;
+	}
+
+	private void assertIndexIsValid(int index) {
+		assert index >= 0 : "index (" + index + ") should >= 0";
+		assert index < arity : "index (" + index + ") should < " + arity;
+	}
+
+	/** Size in bytes of header, null bit set and the 8-byte slots of all fields. */
+	public int getFixedLengthPartSize() {
+		return nullBitsSizeInBytes + 8 * arity;
+	}
+
+	@Override
+	public int getArity() {
+		return arity;
+	}
+
+	@Override
+	public byte getHeader() {
+		// The first byte of the row is the header.
+		return segments[0].get(offset);
+	}
+
+	@Override
+	public void setHeader(byte header) {
+		segments[0].put(offset, header);
+	}
+
+	/** Points this row to data held in a single segment. */
+	public void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
+		this.segments = new MemorySegment[] {segment};
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	/** Points this row to data held in the given segments. */
+	public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+		this.segments = segments;
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	public void setTotalSize(int sizeInBytes) {
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	@Override
+	public boolean isNullAt(int pos) {
+		assertIndexIsValid(pos);
+		// Null bits start right after the 8 header bits.
+		return SegmentsUtil.bitGet(segments[0], offset, pos + HEADER_SIZE_IN_BITS);
+	}
+
+	private void setNotNullAt(int i) {
+		assertIndexIsValid(i);
+		SegmentsUtil.bitUnSet(segments[0], offset, i + HEADER_SIZE_IN_BITS);
+	}
+
+	@Override
+	public void setNullAt(int i) {
+		assertIndexIsValid(i);
+		SegmentsUtil.bitSet(segments[0], offset, i + HEADER_SIZE_IN_BITS);
+		// We must set the fixed length part zero.
+		// 1.Only int/long/boolean...(Fix length type) will invoke this setNullAt.
+		// 2.Set to zero in order to equals and hash operation bytes calculation.
+		segments[0].putLong(getFieldOffset(i), 0);
+	}
+
+	@Override
+	public void setInt(int pos, int value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putInt(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setLong(int pos, long value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putLong(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setDouble(int pos, double value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putDouble(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setChar(int pos, char value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putChar(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setBoolean(int pos, boolean value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putBoolean(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setShort(int pos, short value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putShort(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setByte(int pos, byte value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].put(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setFloat(int pos, float value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putFloat(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public boolean getBoolean(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getBoolean(getFieldOffset(pos));
+	}
+
+	@Override
+	public byte getByte(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].get(getFieldOffset(pos));
+	}
+
+	@Override
+	public short getShort(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getShort(getFieldOffset(pos));
+	}
+
+	@Override
+	public int getInt(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getInt(getFieldOffset(pos));
+	}
+
+	@Override
+	public long getLong(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getLong(getFieldOffset(pos));
+	}
+
+	@Override
+	public float getFloat(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getFloat(getFieldOffset(pos));
+	}
+
+	@Override
+	public double getDouble(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getDouble(getFieldOffset(pos));
+	}
+
+	@Override
+	public char getChar(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getChar(getFieldOffset(pos));
+	}
+
+	@Override
+	public BinaryString getString(int pos) {
+		assertIndexIsValid(pos);
+		int fieldOffset = getFieldOffset(pos);
+		// The slot holds either the string bytes themselves or an offset/length pair.
+		final long offsetAndLen = segments[0].getLong(fieldOffset);
+		return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
+	}
+
+	@Override
+	public BinaryArray getArray(int pos) {
+		assertIndexIsValid(pos);
+		return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos));
+	}
+
+	@Override
+	public BinaryMap getMap(int pos) {
+		assertIndexIsValid(pos);
+		return BinaryMap.readBinaryMapFieldFromSegments(segments, offset, getLong(pos));
+	}
+
+	/**
+	 * Checks whether any field of this row is null.
+	 * The bit is 1 when the field is null. Default is 0.
+	 *
+	 * @return true if at least one null bit is set.
+	 */
+	public boolean anyNull() {
+		// The first word also contains the header byte, so mask the header out.
+		// All reads are relative to the start of this row inside the segment.
+		if ((segments[0].getLong(offset) & FIRST_BYTE_ZERO) != 0) {
+			return true;
+		}
+		for (int i = 8; i < nullBitsSizeInBytes; i += 8) {
+			if (segments[0].getLong(offset + i) != 0) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Checks whether any of the given fields is null.
+	 *
+	 * @param fields positions of the fields to check.
+	 */
+	public boolean anyNull(int[] fields) {
+		for (int field : fields) {
+			if (isNullAt(field)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/** Copies this row into a new row backed by a freshly allocated byte array. */
+	public BinaryRow copy() {
+		return copy(new BinaryRow(arity));
+	}
+
+	/**
+	 * Copies the bytes of this row and points {@code reuse} to the copy.
+	 *
+	 * @param reuse row to re-point; must be a {@link BinaryRow}.
+	 * @return {@code reuse}.
+	 */
+	public BinaryRow copy(BaseRow reuse) {
+		return copyInternal((BinaryRow) reuse);
+	}
+
+	private BinaryRow copyInternal(BinaryRow reuse) {
+		byte[] bytes = SegmentsUtil.copyToBytes(segments, offset, sizeInBytes);
+		reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+		return reuse;
+	}
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
new file mode 100644
index 0000000..906d073
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+/**
+ * Writer for {@link BinaryRow}.
+ */
+public class BinaryRowWriter extends AbstractBinaryWriter {
+
+ private final int nullBitsSizeInBytes;
+ private final BinaryRow row;
+ private final int fixedSize;
+
+ public BinaryRowWriter(BinaryRow row) {
+ this(row, 0);
+ }
+
+ public BinaryRowWriter(BinaryRow row, int initialSize) {
+ this.nullBitsSizeInBytes =
BinaryRow.calculateBitSetWidthInBytes(row.getArity());
+ this.fixedSize = row.getFixedLengthPartSize();
+ this.cursor = fixedSize;
+
+ this.segment = MemorySegmentFactory.wrap(new byte[fixedSize +
initialSize]);
+ this.row = row;
+ this.row.pointTo(segment, 0, segment.size());
+ }
+
+ /**
+ * First, reset.
+ */
+ @Override
+ public void reset() {
+ this.cursor = fixedSize;
+ for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
+ segment.putLong(i, 0L);
+ }
+ }
+
+ /**
+ * Default not null.
+ */
+ @Override
+ public void setNullAt(int pos) {
+ // need add header 8 bit.
+ SegmentsUtil.bitSet(segment, 0, pos + 8);
+ segment.putLong(getFieldOffset(pos), 0L);
+ }
+
+ public void writeHeader(byte header) {
+ segment.put(0, header);
+ }
+
+ @Override
+ public void writeBoolean(int pos, boolean value) {
+ segment.putBoolean(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void writeByte(int pos, byte value) {
+ segment.put(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void writeShort(int pos, short value) {
+ segment.putShort(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void writeInt(int pos, int value) {
+ segment.putInt(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void writeLong(int pos, long value) {
+ segment.putLong(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void writeFloat(int pos, float value) {
+ segment.putFloat(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void writeDouble(int pos, double value) {
+ segment.putDouble(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void writeChar(int pos, char value) {
+ segment.putChar(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void complete() {
+ row.setTotalSize(cursor);
+ }
+
+ @Override
+ public int getFieldOffset(int pos) {
+ return nullBitsSizeInBytes + 8 * pos;
+ }
+
+ @Override
+ public void setOffsetAndSize(int pos, int offset, long size) {
+ final long offsetAndSize = ((long) offset << 32) | size;
+ segment.putLong(getFieldOffset(pos), offsetAndSize);
+ }
+
+ @Override
+ public void afterGrow() {
+ row.pointTo(segment, 0, segment.size());
+ }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
new file mode 100644
index 0000000..4b388f6
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.BinaryStringUtil;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A utf8 string which is backed by {@link MemorySegment} instead of String. Its data may span
+ * multiple {@link MemorySegment}s.
+ *
+ * <p>Used for internal table-level implementation. The built-in operator will use it for
+ * comparison, search, and so on.
+ *
+ * <p>{@code BinaryString} is influenced by Apache Spark UTF8String.
+ */
+public class BinaryString extends BinaryFormat<String> {
+
+	private static final long HIGHEST_FIRST_BIT = Long.MIN_VALUE;
+	private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
+
+	/**
+	 * Creates a string view over {@code sizeInBytes} bytes starting at {@code offset} in
+	 * {@code segments}.
+	 */
+	public BinaryString(MemorySegment[] segments, int offset, int sizeInBytes) {
+		super(segments, offset, sizeInBytes);
+	}
+
+	/**
+	 * Encodes a Java string as UTF-8 and wraps the bytes in a single heap segment.
+	 *
+	 * @param str the string to convert, may be {@code null}.
+	 * @return the binary string, or {@code null} if {@code str} is {@code null}.
+	 */
+	public static BinaryString fromString(String str) {
+		if (str == null) {
+			return null;
+		} else {
+			// Always encode as UTF-8; the platform default charset may be something else.
+			byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+			return new BinaryString(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+		}
+	}
+
+	/**
+	 * Decodes the bytes of this view as UTF-8.
+	 */
+	@Override
+	public String toString() {
+		// Temporary buffer; only the first sizeInBytes bytes are filled and decoded.
+		byte[] bytes = BinaryStringUtil.allocateReuseBytes(sizeInBytes);
+		SegmentsUtil.copyToBytes(segments, offset, bytes, 0, sizeInBytes);
+		return new String(bytes, 0, sizeInBytes, StandardCharsets.UTF_8);
+	}
+
+	/**
+	 * Get binary string, if len less than 8, will be include in variablePartOffsetAndLen.
+	 *
+	 * <p>If len is less than 8, its binary format is:
+	 * 1bit mark(1) = 1, 7bits len, and 7bytes data.
+	 *
+	 * <p>If len is greater or equal to 8, its binary format is:
+	 * 1bit mark(1) = 0, 31bits offset, and 4bytes len.
+	 * Data is stored in variable-length part.
+	 *
+	 * <p>Note: Need to consider the ByteOrder.
+	 *
+	 * @param segments segments holding the composite binary format.
+	 * @param baseOffset base offset of composite binary format.
+	 * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+	 * @param variablePartOffsetAndLen a long value, real data or offset and len.
+	 */
+	static BinaryString readBinaryStringFieldFromSegments(
+			MemorySegment[] segments, int baseOffset, int fieldOffset,
+			long variablePartOffsetAndLen) {
+		long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+		if (mark == 0) {
+			// Data lives in the variable-length part: upper 32 bits offset, lower 32 bits length.
+			final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+			final int len = (int) variablePartOffsetAndLen;
+			return new BinaryString(segments, baseOffset + subOffset, len);
+		} else {
+			// Data is stored inside the 8-byte field itself; the 7 bits after the mark are the length.
+			int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+			if (SegmentsUtil.LITTLE_ENDIAN) {
+				return new BinaryString(segments, fieldOffset, len);
+			} else {
+				// fieldOffset + 1 to skip header.
+				return new BinaryString(segments, fieldOffset + 1, len);
+			}
+		}
+	}
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
new file mode 100644
index 0000000..435b064
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Writer to write a composite data format, like row, array.
+ *
+ * <p>Usage:
+ * 1. Invoke {@link #reset()}.
+ * 2. Write each field by writeXX or setNullAt. (Same field can not be written repeatedly.)
+ * 3. Invoke {@link #complete()}.
+ */
+public interface BinaryWriter {
+
+ /**
+ * Reset writer to prepare next write.
+ */
+ void reset();
+
+ /**
+ * Set null to this field.
+ *
+ * @param pos position of the field.
+ */
+ void setNullAt(int pos);
+
+ /** Writes a boolean value to the field at {@code pos}. */
+ void writeBoolean(int pos, boolean value);
+
+ /** Writes a byte value to the field at {@code pos}. */
+ void writeByte(int pos, byte value);
+
+ /** Writes a short value to the field at {@code pos}. */
+ void writeShort(int pos, short value);
+
+ /** Writes an int value to the field at {@code pos}. */
+ void writeInt(int pos, int value);
+
+ /** Writes a long value to the field at {@code pos}. */
+ void writeLong(int pos, long value);
+
+ /** Writes a float value to the field at {@code pos}. */
+ void writeFloat(int pos, float value);
+
+ /** Writes a double value to the field at {@code pos}. */
+ void writeDouble(int pos, double value);
+
+ /** Writes a char value to the field at {@code pos}. */
+ void writeChar(int pos, char value);
+
+ /** Writes a {@link BinaryString} to the field at {@code pos}. */
+ void writeString(int pos, BinaryString value);
+
+ /** Writes a {@link BinaryArray} to the field at {@code pos}. */
+ void writeArray(int pos, BinaryArray value);
+
+ /** Writes a {@link BinaryMap} to the field at {@code pos}. */
+ void writeMap(int pos, BinaryMap value);
+
+ /**
+ * Finally, complete write to set real size to binary.
+ */
+ void complete();
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
new file mode 100644
index 0000000..f0484f7
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * A GenericRow can have an arbitrary number of fields, which may all be of different types.
+ * The fields in GenericRow can be null.
+ *
+ * <p>The fields in the Row can be accessed by position (zero-based) {@link #getInt}.
+ * And can update fields by {@link #setField(int, Object)}.
+ *
+ * <p>GenericRow is in principle serializable. However, it may contain non-serializable fields,
+ * in which case serialization will fail.
+ */
+public final class GenericRow extends ObjectArrayRow {
+
+	public GenericRow(int arity) {
+		super(arity);
+	}
+
+	// ------------------------------------------------------------------
+	// Typed getters: cast the stored object and unbox it.
+	// ------------------------------------------------------------------
+
+	@Override
+	public boolean getBoolean(int ordinal) {
+		return (boolean) fields[ordinal];
+	}
+
+	@Override
+	public byte getByte(int ordinal) {
+		return (byte) fields[ordinal];
+	}
+
+	@Override
+	public short getShort(int ordinal) {
+		return (short) fields[ordinal];
+	}
+
+	@Override
+	public int getInt(int ordinal) {
+		return (int) fields[ordinal];
+	}
+
+	@Override
+	public long getLong(int ordinal) {
+		return (long) fields[ordinal];
+	}
+
+	@Override
+	public float getFloat(int ordinal) {
+		return (float) fields[ordinal];
+	}
+
+	@Override
+	public double getDouble(int ordinal) {
+		return (double) fields[ordinal];
+	}
+
+	@Override
+	public char getChar(int ordinal) {
+		return (char) fields[ordinal];
+	}
+
+	// ------------------------------------------------------------------
+	// Typed setters: the primitive is boxed when stored in the array.
+	// ------------------------------------------------------------------
+
+	@Override
+	public void setBoolean(int ordinal, boolean value) {
+		fields[ordinal] = value;
+	}
+
+	@Override
+	public void setByte(int ordinal, byte value) {
+		fields[ordinal] = value;
+	}
+
+	@Override
+	public void setShort(int ordinal, short value) {
+		fields[ordinal] = value;
+	}
+
+	@Override
+	public void setInt(int ordinal, int value) {
+		fields[ordinal] = value;
+	}
+
+	@Override
+	public void setLong(int ordinal, long value) {
+		fields[ordinal] = value;
+	}
+
+	@Override
+	public void setFloat(int ordinal, float value) {
+		fields[ordinal] = value;
+	}
+
+	@Override
+	public void setDouble(int ordinal, double value) {
+		fields[ordinal] = value;
+	}
+
+	@Override
+	public void setChar(int ordinal, char value) {
+		fields[ordinal] = value;
+	}
+
+	/**
+	 * Stores an arbitrary object (or {@code null}) at the given position.
+	 */
+	public void setField(int ordinal, Object value) {
+		fields[ordinal] = value;
+	}
+
+	/**
+	 * Creates a row whose fields are the given values, in order.
+	 */
+	public static GenericRow of(Object... values) {
+		final GenericRow result = new GenericRow(values.length);
+		System.arraycopy(values, 0, result.fields, 0, values.length);
+		return result;
+	}
+}
+
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
new file mode 100644
index 0000000..d5d47b0
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+
+/**
+ * A row implementation that uses an array of objects as the underlying storage.
+ */
+public abstract class ObjectArrayRow implements BaseRow {
+
+	private byte header;
+
+	protected final Object[] fields;
+
+	/**
+	 * Creates a row with {@code arity} fields, all initially {@code null}.
+	 */
+	public ObjectArrayRow(int arity) {
+		fields = new Object[arity];
+	}
+
+	@Override
+	public int getArity() {
+		return fields.length;
+	}
+
+	@Override
+	public byte getHeader() {
+		return header;
+	}
+
+	@Override
+	public void setHeader(byte header) {
+		this.header = header;
+	}
+
+	/** A field is null when its array slot holds {@code null}. */
+	@Override
+	public boolean isNullAt(int ordinal) {
+		return fields[ordinal] == null;
+	}
+
+	@Override
+	public void setNullAt(int ordinal) {
+		fields[ordinal] = null;
+	}
+
+	@Override
+	public BinaryString getString(int ordinal) {
+		return (BinaryString) fields[ordinal];
+	}
+
+	@Override
+	public BinaryArray getArray(int ordinal) {
+		return (BinaryArray) fields[ordinal];
+	}
+
+	@Override
+	public BinaryMap getMap(int ordinal) {
+		return (BinaryMap) fields[ordinal];
+	}
+
+	/**
+	 * Renders the row as {@code header|field0,field1,...}.
+	 */
+	@Override
+	public String toString() {
+		final StringBuilder out = new StringBuilder();
+		out.append(getHeader());
+		out.append("|");
+		String separator = "";
+		for (Object field : fields) {
+			out.append(separator);
+			out.append(StringUtils.arrayAwareToString(field));
+			separator = ",";
+		}
+		return out.toString();
+	}
+
+	@Override
+	public int hashCode() {
+		final int headerHash = Byte.hashCode(getHeader());
+		return 31 * headerHash + Arrays.hashCode(fields);
+	}
+
+	/**
+	 * Rows are equal when the other object is an {@link ObjectArrayRow} with the same header and
+	 * element-wise equal fields.
+	 */
+	@Override
+	public boolean equals(Object o) {
+		// instanceof is false for null, so no separate null check is needed.
+		if (!(o instanceof ObjectArrayRow)) {
+			return false;
+		}
+		final ObjectArrayRow that = (ObjectArrayRow) o;
+		return header == that.header && Arrays.equals(fields, that.fields);
+	}
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
new file mode 100644
index 0000000..e567add
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Provide type specialized getters and setters to reduce if/else and eliminate box and unbox.
+ *
+ * <p>There are only setters for the fixed-length type fields, because a variable-length type
+ * cannot be set to the binary format such as {@link BinaryFormat}.</p>
+ *
+ * <p>All the {@code getXxx(int)} methods do not guarantee that a new object is returned every
+ * time they are called.</p>
+ */
+public interface TypeGetterSetters {
+
+ /**
+ * Because the specific row implementation such as BinaryRow uses the binary format. We must
+ * first determine if it is null, and then make a specific get.
+ *
+ * @param ordinal position of the field.
+ * @return true if this field is null.
+ */
+ boolean isNullAt(int ordinal);
+
+ /**
+ * Set null to this field.
+ */
+ void setNullAt(int ordinal);
+
+ /**
+ * Get boolean value.
+ */
+ boolean getBoolean(int ordinal);
+
+ /**
+ * Get byte value.
+ */
+ byte getByte(int ordinal);
+
+ /**
+ * Get short value.
+ */
+ short getShort(int ordinal);
+
+ /**
+ * Get int value.
+ */
+ int getInt(int ordinal);
+
+ /**
+ * Get long value.
+ */
+ long getLong(int ordinal);
+
+ /**
+ * Get float value.
+ */
+ float getFloat(int ordinal);
+
+ /**
+ * Get double value.
+ */
+ double getDouble(int ordinal);
+
+ /**
+ * Get char value.
+ */
+ char getChar(int ordinal);
+
+ /**
+ * Get string value, internal format is BinaryString.
+ */
+ BinaryString getString(int ordinal);
+
+ /**
+ * Get array value, internal format is BinaryArray.
+ */
+ BinaryArray getArray(int ordinal);
+
+ /**
+ * Get map value, internal format is BinaryMap.
+ */
+ BinaryMap getMap(int ordinal);
+
+ /**
+ * Set boolean value.
+ */
+ void setBoolean(int ordinal, boolean value);
+
+ /**
+ * Set byte value.
+ */
+ void setByte(int ordinal, byte value);
+
+ /**
+ * Set short value.
+ */
+ void setShort(int ordinal, short value);
+
+ /**
+ * Set int value.
+ */
+ void setInt(int ordinal, int value);
+
+ /**
+ * Set long value.
+ */
+ void setLong(int ordinal, long value);
+
+ /**
+ * Set float value.
+ */
+ void setFloat(int ordinal, float value);
+
+ /**
+ * Set double value.
+ */
+ void setDouble(int ordinal, double value);
+
+ /**
+ * Set char value.
+ */
+ void setChar(int ordinal, char value);
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/BinaryStringUtil.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/BinaryStringUtil.java
new file mode 100644
index 0000000..78f660d
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/BinaryStringUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.table.dataformat.BinaryString;
+
+/**
+ * Helper methods used together with {@link BinaryString}.
+ */
+public class BinaryStringUtil {
+
+    /**
+     * Size of the scratch buffer cached per thread (64K). The number of SQL execution
+     * threads is limited, so this per-thread overhead is acceptable.
+     */
+    private static final int MAX_BYTES_LENGTH = 1024 * 64;
+
+    /** Holds the cached scratch buffer of the current thread, if one was created. */
+    private static final ThreadLocal<byte[]> BYTES_LOCAL = new ThreadLocal<>();
+
+    /**
+     * Hands out a byte[] of at least {@code length} bytes for temporary use only; the
+     * result must not be stored anywhere, because the same array may be returned again
+     * by a later call on the same thread.
+     *
+     * <p>Requests of up to 64K are served from a buffer cached in a {@link ThreadLocal},
+     * which avoids the cost of allocating and collecting a byte[] per call. Larger
+     * requests get a fresh array that is not cached.
+     *
+     * <p>Typical use: a method only accepts a byte[] instead of a MemorySegment[], so the
+     * segment data is copied into the reused bytes first, e.g. for String deserialization.
+     *
+     * @param length minimum number of bytes the caller needs.
+     * @return an array whose length is at least {@code length}; it may be longer.
+     */
+    public static byte[] allocateReuseBytes(int length) {
+        byte[] cached = BYTES_LOCAL.get();
+
+        if (cached != null) {
+            // Reuse the cached buffer when it is big enough, otherwise allocate a one-off array.
+            return cached.length >= length ? cached : new byte[length];
+        }
+
+        if (length > MAX_BYTES_LENGTH) {
+            // Too large to be worth caching.
+            return new byte[length];
+        }
+
+        byte[] fresh = new byte[MAX_BYTES_LENGTH];
+        BYTES_LOCAL.set(fresh);
+        return fresh;
+    }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
new file mode 100644
index 0000000..4ebb401
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.nio.ByteOrder;
+
+/**
+ * Util for data format segments calc.
+ */
+public class SegmentsUtil {
+
+ /**
+ * Constant that flags the byte order: {@code true} when the native byte order
+ * reported by {@link ByteOrder#nativeOrder()} is little-endian.
+ */
+ public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() ==
ByteOrder.LITTLE_ENDIAN;
+
+ // Clears the low three bits of a bit index; shifted right by 3, the result is
+ // the index of the byte that holds the bit.
+ private static final int BIT_BYTE_POSITION_MASK = 0xfffffff8;
+
+ // Keeps the low three bits of a bit index, i.e. the bit's position inside its byte.
+ private static final int BIT_BYTE_INDEX_MASK = 7;
+
+ /**
+ * Copy segments to a new byte[].
+ *
+ * @param segments Source segments.
+ * @param offset Source segments offset.
+ * @param numBytes the number bytes to copy.
+ */
+ public static byte[] copyToBytes(MemorySegment[] segments, int offset,
int numBytes) {
+ return copyToBytes(segments, offset, new byte[numBytes], 0,
numBytes);
+ }
+
+ /**
+  * Copies a range of the segments into the given byte[].
+  *
+  * @param segments source segments.
+  * @param offset start position inside the source segments.
+  * @param bytes target byte[].
+  * @param bytesOffset position in the target byte[] where the first byte is written.
+  * @param numBytes number of bytes to copy.
+  * @return the {@code bytes} array that was passed in.
+  */
+ public static byte[] copyToBytes(
+         MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
+     if (!inFirstSegment(segments, offset, numBytes)) {
+         // The range leaves the first segment: copy piece by piece.
+         copyMultiSegmentsToBytes(segments, offset, bytes, bytesOffset, numBytes);
+         return bytes;
+     }
+     segments[0].get(offset, bytes, bytesOffset, numBytes);
+     return bytes;
+ }
+
+ /**
+  * Copies {@code numBytes} bytes that may span several segments into {@code bytes},
+  * walking the segments in order and skipping those that lie entirely before
+  * {@code offset}.
+  */
+ private static void copyMultiSegmentsToBytes(
+         MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
+     int copied = 0;
+     // Read position relative to the start of the segment currently visited.
+     int position = offset;
+     for (MemorySegment current : segments) {
+         int available = current.size() - position;
+         if (available <= 0) {
+             // The start lies behind this segment: carry the rest of the offset over.
+             position = -available;
+             continue;
+         }
+         int chunk = Math.min(available, numBytes - copied);
+         current.get(position, bytes, bytesOffset + copied, chunk);
+         copied += chunk;
+         // Every following segment is read from its beginning.
+         position = 0;
+         if (copied == numBytes) {
+             return;
+         }
+     }
+ }
+
+ /**
+  * Copies a range of the segments to {@code target} through
+  * {@code MemorySegment#copyToUnsafe}.
+  *
+  * @param segments source segments.
+  * @param offset start position inside the source segments.
+  * @param target destination object, passed on to {@code MemorySegment#copyToUnsafe}.
+  * @param pointer destination position, passed on to {@code MemorySegment#copyToUnsafe}.
+  * @param numBytes number of bytes to copy.
+  */
+ public static void copyToUnsafe(
+         MemorySegment[] segments, int offset, Object target, int pointer, int numBytes) {
+     if (!inFirstSegment(segments, offset, numBytes)) {
+         copyMultiSegmentsToUnsafe(segments, offset, target, pointer, numBytes);
+         return;
+     }
+     segments[0].copyToUnsafe(offset, target, pointer, numBytes);
+ }
+
+ /**
+  * Copies {@code numBytes} bytes that may span several segments to {@code target},
+  * walking the segments in order and skipping those that lie entirely before
+  * {@code offset}.
+  */
+ private static void copyMultiSegmentsToUnsafe(
+         MemorySegment[] segments, int offset, Object target, int pointer, int numBytes) {
+     int copied = 0;
+     // Read position relative to the start of the segment currently visited.
+     int position = offset;
+     for (MemorySegment current : segments) {
+         int available = current.size() - position;
+         if (available <= 0) {
+             // The start lies behind this segment: carry the rest of the offset over.
+             position = -available;
+             continue;
+         }
+         int chunk = Math.min(available, numBytes - copied);
+         current.copyToUnsafe(position, target, pointer + copied, chunk);
+         copied += chunk;
+         // Every following segment is read from its beginning.
+         position = 0;
+         if (copied == numBytes) {
+             return;
+         }
+     }
+ }
+
+ /**
+  * Compares two regions of memory segments byte by byte.
+  *
+  * @param segments1 first segments.
+  * @param offset1 start of the region inside {@code segments1}.
+  * @param segments2 second segments.
+  * @param offset2 start of the region inside {@code segments2}.
+  * @param len length of the compared regions in bytes.
+  *
+  * @return true if both regions are equal, false otherwise.
+  */
+ public static boolean equals(
+         MemorySegment[] segments1, int offset1,
+         MemorySegment[] segments2, int offset2, int len) {
+     boolean bothInFirstSegment =
+         inFirstSegment(segments1, offset1, len) && inFirstSegment(segments2, offset2, len);
+     return bothInFirstSegment
+         ? segments1[0].equalTo(segments2[0], offset1, offset2, len)
+         : equalsMultiSegments(segments1, offset1, segments2, offset2, len);
+ }
+
+ @VisibleForTesting
+ static boolean equalsMultiSegments(
+ MemorySegment[] segments1, int offset1,
+ MemorySegment[] segments2, int offset2, int len) {
+ if (len == 0) {
+ // quick way and avoid segSize is zero.
+ return true;
+ }
+
+ int segSize1 = segments1[0].size();
+ int segSize2 = segments2[0].size();
+
+ // find first segIndex and segOffset of segments.
+ int segIndex1 = offset1 / segSize1;
+ int segIndex2 = offset2 / segSize2;
+ int segOffset1 = offset1 - segSize1 * segIndex1; // equal to %
+ int segOffset2 = offset2 - segSize2 * segIndex2; // equal to %
+
+ while (len > 0) {
+ int equalLen = Math.min(Math.min(len, segSize1 -
segOffset1), segSize2 - segOffset2);
+ if (!segments1[segIndex1].equalTo(segments2[segIndex2],
segOffset1, segOffset2, equalLen)) {
+ return false;
+ }
+ len -= equalLen;
+ segOffset1 += equalLen;
+ if (segOffset1 == segSize1) {
+ segOffset1 = 0;
+ segIndex1++;
+ }
+ segOffset2 += equalLen;
+ if (segOffset2 == segSize2) {
+ segOffset2 = 0;
+ segIndex2++;
+ }
+ }
+ return true;
+ }
+
+ /**
+  * Tells whether the range {@code [offset, offset + numBytes)} ends within the first
+  * segment, in which case callers can take the quick single-segment path.
+  */
+ private static boolean inFirstSegment(MemorySegment[] segments, int offset, int numBytes) {
+     int end = numBytes + offset;
+     return end <= segments[0].size();
+ }
+
+ /**
+  * Clears one bit in a segment.
+  *
+  * @param segment target segment.
+  * @param baseOffset offset of the byte where the bits start.
+  * @param index index of the bit, counted from the base offset.
+  */
+ public static void bitUnSet(MemorySegment segment, int baseOffset, int index) {
+     int byteOffset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3);
+     int bitMask = 1 << (index & BIT_BYTE_INDEX_MASK);
+     byte cleared = (byte) (segment.get(byteOffset) & ~bitMask);
+     segment.put(byteOffset, cleared);
+ }
+
+ /**
+  * Sets one bit in a segment.
+  *
+  * @param segment target segment.
+  * @param baseOffset offset of the byte where the bits start.
+  * @param index index of the bit, counted from the base offset.
+  */
+ public static void bitSet(MemorySegment segment, int baseOffset, int index) {
+     int byteOffset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3);
+     int bitMask = 1 << (index & BIT_BYTE_INDEX_MASK);
+     byte updated = (byte) (segment.get(byteOffset) | bitMask);
+     segment.put(byteOffset, updated);
+ }
+
+ /**
+  * Reads one bit from a segment.
+  *
+  * @param segment target segment.
+  * @param baseOffset offset of the byte where the bits start.
+  * @param index index of the bit, counted from the base offset.
+  * @return true if the bit is set.
+  */
+ public static boolean bitGet(MemorySegment segment, int baseOffset, int index) {
+     int byteOffset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3);
+     int bitMask = 1 << (index & BIT_BYTE_INDEX_MASK);
+     return (segment.get(byteOffset) & bitMask) != 0;
+ }
+
+ /**
+  * Clears one bit in the segments.
+  *
+  * @param segments target segments.
+  * @param baseOffset offset of the byte where the bits start.
+  * @param index index of the bit, counted from the base offset.
+  */
+ public static void bitUnSet(MemorySegment[] segments, int baseOffset, int index) {
+     if (segments.length != 1) {
+         bitUnSetMultiSegments(segments, baseOffset, index);
+         return;
+     }
+     // Only one segment: same work as the single-segment overload.
+     bitUnSet(segments[0], baseOffset, index);
+ }
+
+ /**
+  * Clears one bit when more than one segment is present. The segment holding the bit
+  * is located using the size of the first segment.
+  */
+ private static void bitUnSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) {
+     int byteOffset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3);
+     int segSize = segments[0].size();
+     int segIndex = byteOffset / segSize;
+     int posInSegment = byteOffset % segSize;
+     MemorySegment holder = segments[segIndex];
+
+     int bitMask = 1 << (index & BIT_BYTE_INDEX_MASK);
+     byte cleared = (byte) (holder.get(posInSegment) & ~bitMask);
+     holder.put(posInSegment, cleared);
+ }
+
+ /**
+  * Sets one bit in the segments.
+  *
+  * @param segments target segments.
+  * @param baseOffset offset of the byte where the bits start.
+  * @param index index of the bit, counted from the base offset.
+  */
+ public static void bitSet(MemorySegment[] segments, int baseOffset, int index) {
+     if (segments.length != 1) {
+         bitSetMultiSegments(segments, baseOffset, index);
+         return;
+     }
+     // Only one segment: same work as the single-segment overload.
+     bitSet(segments[0], baseOffset, index);
+ }
+
+ /**
+  * Sets one bit when more than one segment is present. The segment holding the bit
+  * is located using the size of the first segment.
+  */
+ private static void bitSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) {
+     int byteOffset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3);
+     int segSize = segments[0].size();
+     int segIndex = byteOffset / segSize;
+     int posInSegment = byteOffset % segSize;
+     MemorySegment holder = segments[segIndex];
+
+     int bitMask = 1 << (index & BIT_BYTE_INDEX_MASK);
+     byte updated = (byte) (holder.get(posInSegment) | bitMask);
+     holder.put(posInSegment, updated);
+ }
+
+ /**
+  * Reads one bit from the segments.
+  *
+  * @param segments target segments.
+  * @param baseOffset offset of the byte where the bits start.
+  * @param index index of the bit, counted from the base offset.
+  * @return true if the bit is set.
+  */
+ public static boolean bitGet(MemorySegment[] segments, int baseOffset, int index) {
+     int byteOffset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3);
+     int bitMask = 1 << (index & BIT_BYTE_INDEX_MASK);
+     return (getByte(segments, byteOffset) & bitMask) != 0;
+ }
+
+ /**
+  * Reads a boolean from the segments.
+  *
+  * @param segments source segments.
+  * @param offset position of the value.
+  * @return the boolean stored at {@code offset}.
+  */
+ public static boolean getBoolean(MemorySegment[] segments, int offset) {
+     return inFirstSegment(segments, offset, 1)
+         ? segments[0].getBoolean(offset)
+         : getBooleanMultiSegments(segments, offset);
+ }
+
+ private static boolean getBooleanMultiSegments(MemorySegment[]
segments, int offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+ return segments[segIndex].getBoolean(segOffset);
+ }
+
+ /**
+  * Writes a boolean into the segments.
+  *
+  * @param segments target segments.
+  * @param offset position of the value.
+  * @param value the boolean to store.
+  */
+ public static void setBoolean(MemorySegment[] segments, int offset, boolean value) {
+     if (!inFirstSegment(segments, offset, 1)) {
+         setBooleanMultiSegments(segments, offset, value);
+         return;
+     }
+     segments[0].putBoolean(offset, value);
+ }
+
+ private static void setBooleanMultiSegments(MemorySegment[] segments,
int offset, boolean value) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+ segments[segIndex].putBoolean(segOffset, value);
+ }
+
+ /**
+  * Reads a byte from the segments.
+  *
+  * @param segments source segments.
+  * @param offset position of the value.
+  * @return the byte stored at {@code offset}.
+  */
+ public static byte getByte(MemorySegment[] segments, int offset) {
+     return inFirstSegment(segments, offset, 1)
+         ? segments[0].get(offset)
+         : getByteMultiSegments(segments, offset);
+ }
+
+ private static byte getByteMultiSegments(MemorySegment[] segments, int
offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+ return segments[segIndex].get(segOffset);
+ }
+
+ /**
+  * Writes a byte into the segments.
+  *
+  * @param segments target segments.
+  * @param offset position of the value.
+  * @param value the byte to store.
+  */
+ public static void setByte(MemorySegment[] segments, int offset, byte value) {
+     if (!inFirstSegment(segments, offset, 1)) {
+         setByteMultiSegments(segments, offset, value);
+         return;
+     }
+     segments[0].put(offset, value);
+ }
+
+ private static void setByteMultiSegments(MemorySegment[] segments, int
offset, byte value) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+ segments[segIndex].put(segOffset, value);
+ }
+
+ /**
+  * Reads an int from the segments.
+  *
+  * @param segments source segments.
+  * @param offset position of the value.
+  * @return the int stored at {@code offset}.
+  */
+ public static int getInt(MemorySegment[] segments, int offset) {
+     return inFirstSegment(segments, offset, 4)
+         ? segments[0].getInt(offset)
+         : getIntMultiSegments(segments, offset);
+ }
+
+ private static int getIntMultiSegments(MemorySegment[] segments, int
offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 3) {
+ return segments[segIndex].getInt(segOffset);
+ } else {
+ return getIntSlowly(segments, segSize, segIndex,
segOffset);
+ }
+ }
+
+ /**
+  * Assembles an int from 4 single bytes, moving on to the next segment when the end
+  * of the current one is reached. Bytes are combined according to
+  * {@link #LITTLE_ENDIAN}.
+  */
+ private static int getIntSlowly(
+         MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+     int index = segNum;
+     int pos = segOffset;
+     MemorySegment current = segments[index];
+     int result = 0;
+     for (int i = 0; i < 4; i++) {
+         if (pos == segSize) {
+             // Crossed the boundary: continue at the start of the next segment.
+             index++;
+             current = segments[index];
+             pos = 0;
+         }
+         int shift = LITTLE_ENDIAN ? i * 8 : (3 - i) * 8;
+         result |= (current.get(pos) & 0xff) << shift;
+         pos++;
+     }
+     return result;
+ }
+
+ /**
+  * Writes an int into the segments.
+  *
+  * @param segments target segments.
+  * @param offset position of the value.
+  * @param value the int to store.
+  */
+ public static void setInt(MemorySegment[] segments, int offset, int value) {
+     if (!inFirstSegment(segments, offset, 4)) {
+         setIntMultiSegments(segments, offset, value);
+         return;
+     }
+     segments[0].putInt(offset, value);
+ }
+
+ private static void setIntMultiSegments(MemorySegment[] segments, int
offset, int value) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 3) {
+ segments[segIndex].putInt(segOffset, value);
+ } else {
+ setIntSlowly(segments, segSize, segIndex, segOffset,
value);
+ }
+ }
+
+ /**
+  * Writes an int as 4 single bytes, moving on to the next segment when the end of the
+  * current one is reached. The byte order follows {@link #LITTLE_ENDIAN}.
+  */
+ private static void setIntSlowly(
+         MemorySegment[] segments, int segSize, int segNum, int segOffset, int value) {
+     int index = segNum;
+     int pos = segOffset;
+     MemorySegment current = segments[index];
+     for (int i = 0; i < 4; i++) {
+         if (pos == segSize) {
+             // Crossed the boundary: continue at the start of the next segment.
+             index++;
+             current = segments[index];
+             pos = 0;
+         }
+         int shift = LITTLE_ENDIAN ? i * 8 : (3 - i) * 8;
+         current.put(pos, (byte) (value >> shift));
+         pos++;
+     }
+ }
+
+ /**
+  * Reads a long from the segments.
+  *
+  * @param segments source segments.
+  * @param offset position of the value.
+  * @return the long stored at {@code offset}.
+  */
+ public static long getLong(MemorySegment[] segments, int offset) {
+     return inFirstSegment(segments, offset, 8)
+         ? segments[0].getLong(offset)
+         : getLongMultiSegments(segments, offset);
+ }
+
+ private static long getLongMultiSegments(MemorySegment[] segments, int
offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 7) {
+ return segments[segIndex].getLong(segOffset);
+ } else {
+ return getLongSlowly(segments, segSize, segIndex,
segOffset);
+ }
+ }
+
+ /**
+  * Assembles a long from 8 single bytes, moving on to the next segment when the end
+  * of the current one is reached. Bytes are combined according to
+  * {@link #LITTLE_ENDIAN}.
+  */
+ private static long getLongSlowly(
+         MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+     int index = segNum;
+     int pos = segOffset;
+     MemorySegment current = segments[index];
+     long result = 0;
+     for (int i = 0; i < 8; i++) {
+         if (pos == segSize) {
+             // Crossed the boundary: continue at the start of the next segment.
+             index++;
+             current = segments[index];
+             pos = 0;
+         }
+         // Widen to long before shifting so that shifts beyond 31 bits keep their bits.
+         long octet = current.get(pos) & 0xff;
+         int shift = LITTLE_ENDIAN ? i * 8 : (7 - i) * 8;
+         result |= octet << shift;
+         pos++;
+     }
+     return result;
+ }
+
+ /**
+  * Writes a long into the segments.
+  *
+  * @param segments target segments.
+  * @param offset position of the value.
+  * @param value the long to store.
+  */
+ public static void setLong(MemorySegment[] segments, int offset, long value) {
+     if (!inFirstSegment(segments, offset, 8)) {
+         setLongMultiSegments(segments, offset, value);
+         return;
+     }
+     segments[0].putLong(offset, value);
+ }
+
+ private static void setLongMultiSegments(MemorySegment[] segments, int
offset, long value) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 7) {
+ segments[segIndex].putLong(segOffset, value);
+ } else {
+ setLongSlowly(segments, segSize, segIndex, segOffset,
value);
+ }
+ }
+
+ /**
+  * Writes a long as 8 single bytes, moving on to the next segment when the end of the
+  * current one is reached. The byte order follows {@link #LITTLE_ENDIAN}.
+  */
+ private static void setLongSlowly(
+         MemorySegment[] segments, int segSize, int segNum, int segOffset, long value) {
+     int index = segNum;
+     int pos = segOffset;
+     MemorySegment current = segments[index];
+     for (int i = 0; i < 8; i++) {
+         if (pos == segSize) {
+             // Crossed the boundary: continue at the start of the next segment.
+             index++;
+             current = segments[index];
+             pos = 0;
+         }
+         int shift = LITTLE_ENDIAN ? i * 8 : (7 - i) * 8;
+         current.put(pos, (byte) (value >> shift));
+         pos++;
+     }
+ }
+
+ /**
+  * Reads a short from the segments.
+  *
+  * @param segments source segments.
+  * @param offset position of the value.
+  * @return the short stored at {@code offset}.
+  */
+ public static short getShort(MemorySegment[] segments, int offset) {
+     return inFirstSegment(segments, offset, 2)
+         ? segments[0].getShort(offset)
+         : getShortMultiSegments(segments, offset);
+ }
+
+ private static short getShortMultiSegments(MemorySegment[] segments,
int offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 1) {
+ return segments[segIndex].getShort(segOffset);
+ } else {
+ return (short) getTwoByteSlowly(segments, segSize,
segIndex, segOffset);
+ }
+ }
+
+ /**
+  * Writes a short into the segments.
+  *
+  * @param segments target segments.
+  * @param offset position of the value.
+  * @param value the short to store.
+  */
+ public static void setShort(MemorySegment[] segments, int offset, short value) {
+     if (!inFirstSegment(segments, offset, 2)) {
+         setShortMultiSegments(segments, offset, value);
+         return;
+     }
+     segments[0].putShort(offset, value);
+ }
+
+ private static void setShortMultiSegments(MemorySegment[] segments, int
offset, short value) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 1) {
+ segments[segIndex].putShort(segOffset, value);
+ } else {
+ setTwoByteSlowly(segments, segSize, segIndex,
segOffset, value, value >> 8);
+ }
+ }
+
+ /**
+  * Reads a float from the segments.
+  *
+  * @param segments source segments.
+  * @param offset position of the value.
+  * @return the float stored at {@code offset}.
+  */
+ public static float getFloat(MemorySegment[] segments, int offset) {
+     return inFirstSegment(segments, offset, 4)
+         ? segments[0].getFloat(offset)
+         : getFloatMultiSegments(segments, offset);
+ }
+
+ private static float getFloatMultiSegments(MemorySegment[] segments,
int offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 3) {
+ return segments[segIndex].getFloat(segOffset);
+ } else {
+ return Float.intBitsToFloat(getIntSlowly(segments,
segSize, segIndex, segOffset));
+ }
+ }
+
+ /**
+  * Writes a float into the segments.
+  *
+  * @param segments target segments.
+  * @param offset position of the value.
+  * @param value the float to store.
+  */
+ public static void setFloat(MemorySegment[] segments, int offset, float value) {
+     if (!inFirstSegment(segments, offset, 4)) {
+         setFloatMultiSegments(segments, offset, value);
+         return;
+     }
+     segments[0].putFloat(offset, value);
+ }
+
+ private static void setFloatMultiSegments(MemorySegment[] segments, int
offset, float value) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 3) {
+ segments[segIndex].putFloat(segOffset, value);
+ } else {
+ setIntSlowly(segments, segSize, segIndex, segOffset,
Float.floatToRawIntBits(value));
+ }
+ }
+
+ /**
+  * Reads a double from the segments.
+  *
+  * @param segments source segments.
+  * @param offset position of the value.
+  * @return the double stored at {@code offset}.
+  */
+ public static double getDouble(MemorySegment[] segments, int offset) {
+     return inFirstSegment(segments, offset, 8)
+         ? segments[0].getDouble(offset)
+         : getDoubleMultiSegments(segments, offset);
+ }
+
+ private static double getDoubleMultiSegments(MemorySegment[] segments,
int offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 7) {
+ return segments[segIndex].getDouble(segOffset);
+ } else {
+ return Double.longBitsToDouble(getLongSlowly(segments,
segSize, segIndex, segOffset));
+ }
+ }
+
+ /**
+  * Writes a double into the segments.
+  *
+  * @param segments target segments.
+  * @param offset position of the value.
+  * @param value the double to store.
+  */
+ public static void setDouble(MemorySegment[] segments, int offset, double value) {
+     if (!inFirstSegment(segments, offset, 8)) {
+         setDoubleMultiSegments(segments, offset, value);
+         return;
+     }
+     segments[0].putDouble(offset, value);
+ }
+
+ private static void setDoubleMultiSegments(MemorySegment[] segments,
int offset, double value) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 7) {
+ segments[segIndex].putDouble(segOffset, value);
+ } else {
+ setLongSlowly(segments, segSize, segIndex, segOffset,
Double.doubleToRawLongBits(value));
+ }
+ }
+
+ /**
+  * Reads a char from the segments.
+  *
+  * @param segments source segments.
+  * @param offset position of the value.
+  * @return the char stored at {@code offset}.
+  */
+ public static char getChar(MemorySegment[] segments, int offset) {
+     return inFirstSegment(segments, offset, 2)
+         ? segments[0].getChar(offset)
+         : getCharMultiSegments(segments, offset);
+ }
+
+ private static char getCharMultiSegments(MemorySegment[] segments, int
offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 1) {
+ return segments[segIndex].getChar(segOffset);
+ } else {
+ return (char) getTwoByteSlowly(segments, segSize,
segIndex, segOffset);
+ }
+ }
+
+ /**
+  * Assembles a 2-byte value from single bytes, moving on to the next segment when the
+  * end of the current one is reached. Bytes are combined according to
+  * {@link #LITTLE_ENDIAN}; the result occupies the low 16 bits of the returned int.
+  */
+ private static int getTwoByteSlowly(
+         MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+     int index = segNum;
+     int pos = segOffset;
+     MemorySegment current = segments[index];
+     int result = 0;
+     for (int i = 0; i < 2; i++) {
+         if (pos == segSize) {
+             // Crossed the boundary: continue at the start of the next segment.
+             index++;
+             current = segments[index];
+             pos = 0;
+         }
+         int shift = LITTLE_ENDIAN ? i * 8 : (1 - i) * 8;
+         result |= (current.get(pos) & 0xff) << shift;
+         pos++;
+     }
+     return result;
+ }
+
+ /**
+  * Writes a char into the segments.
+  *
+  * @param segments target segments.
+  * @param offset position of the value.
+  * @param value the char to store.
+  */
+ public static void setChar(MemorySegment[] segments, int offset, char value) {
+     if (!inFirstSegment(segments, offset, 2)) {
+         setCharMultiSegments(segments, offset, value);
+         return;
+     }
+     segments[0].putChar(offset, value);
+ }
+
+ /**
+  * Writes a char that is not fully inside the first segment. The segment is located
+  * using the size of the first segment. If both bytes of the char fit into that
+  * segment the value is written directly, otherwise the low and the high byte are
+  * written separately across the segment boundary.
+  */
+ private static void setCharMultiSegments(MemorySegment[] segments, int offset, char value) {
+     int segSize = segments[0].size();
+     int segIndex = offset / segSize;
+     int segOffset = offset - segIndex * segSize; // equal to %
+
+     // A char takes 2 bytes, so it fits whenever at least one byte follows segOffset.
+     // This is the same bound that getCharMultiSegments and setShortMultiSegments use;
+     // the previous bound (segSize - 3) was the 4-byte one and sent chars that fit
+     // into a single segment down the byte-wise path.
+     if (segOffset < segSize - 1) {
+         segments[segIndex].putChar(segOffset, value);
+     } else {
+         // Pass the low byte and the high byte; the callee picks their order.
+         setTwoByteSlowly(segments, segSize, segIndex, segOffset, value, value >> 8);
+     }
+ }
+
+ /**
+  * Writes two bytes one after the other, placing the second one at the start of the
+  * next segment when the first one was the last byte of its segment.
+  *
+  * @param b1 low byte of the value (only the lowest 8 bits are used).
+  * @param b2 high byte of the value (only the lowest 8 bits are used).
+  */
+ private static void setTwoByteSlowly(
+         MemorySegment[] segments, int segSize, int segNum, int segOffset, int b1, int b2) {
+     // With little-endian order the low byte goes first, otherwise the high byte.
+     byte first = (byte) (LITTLE_ENDIAN ? b1 : b2);
+     byte second = (byte) (LITTLE_ENDIAN ? b2 : b1);
+
+     segments[segNum].put(segOffset, first);
+
+     int nextOffset = segOffset + 1;
+     if (nextOffset == segSize) {
+         segments[segNum + 1].put(0, second);
+     } else {
+         segments[segNum].put(nextOffset, second);
+     }
+ }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
new file mode 100644
index 0000000..c90270b
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of {@link BinaryArray} and {@link BinaryArrayWriter}.
+ */
+public class BinaryArrayTest {
+
+ /**
+ * Writes an int array with a null element, then checks it directly, after a round trip
+ * through a {@link BinaryRow}, and after a round trip of a copy that spans two segments.
+ */
+ @Test
+ public void testArray() {
+ // 1. array test: three int elements, the middle one null
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
+
+ writer.writeInt(0, 6);
+ writer.setNullInt(1);
+ writer.writeInt(2, 666);
+ writer.complete();
+
+ // JUnit expects (expected, actual); keeping that order makes failure messages correct
+ assertEquals(6, array.getInt(0));
+ assertTrue(array.isNullAt(1));
+ assertEquals(666, array.getInt(2));
+
+ // 2. test write to binary row.
+ {
+ BinaryRow row2 = new BinaryRow(1);
+ BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+ writer2.writeArray(0, array);
+ writer2.complete();
+
+ BinaryArray array2 = row2.getArray(0);
+ assertEquals(array, array2);
+ assertEquals(6, array2.getInt(0));
+ assertTrue(array2.isNullAt(1));
+ assertEquals(666, array2.getInt(2));
+ }
+
+ // 3. test write var seg array to binary row.
+ {
+ // same content as 'array', but laid out across two memory segments
+ BinaryArray array3 = splitArray(array);
+
+ BinaryRow row2 = new BinaryRow(1);
+ BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+ writer2.writeArray(0, array3);
+ writer2.complete();
+
+ BinaryArray array2 = row2.getArray(0);
+ assertEquals(array, array2);
+ assertEquals(6, array2.getInt(0));
+ assertTrue(array2.isNullAt(1));
+ assertEquals(666, array2.getInt(2));
+ }
+ }
+
+ /**
+ * Exercises every element type of {@link BinaryArray}.
+ *
+ * <p>Each case writes a two-element array whose first element is null, checks getters,
+ * setters and null handling on it, and then repeats the checks on a copy produced by
+ * {@code splitArray}, which spreads the same bytes over two memory segments.
+ */
+ @Test
+ public void testArrayTypes() {
+ {
+ // test bool
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
+ writer.setNullBoolean(0);
+ writer.writeBoolean(1, true);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertTrue(array.getBoolean(1));
+ array.setBoolean(0, true);
+ assertTrue(array.getBoolean(0));
+ array.setNullBoolean(0);
+ assertTrue(array.isNullAt(0));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertTrue(newArray.getBoolean(1));
+ newArray.setBoolean(0, true);
+ assertTrue(newArray.getBoolean(0));
+ newArray.setNullBoolean(0);
+ assertTrue(newArray.isNullAt(0));
+
+ // make the array null-free before converting it to a primitive array and back
+ newArray.setBoolean(0, true);
+ assertEquals(newArray, BinaryArray.fromPrimitiveArray(newArray.toBooleanArray()));
+ }
+
+ {
+ // test byte
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
+ writer.setNullByte(0);
+ writer.writeByte(1, (byte) 25);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(25, array.getByte(1));
+ array.setByte(0, (byte) 5);
+ assertEquals(5, array.getByte(0));
+ array.setNullByte(0);
+ assertTrue(array.isNullAt(0));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(25, newArray.getByte(1));
+ newArray.setByte(0, (byte) 5);
+ assertEquals(5, newArray.getByte(0));
+ newArray.setNullByte(0);
+ assertTrue(newArray.isNullAt(0));
+
+ newArray.setByte(0, (byte) 3);
+ assertEquals(newArray, BinaryArray.fromPrimitiveArray(newArray.toByteArray()));
+ }
+
+ {
+ // test short
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2);
+ writer.setNullShort(0);
+ writer.writeShort(1, (short) 25);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(25, array.getShort(1));
+ array.setShort(0, (short) 5);
+ assertEquals(5, array.getShort(0));
+ array.setNullShort(0);
+ assertTrue(array.isNullAt(0));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(25, newArray.getShort(1));
+ newArray.setShort(0, (short) 5);
+ assertEquals(5, newArray.getShort(0));
+ newArray.setNullShort(0);
+ assertTrue(newArray.isNullAt(0));
+
+ newArray.setShort(0, (short) 3);
+ assertEquals(newArray, BinaryArray.fromPrimitiveArray(newArray.toShortArray()));
+ }
+
+ {
+ // test int
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
+ writer.setNullInt(0);
+ writer.writeInt(1, 25);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(25, array.getInt(1));
+ array.setInt(0, 5);
+ assertEquals(5, array.getInt(0));
+ array.setNullInt(0);
+ assertTrue(array.isNullAt(0));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(25, newArray.getInt(1));
+ newArray.setInt(0, 5);
+ assertEquals(5, newArray.getInt(0));
+ newArray.setNullInt(0);
+ assertTrue(newArray.isNullAt(0));
+
+ newArray.setInt(0, 3);
+ assertEquals(newArray, BinaryArray.fromPrimitiveArray(newArray.toIntArray()));
+ }
+
+ {
+ // test long
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullLong(0);
+ writer.writeLong(1, 25);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(25, array.getLong(1));
+ array.setLong(0, 5);
+ assertEquals(5, array.getLong(0));
+ array.setNullLong(0);
+ assertTrue(array.isNullAt(0));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(25, newArray.getLong(1));
+ newArray.setLong(0, 5);
+ assertEquals(5, newArray.getLong(0));
+ newArray.setNullLong(0);
+ assertTrue(newArray.isNullAt(0));
+
+ newArray.setLong(0, 3);
+ assertEquals(newArray, BinaryArray.fromPrimitiveArray(newArray.toLongArray()));
+ }
+
+ {
+ // test float; a delta of 0 keeps the comparison exact while reporting both values on failure
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
+ writer.setNullFloat(0);
+ writer.writeFloat(1, 25);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(25f, array.getFloat(1), 0);
+ array.setFloat(0, 5);
+ assertEquals(5f, array.getFloat(0), 0);
+ array.setNullFloat(0);
+ assertTrue(array.isNullAt(0));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(25f, newArray.getFloat(1), 0);
+ newArray.setFloat(0, 5);
+ assertEquals(5f, newArray.getFloat(0), 0);
+ newArray.setNullFloat(0);
+ assertTrue(newArray.isNullAt(0));
+
+ newArray.setFloat(0, 3);
+ assertEquals(newArray, BinaryArray.fromPrimitiveArray(newArray.toFloatArray()));
+ }
+
+ {
+ // test double
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullDouble(0);
+ writer.writeDouble(1, 25);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(25d, array.getDouble(1), 0);
+ array.setDouble(0, 5);
+ assertEquals(5d, array.getDouble(0), 0);
+ array.setNullDouble(0);
+ assertTrue(array.isNullAt(0));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(25d, newArray.getDouble(1), 0);
+ newArray.setDouble(0, 5);
+ assertEquals(5d, newArray.getDouble(0), 0);
+ newArray.setNullDouble(0);
+ assertTrue(newArray.isNullAt(0));
+
+ newArray.setDouble(0, 3);
+ assertEquals(newArray, BinaryArray.fromPrimitiveArray(newArray.toDoubleArray()));
+ }
+
+ {
+ // test char
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2);
+ // NOTE(review): the null slot is written with setNullShort, presumably because char
+ // and short share the same two-byte width; confirm this is intended
+ writer.setNullShort(0);
+ writer.writeChar(1, (char) 25);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(25, array.getChar(1));
+ array.setChar(0, (char) 5);
+ assertEquals(5, array.getChar(0));
+ array.setNullChar(0);
+ assertTrue(array.isNullAt(0));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(25, newArray.getChar(1));
+ newArray.setChar(0, (char) 5);
+ assertEquals(5, newArray.getChar(0));
+ newArray.setNullChar(0);
+ assertTrue(newArray.isNullAt(0));
+ }
+
+ {
+ // test string
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullAt(0);
+ writer.writeString(1, fromString("jaja"));
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(fromString("jaja"), array.getString(1));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(fromString("jaja"), newArray.getString(1));
+ }
+
+ // nested value shared by the array and map cases below
+ BinaryArray subArray = new BinaryArray();
+ BinaryArrayWriter subWriter = new BinaryArrayWriter(subArray, 2, 8);
+ subWriter.setNullAt(0);
+ subWriter.writeString(1, fromString("hehehe"));
+ subWriter.complete();
+
+ {
+ // test array
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullAt(0);
+ writer.writeArray(1, subArray);
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(subArray, array.getArray(1));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(subArray, newArray.getArray(1));
+ }
+
+ {
+ // test map
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullAt(0);
+ writer.writeMap(1, BinaryMap.valueOf(subArray, subArray));
+ writer.complete();
+
+ assertTrue(array.isNullAt(0));
+ assertEquals(BinaryMap.valueOf(subArray, subArray), array.getMap(1));
+
+ BinaryArray newArray = splitArray(array);
+ assertTrue(newArray.isNullAt(0));
+ assertEquals(BinaryMap.valueOf(subArray, subArray), newArray.getMap(1));
+ }
+ }
+
+ /**
+ * Builds a {@link BinaryMap} from an int key array and a string value array, writes it
+ * into a {@link BinaryRow} and checks that the map read back equals the one written.
+ */
+ @Test
+ public void testMap() {
+ BinaryArray array1 = new BinaryArray();
+ BinaryArrayWriter writer1 = new BinaryArrayWriter(array1, 3, 4);
+ writer1.writeInt(0, 6);
+ writer1.writeInt(1, 5);
+ writer1.writeInt(2, 666);
+ writer1.complete();
+
+ BinaryArray array2 = new BinaryArray();
+ BinaryArrayWriter writer2 = new BinaryArrayWriter(array2, 3, 8);
+ writer2.writeString(0, fromString("6"));
+ writer2.writeString(1, fromString("5"));
+ writer2.writeString(2, fromString("666"));
+ writer2.complete();
+
+ BinaryMap binaryMap = BinaryMap.valueOf(array1, array2);
+
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+ rowWriter.writeMap(0, binaryMap);
+ rowWriter.complete();
+
+ BinaryMap map = row.getMap(0);
+ BinaryArray key = map.keyArray();
+ BinaryArray value = map.valueArray();
+
+ assertEquals(binaryMap, map);
+ assertEquals(array1, key);
+ assertEquals(array2, value);
+
+ // expected value first, so a failure reports expected/actual the right way round
+ assertEquals(5, key.getInt(1));
+ assertEquals(fromString("5"), value.getString(1));
+ }
+
+ /**
+ * Returns a new {@link BinaryArray} holding the same bytes as {@code array}, laid out
+ * across the two segments produced by {@code splitBytes}.
+ *
+ * <p>The bytes are read starting at offset 0 of the source array's segments.
+ */
+ private static BinaryArray splitArray(BinaryArray array) {
+ byte[] flat = SegmentsUtil.copyToBytes(array.segments, 0, array.sizeInBytes);
+ BinaryArray split = new BinaryArray();
+ split.pointTo(splitBytes(flat, 0), 0, array.sizeInBytes);
+ return split;
+ }
+
+ /**
+ * Distributes {@code bytes} over two equally sized memory segments.
+ *
+ * <p>The first segment keeps {@code baseOffset} leading bytes untouched and then holds
+ * the first half of the input (the larger half for an odd length); the second segment
+ * holds the remaining bytes from its position 0.
+ */
+ private static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
+ int headLength = (bytes.length + 1) / 2;
+ int tailLength = bytes.length - headLength;
+ int segmentSize = headLength + baseOffset;
+
+ MemorySegment head = MemorySegmentFactory.wrap(new byte[segmentSize]);
+ MemorySegment tail = MemorySegmentFactory.wrap(new byte[segmentSize]);
+ head.put(baseOffset, bytes, 0, headLength);
+ tail.put(0, bytes, headLength, tailLength);
+ return new MemorySegment[]{head, tail};
+ }
+
+ /**
+ * Checks that short values can be read back, element by element and via
+ * {@code toShortArray()}, both from the array as written and after re-pointing it
+ * at two segments with a base offset of 3.
+ */
+ @Test
+ public void testToArray() {
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 2);
+ writer.writeShort(0, (short) 5);
+ writer.writeShort(1, (short) 10);
+ writer.writeShort(2, (short) 15);
+ writer.complete();
+
+ short[] shorts = array.toShortArray();
+ assertEquals(5, shorts[0]);
+ assertEquals(10, shorts[1]);
+ assertEquals(15, shorts[2]);
+
+ // re-lay the writer's backing bytes over two segments, shifted by 3 bytes
+ MemorySegment[] segments =
splitBytes(writer.segment.getArray(), 3);
+ // the offset passed to pointTo matches the baseOffset used for the split
+ array.pointTo(segments, 3, array.getSizeInBytes());
+ assertEquals(5, array.getShort(0));
+ assertEquals(10, array.getShort(1));
+ assertEquals(15, array.getShort(2));
+ short[] shorts2 = array.toShortArray();
+ assertEquals(5, shorts2[0]);
+ assertEquals(10, shorts2[1]);
+ assertEquals(15, shorts2[2]);
+ }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
new file mode 100644
index 0000000..a0e9892
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of {@link BinaryRow} and {@link BinaryRowWriter}.
+ */
+public class BinaryRowTest {
+
+ /**
+ * Checks the fixed-length part size for several arities and that a row pointed at a
+ * segment with a non-zero offset exposes that segment and stores values that can be
+ * read back.
+ */
+ @Test
+ public void testBasic() {
+ // consider header 1 byte.
+ assertEquals(8, new BinaryRow(0).getFixedLengthPartSize());
+ assertEquals(16, new BinaryRow(1).getFixedLengthPartSize());
+ assertEquals(536, new BinaryRow(65).getFixedLengthPartSize());
+ assertEquals(1048, new BinaryRow(128).getFixedLengthPartSize());
+
+ MemorySegment segment = MemorySegmentFactory.wrap(new byte[100]);
+ BinaryRow row = new BinaryRow(2);
+ row.pointTo(segment, 10, 48);
+ // identity check: the row must reference the very segment it was pointed at
+ assertTrue(row.getSegments()[0] == segment);
+ row.setInt(0, 5);
+ row.setDouble(1, 5.8D);
+
+ // read the values back so the setters are verified rather than merely invoked
+ assertEquals(5, row.getInt(0));
+ assertEquals(5.8D, row.getDouble(1), 0);
+ }
+
+ /**
+ * Sets one field of every fixed-width type (plus a null) on a row backed by a single
+ * segment and checks that each getter returns the value that was set.
+ */
+ @Test
+ public void testSetAndGet() {
+ MemorySegment segment = MemorySegmentFactory.wrap(new byte[80]);
+ BinaryRow row = new BinaryRow(9);
+ row.pointTo(segment, 0, 80);
+ row.setNullAt(0);
+ row.setInt(1, 11);
+ row.setLong(2, 22);
+ row.setDouble(3, 33);
+ row.setBoolean(4, true);
+ row.setShort(5, (short) 55);
+ row.setByte(6, (byte) 66);
+ row.setFloat(7, 77f);
+ row.setChar(8, 'a');
+
+ // compare the double itself; truncating it to long first would hide fractional errors
+ assertEquals(33d, row.getDouble(3), 0);
+ assertEquals(11, row.getInt(1));
+ assertTrue(row.isNullAt(0));
+ assertEquals(55, row.getShort(5));
+ assertEquals(22, row.getLong(2));
+ assertTrue(row.getBoolean(4));
+ assertEquals((byte) 66, row.getByte(6));
+ assertEquals(77f, row.getFloat(7), 0);
+ assertEquals('a', row.getChar(8));
+ }
+
+ /**
+ * Writes a 13-field row through {@link BinaryRowWriter} and verifies it with
+ * {@code assertTestWriterRow}: on the row itself, on a copy, and on a row whose bytes
+ * were re-distributed over two memory segments.
+ */
+ @Test
+ public void testWriter() {
+
+ int arity = 13;
+ BinaryRow row = new BinaryRow(arity);
+ BinaryRowWriter writer = new BinaryRowWriter(row, 20);
+
+ // strings are written first and out of field order, fixed-width fields afterwards
+ writer.writeString(0, fromString("1"));
+ writer.writeString(3, fromString("1234567"));
+ writer.writeString(5, fromString("12345678"));
+ writer.writeString(9, fromString("啦啦啦啦啦我是快乐的粉刷匠"));
+
+ writer.writeBoolean(1, true);
+ writer.writeByte(2, (byte) 99);
+ writer.writeChar(4, 'x');
+ writer.writeDouble(6, 87.1d);
+ writer.writeFloat(7, 26.1f);
+ writer.writeInt(8, 88);
+ writer.writeLong(10, 284);
+ writer.writeShort(11, (short) 292);
+ writer.setNullAt(12);
+
+ writer.complete();
+
+ assertTestWriterRow(row);
+ assertTestWriterRow(row.copy());
+
+ // test copy from var segments.
+ // the first segment takes the fixed-length part plus 10 more bytes, the second the rest
+ int subSize = row.getFixedLengthPartSize() + 10;
+ MemorySegment subMs1 = MemorySegmentFactory.wrap(new
byte[subSize]);
+ MemorySegment subMs2 = MemorySegmentFactory.wrap(new
byte[subSize]);
+ row.getSegments()[0].copyTo(0, subMs1, 0, subSize);
+ row.getSegments()[0].copyTo(subSize, subMs2, 0,
row.getSizeInBytes() - subSize);
+
+ BinaryRow toCopy = new BinaryRow(arity);
+ toCopy.pointTo(new MemorySegment[]{subMs1, subMs2}, 0,
row.getSizeInBytes());
+ assertEquals(row, toCopy);
+ assertTestWriterRow(toCopy);
+ // copy(...) into a caller-supplied row must also preserve every field
+ assertTestWriterRow(toCopy.copy(new BinaryRow(arity)));
+ }
+
+ /**
+ * Checks that strings survive a write/read round trip through a {@link BinaryRow},
+ * once for a two-char string with boundary char values and once for a longer
+ * multi-byte string.
+ */
+ @Test
+ public void testwriteString() throws IOException {
+ {
+ // little byte[]: two chars, 0xFFFF and 0
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ char[] chars = new char[2];
+ chars[0] = 0xFFFF;
+ chars[1] = 0;
+ writer.writeString(0, fromString(new String(chars)));
+ writer.complete();
+
+ String str = row.getString(0).toString();
+ assertEquals(chars[0], str.charAt(0));
+ assertEquals(chars[1], str.charAt(1));
+ }
+
+ {
+ // big byte[]
+ String str = "啦啦啦啦啦我是快乐的粉刷匠";
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeString(0, fromString(str));
+ writer.complete();
+
+ assertEquals(str, row.getString(0).toString());
+ }
+ }
+
+ /**
+ * Asserts that {@code row} holds exactly the thirteen field values written by
+ * {@code testWriter}, including the null in the last field.
+ */
+ private void assertTestWriterRow(BinaryRow row) {
+ assertEquals("1", row.getString(0).toString());
+ assertEquals(88, row.getInt(8));
+ assertEquals((short) 292, row.getShort(11));
+ assertEquals(284, row.getLong(10));
+ assertEquals((byte) 99, row.getByte(2));
+ assertEquals('x', row.getChar(4));
+ assertEquals(87.1d, row.getDouble(6), 0);
+ assertEquals(26.1f, row.getFloat(7), 0);
+ assertTrue(row.getBoolean(1));
+ assertEquals("1234567", row.getString(3).toString());
+ assertEquals("12345678", row.getString(5).toString());
+ assertEquals("啦啦啦啦啦我是快乐的粉刷匠", row.getString(9).toString());
+ assertTrue(row.isNullAt(12));
+ }
+
+ /**
+ * Checks that a {@link BinaryRowWriter} can be reused after {@code reset()}: a second
+ * pass writing strings of different lengths into the same row must read back correctly.
+ */
+ @Test
+ public void testReuseWriter() {
+ BinaryRow row = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeString(0, fromString("01234567"));
+ writer.writeString(1, fromString("012345678"));
+ writer.complete();
+ assertEquals("01234567", row.getString(0).toString());
+ assertEquals("012345678", row.getString(1).toString());
+
+ // second pass: field 0 gets a shorter string, field 1 a longer one than before
+ writer.reset();
+ writer.writeString(0, fromString("1"));
+ writer.writeString(1, fromString("0123456789"));
+ writer.complete();
+ assertEquals("1", row.getString(0).toString());
+ assertEquals("0123456789", row.getString(1).toString());
+ }
+
+ /**
+ * Checks {@code anyNull()} and {@code anyNull(int[])} on a small row and on a row
+ * with 80 fields.
+ */
+ @Test
+ public void anyNullTest() throws IOException {
+ {
+ BinaryRow row = new BinaryRow(3);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ assertFalse(row.anyNull());
+
+ // test header should not compute by anyNull
+ row.setHeader((byte) 1);
+ assertFalse(row.anyNull());
+
+ writer.setNullAt(2);
+ assertTrue(row.anyNull());
+
+ // fields 0 and 2 are now null while field 1 is not
+ writer.setNullAt(0);
+ assertTrue(row.anyNull(new int[]{0, 1, 2}));
+ assertFalse(row.anyNull(new int[]{1}));
+
+ writer.setNullAt(1);
+ assertTrue(row.anyNull());
+ }
+
+ {
+ BinaryRow row = new BinaryRow(80);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ assertFalse(row.anyNull());
+
+ writer.setNullAt(3);
+ assertTrue(row.anyNull());
+
+ // NOTE(review): presumably a fresh writer clears the previously set null bit, so
+ // only field 65 is null for the next check; confirm against BinaryRowWriter
+ writer = new BinaryRowWriter(row);
+ writer.setNullAt(65);
+ assertTrue(row.anyNull());
+ }
+ }
+
+ @Test
+ public void testHeaderSize() throws IOException {
+ assertEquals(8, BinaryRow.calculateBitSetWidthInBytes(56));
+ assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(57));
+ assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(120));
+ assertEquals(24, BinaryRow.calculateBitSetWidthInBytes(121));
+ }
+
+ /**
+ * Checks that the header byte written through the writer is preserved by
+ * {@code copy()} and can afterwards be overwritten on the copy with {@code setHeader}.
+ */
+ @Test
+ public void testHeader() throws IOException {
+ BinaryRow row = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+
+ writer.writeInt(0, 10);
+ writer.setNullAt(1);
+ writer.writeHeader((byte) 29);
+ writer.complete();
+
+ // the copy must equal the source row and carry the same header byte
+ BinaryRow newRow = row.copy();
+ assertEquals(row, newRow);
+ assertEquals((byte) 29, newRow.getHeader());
+
+ newRow.setHeader((byte) 19);
+ assertEquals((byte) 19, newRow.getHeader());
+ }
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
new file mode 100644
index 0000000..c1808bc
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryRowTest;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link SegmentsUtil}, most is covered by {@link BinaryRowTest},
+ * this just test some boundary scenarios testing.
+ */
+public class SegmentsUtilTest {
+
+ @Test
+ public void testCopy() {
+ // test copy the content of the latter Seg
+ MemorySegment[] segments = new MemorySegment[2];
+ segments[0] = MemorySegmentFactory.wrap(new byte[]{0, 2, 5});
+ segments[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15});
+
+ byte[] bytes = SegmentsUtil.copyToBytes(segments, 4, 2);
+ Assert.assertArrayEquals(new byte[] {12, 15}, bytes);
+ }
+
+ @Test
+ public void testEquals() {
+ // test copy the content of the latter Seg
+ MemorySegment[] segments1 = new MemorySegment[3];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[]{0, 2, 5});
+ segments1[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15});
+ segments1[2] = MemorySegmentFactory.wrap(new byte[]{1, 1, 1});
+
+ MemorySegment[] segments2 = new MemorySegment[2];
+ segments2[0] = MemorySegmentFactory.wrap(new byte[]{6, 0, 2,
5});
+ segments2[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15,
18});
+
+ Assert.assertTrue(SegmentsUtil.equalsMultiSegments(segments1,
0, segments2, 0, 0));
+ Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2,
1, 3));
+ Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2,
1, 6));
+ Assert.assertFalse(SegmentsUtil.equals(segments1, 0, segments2,
1, 7));
+ }
+
+}