KurtYoung commented on a change in pull request #7816: 
[FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
URL: https://github.com/apache/flink/pull/7816#discussion_r261457262
 
 

 ##########
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 ##########
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A special row which is backed by {@link MemorySegment} instead of Object. 
It can significantly reduce the
+ * serialization/deserialization of Java objects.
+ *
+ * <p>A Row has two parts: a fixed-length part and a variable-length part.
+ *
+ * <p>Fixed-length part contains null bit set and field values. Null bit set 
is used for null tracking and is
+ * aligned to 8-byte word boundaries. `Field values` holds fixed-length 
primitive types and variable-length
+ * values which can be stored in 8 bytes inside. If a variable-length field does 
not fit in those 8 bytes, then store the
+ * length and offset of variable-length part. Fixed-length part will certainly 
fall into a MemorySegment,
+ * which will speed up the read and write of field.
+ *
+ * <p>Variable-length part may fall into multiple MemorySegments.
+ *
+ * <p>{@code BinaryRow} is influenced by Apache Spark's UnsafeRow in Project 
Tungsten.
+ * The difference is that BinaryRow is placed on a discontinuous memory, and 
the variable length type can
+ * also be placed on a fixed length area (If it's short enough).
+ */
+public final class BinaryRow extends BinaryFormat<Object> implements BaseRow {
+
+       public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN);
+
+       public static int calculateBitSetWidthInBytes(int arity) {
+               // add 8 bit header
+               return ((arity + 63 + 8) / 64) * 8;
+       }
+
+       private final int arity;
+       private final int nullBitsSizeInBytes;
+
+       public BinaryRow(int arity) {
+               checkArgument(arity >= 0);
+               this.arity = arity;
+               this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+       }
+
+       private int getFieldOffset(int pos) {
+               return offset + nullBitsSizeInBytes + pos * 8;
+       }
+
+       private void assertIndexIsValid(int index) {
+               assert index >= 0 : "index (" + index + ") should >= 0";
+               assert index < arity : "index (" + index + ") should < " + 
arity;
+       }
+
+       public int getFixedLengthPartSize() {
+               return nullBitsSizeInBytes + 8 * arity;
+       }
+
+       @Override
+       public int getArity() {
+               return arity;
+       }
+
+       @Override
+       public byte getHeader() {
+               // the header is the first byte (8 bits) of the null-bit region, stored at offset.
+               return segments[0].get(offset);
+       }
+
+       @Override
+       public void setHeader(byte header) {
+               segments[0].put(offset, header);
+       }
+
+       public void pointTo(MemorySegment segment, int offset, int sizeInBytes) 
{
+               this.segments = new MemorySegment[] {segment};
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       public void pointTo(MemorySegment[] segments, int offset, int 
sizeInBytes) {
+               this.segments = segments;
+               this.offset = offset;
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       public void setTotalSize(int sizeInBytes) {
+               this.sizeInBytes = sizeInBytes;
+       }
+
+       private void setNotNullAt(int i) {
+               assertIndexIsValid(i);
+               // need add header 8 bit.
+               SegmentsUtil.bitUnSet(segments[0], offset, i + 8);
+       }
+
+       @Override
+       public void setNullAt(int i) {
+               assertIndexIsValid(i);
+               // need add header 8 bit.
+               SegmentsUtil.bitSet(segments[0], offset, i + 8);
+               // We must set the fixed length part zero.
+               // 1.Only int/long/boolean...(Fix length type) will invoke this 
setNullAt.
+               // 2.Set to zero in order to equals and hash operation bytes 
calculation.
+               segments[0].putLong(getFieldOffset(i), 0);
+       }
+
+       @Override
+       public void setInt(int pos, int value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putInt(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setLong(int pos, long value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putLong(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setDouble(int pos, double value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putDouble(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setChar(int pos, char value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putChar(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setBoolean(int pos, boolean value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putBoolean(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setShort(int pos, short value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putShort(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setByte(int pos, byte value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].put(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public void setFloat(int pos, float value) {
+               assertIndexIsValid(pos);
+               setNotNullAt(pos);
+               segments[0].putFloat(getFieldOffset(pos), value);
+       }
+
+       @Override
+       public boolean isNullAt(int pos) {
+               assertIndexIsValid(pos);
+               // need add header 8 bit.
+               return SegmentsUtil.bitGet(segments[0], offset, pos + 8);
+       }
+
+       @Override
+       public boolean getBoolean(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getBoolean(getFieldOffset(pos));
+       }
+
+       @Override
+       public byte getByte(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].get(getFieldOffset(pos));
+       }
+
+       @Override
+       public short getShort(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getShort(getFieldOffset(pos));
+       }
+
+       @Override
+       public int getInt(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getInt(getFieldOffset(pos));
+       }
+
+       @Override
+       public long getLong(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getLong(getFieldOffset(pos));
+       }
+
+       @Override
+       public float getFloat(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getFloat(getFieldOffset(pos));
+       }
+
+       @Override
+       public double getDouble(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getDouble(getFieldOffset(pos));
+       }
+
+       @Override
+       public char getChar(int pos) {
+               assertIndexIsValid(pos);
+               return segments[0].getChar(getFieldOffset(pos));
+       }
+
+       /**
+        * Returns the string field at {@code pos}.
+        *
+        * <p>The 8-byte slot of the field is read from the first segment and passed, together with
+        * the row's base offset and the slot's offset, to
+        * {@code BinaryFormat.readBinaryStringFieldFromSegments} for decoding.
+        */
+       @Override
+       public BinaryString getString(int pos) {
+               // validate the index, consistent with every other positional accessor of this class.
+               assertIndexIsValid(pos);
+               int fieldOffset = getFieldOffset(pos);
+               final long offsetAndSize = segments[0].getLong(fieldOffset);
+               return BinaryFormat.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndSize);
+       }
+
+       /**
+        * The bit is 1 when the field is null. Default is 0.
+        */
+       public boolean anyNull() {
+               for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
 
 Review comment:
   Looks like you don't skip the header

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to