KurtYoung commented on a change in pull request #7996: 
[FLINK-11871][table-runtime-blink] Introduce LongHybridHashTable to improve 
performance when join key fits in long
URL: https://github.com/apache/flink/pull/7996#discussion_r266840328
 
 

 ##########
 File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
 ##########
 @@ -0,0 +1,947 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.hashtable;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.SeekableDataInputView;
+import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.util.FileChannelUtil;
+import org.apache.flink.table.runtime.util.RowIterator;
+import org.apache.flink.table.typeutils.BinaryRowSerializer;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.flink.table.runtime.hashtable.LongHybridHashTable.hashLong;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Partition for {@link LongHybridHashTable}.
+ *
+ * <p>The layout of the buckets inside a memory segment is as follows:</p>
+ *
+ * <p>Hash mode:
+ * +----------------------------- Bucket area ----------------------------
+ * | long key (8 bytes) | address (8 bytes) |
+ * | long key (8 bytes) | address (8 bytes) |
+ * | long key (8 bytes) | address (8 bytes) |
+ * | ...
+ * +----------------------------- Data area --------------------------
+ * | size & address of next row with the same key (8 bytes) | binary row |
+ * | size & address of next row with the same key (8 bytes) | binary row |
+ * | size & address of next row with the same key (8 bytes) | binary row |
+ * | ...
+ *
+ * <p>Dense mode:
+ * +----------------------------- Bucket area ----------------------------
+ * | address1 (8 bytes) | address2 (8 bytes) | address3 (8 bytes) | ...
+ * The bucket array is addressed directly by the index corresponding to the key value.
+ */
+public class LongHashPartition extends AbstractPagedInputView implements SeekableDataInputView {
+
+       private static final Logger LOG = LoggerFactory.getLogger(LongHashPartition.class);
+
+       // The number of bits for size in address
+       private static final int SIZE_BITS = 28;
+       private static final int SIZE_MASK = 0xfffffff;
+
+       // bucket element size in sparse mode: long key (8 bytes) + address pointer (8 bytes)
+       private static final int SPARSE_BUCKET_ELEMENT_SIZE_IN_BYTES = 16;
+
+       static final long INVALID_ADDRESS = 0x00000FFFFFFFFFL;
+
+       private final LongHybridHashTable longTable;
+
+       // segment size related properties
+       private final int segmentSize;
+       private final int segmentSizeBits;
+       private final int segmentSizeMask;
+
+       private int partitionNum;
+       private final BinaryRowSerializer buildSideSerializer;
+       private final BinaryRow buildReuseRow;
+       private int recursionLevel;
+
+       // The minimum key
+       private long minKey = Long.MAX_VALUE;
+
+       // The maximum key
+       private long maxKey = Long.MIN_VALUE;
+
+       // The bucket area for this partition
+       private MemorySegment[] buckets;
+       private int numBuckets;
+       private int numBucketsMask;
+
+       // The in-memory data area for this partition
+       private MemorySegment[] partitionBuffers;
+
+       private int finalBufferLimit;
+       private int currentBufferNum;
+       private BuildSideBuffer buildSideWriteBuffer;
+       AbstractChannelWriterOutputView probeSideBuffer;
+       long probeSideRecordCounter; // number of probe-side records in this partition
+
+       // The number of unique keys.
+       private long numKeys;
+
+       private final MatchIterator iterator;
+
+       // the channel writer for the build side, if partition is spilled
+       private BlockChannelWriter<MemorySegment> buildSideChannel;
+
+       // number of build-side records in this partition
+       private long buildSideRecordCounter;
+
+       int probeNumBytesInLastSeg;
+
+       /**
+        * Entrance 1: Initialize the LongHashPartition for new inserts and searches.
+        */
+       LongHashPartition(
+                       LongHybridHashTable longTable,
+                       int partitionNum,
+                       BinaryRowSerializer buildSideSerializer,
+                       double estimatedRowCount,
+                       int maxSegs,
+                       int recursionLevel) {
+               this(
+                               longTable,
+                               partitionNum,
+                               buildSideSerializer,
+                               getBucketBuffersByRowCount((long) estimatedRowCount, maxSegs, longTable.pageSize()),
+                               recursionLevel,
+                               null,
+                               0);
+               this.buildSideWriteBuffer = new BuildSideBuffer(longTable.nextSegment());
+       }
+
+       /**
+        * Entrance 2: build the table from a spilled partition when the partition fits entirely
+        * into main memory.
+        */
+       LongHashPartition(
+                       LongHybridHashTable longTable,
+                       int partitionNum,
+                       BinaryRowSerializer buildSideSerializer,
+                       int bucketNumSegs,
+                       int recursionLevel,
+                       List<MemorySegment> buffers,
+                       int lastSegmentLimit) {
+               this(longTable, buildSideSerializer, listToArray(buffers));
+               this.partitionNum = partitionNum;
+               this.recursionLevel = recursionLevel;
+
+               int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize / 16);
+               MemorySegment[] buckets = new MemorySegment[bucketNumSegs];
+               for (int i = 0; i < bucketNumSegs; i++) {
+                       buckets[i] = longTable.nextSegment();
+               }
+               setNewBuckets(buckets, numBuckets);
+               this.finalBufferLimit = lastSegmentLimit;
+       }
+
+       /**
+        * Entrance 3: dense mode, for data search only (the bucket area is kept in LongHybridHashTable in dense mode).
+        */
+       LongHashPartition(
+                       LongHybridHashTable longTable,
+                       BinaryRowSerializer buildSideSerializer,
+                       MemorySegment[] partitionBuffers) {
+               super(0);
+               this.longTable = longTable;
+               this.buildSideSerializer = buildSideSerializer;
+               this.buildReuseRow = buildSideSerializer.createInstance();
+               this.segmentSize = longTable.pageSize();
+               Preconditions.checkArgument(segmentSize % 16 == 0);
+               this.partitionBuffers = partitionBuffers;
+               this.segmentSizeBits = MathUtils.log2strict(segmentSize);
+               this.segmentSizeMask = segmentSize - 1;
+               this.finalBufferLimit = segmentSize;
+               this.iterator = new MatchIterator();
+       }
+
+       private static MemorySegment[] listToArray(List<MemorySegment> list) {
+               if (list != null) {
+                       return list.toArray(new MemorySegment[list.size()]);
+               }
+               return null;
+       }
+
+       private static int getBucketBuffersByRowCount(long rowCount, int maxSegs, int segmentSize) {
+               int minNumBuckets = (int) Math.ceil((rowCount / 0.5));
+               Preconditions.checkArgument(segmentSize % 16 == 0);
+               return MathUtils.roundDownToPowerOf2((int) Math.max(1,
+                               Math.min(maxSegs, Math.ceil(((double) minNumBuckets) * 16 / segmentSize))));
+       }
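+
+       // Worked example (illustrative, assuming a 32 KB page size): for an estimated 100,000 rows,
+       // minNumBuckets = ceil(100,000 / 0.5) = 200,000 buckets of 16 bytes each, i.e. 3,200,000 bytes,
+       // which needs ceil(3,200,000 / 32,768) = 98 segments; roundDownToPowerOf2 (capped by maxSegs)
+       // then yields 64 bucket segments.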
+
+       private void setNewBuckets(MemorySegment[] buckets, int numBuckets) {
+               for (MemorySegment segment : buckets) {
+                       for (int i = 0; i < segmentSize; i += 16) {
+                               // maybe we don't need to init the key, since we always verify the address
+                               segment.putLong(i, 0);
+                               segment.putLong(i + 8, INVALID_ADDRESS);
+                       }
+               }
+               this.buckets = buckets;
+               checkArgument(MathUtils.isPowerOf2(numBuckets));
+               this.numBuckets = numBuckets;
+               this.numBucketsMask = numBuckets - 1;
+               this.numKeys = 0;
+       }
+
+       static long toAddrAndLen(long address, int size) {
+               return (address << SIZE_BITS) | size;
+       }
+
+       static long toAddress(long addrAndLen) {
+               return addrAndLen >>> SIZE_BITS;
+       }
+
+       static int toLength(long addrAndLen) {
+               return (int) (addrAndLen & SIZE_MASK);
+       }
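+
+       // Illustrative example (not part of this patch): with SIZE_BITS = 28, a data address and a row
+       // size are packed into one 8-byte header. For address = 0x20000 and size = 40,
+       // toAddrAndLen(0x20000, 40) == 0x200000000028L, and toAddress/toLength recover 0x20000 and 40.
+       // The size field is limited to SIZE_MASK (2^28 - 1 bytes, roughly 256 MB), which
+       // insertIntoTable checks before writing a row.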
+
+       /**
+        * Returns an iterator of BinaryRow for multiple linked values.
+        */
+       MatchIterator valueIter(long address) {
+               iterator.set(address);
+               return iterator;
+       }
+
+//     public MatchIterator get(long key) {
+//             return get(key, hashLong(key, recursionLevel));
+//     }
+
+       /**
+        * Returns an iterator over all the values for the given key, or an empty iterator if no value is found.
+        */
+       public MatchIterator get(long key, int hashCode) {
+               int bucket = hashCode & numBucketsMask;
+
+               int bucketOffset = bucket << 4;
+               MemorySegment segment = buckets[bucketOffset >>> segmentSizeBits];
+               int segOffset = bucketOffset & segmentSizeMask;
+
+               while (true) {
+                       long address = segment.getLong(segOffset + 8);
+                       if (address != INVALID_ADDRESS) {
+                               if (segment.getLong(segOffset) == key) {
+                                       return valueIter(address);
+                               } else {
+                                       bucket = (bucket + 1) & numBucketsMask;
+                                       if (segOffset + 16 < segmentSize) {
+                                               segOffset += 16;
+                                       } else {
+                                               bucketOffset = bucket << 4;
+                                               segOffset = bucketOffset & segmentSizeMask;
+                                               segment = buckets[bucketOffset >>> segmentSizeBits];
+                                       }
+                               }
+                       } else {
+                               return valueIter(INVALID_ADDRESS);
+                       }
+               }
+       }
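+
+       // Usage sketch (assumed, based on this class's API; MatchIterator is expected to follow the
+       // RowIterator<BinaryRow> contract with advanceNext()/getRow()):
+       //   MatchIterator matches = partition.get(key, hashLong(key, recursionLevel));
+       //   while (matches.advanceNext()) {
+       //           BinaryRow buildRow = matches.getRow();
+       //           // join buildRow with the probe-side record
+       //   }
+       // A bucket entry holding INVALID_ADDRESS ends the linear probe, so the returned iterator is
+       // empty when the key is absent.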
+
+       /**
+        * Update the address in the bucket array for the given key.
+        */
+       private void updateIndex(
+                       long key,
+                       int hashCode,
+                       long address,
+                       int size,
+                       MemorySegment dataSegment,
+                       int currentPositionInSegment) throws IOException {
+               assert (numKeys <= numBuckets / 2);
+               int bucketId = hashCode & numBucketsMask;
+
+               // each bucket occupies 16 bytes (long key + long pointer to the data address)
+               int bucketOffset = bucketId * SPARSE_BUCKET_ELEMENT_SIZE_IN_BYTES;
+               MemorySegment segment = buckets[bucketOffset >>> segmentSizeBits];
+               int segOffset = bucketOffset & segmentSizeMask;
+               long currAddress;
+
+               while (true) {
+                       currAddress = segment.getLong(segOffset + 8);
+                       if (segment.getLong(segOffset) != key && currAddress != INVALID_ADDRESS) {
+                               // hash conflict: the bucket is occupied by another key
+
+                               // TODO test conflict resolution:
+                               // now:    +1 +1 +1... cache friendly but more conflicts, so we set the load factor to 0.5
+                               // other1: +1 +2 +3... fewer conflicts, the factor can be 0.75
+                               // other2: secondary hashCode... fewer and fewer conflicts, but the hash needs to be computed again
+                               bucketId = (bucketId + 1) & numBucketsMask;
+                               if (segOffset + SPARSE_BUCKET_ELEMENT_SIZE_IN_BYTES < segmentSize) {
+                                       // if the new bucket is still in the current segment, we only need to update
+                                       // the offset within this segment
+                                       segOffset += SPARSE_BUCKET_ELEMENT_SIZE_IN_BYTES;
+                               } else {
+                                       // otherwise, we should re-calculate the segment and offset
+                                       bucketOffset = bucketId * 16;
+                                       segment = buckets[bucketOffset >>> segmentSizeBits];
+                                       segOffset = bucketOffset & segmentSizeMask;
+                               }
+                       } else {
+                               break;
+                       }
+               }
+               if (currAddress == INVALID_ADDRESS) {
+                       // this is the first value for this key, put the address in the bucket array.
+                       segment.putLong(segOffset, key);
+                       segment.putLong(segOffset + 8, address);
+                       numKeys += 1;
+                       // dataSegment may be null if we only have to rehash the bucket area
+                       if (dataSegment != null) {
+                               dataSegment.putLong(currentPositionInSegment, toAddrAndLen(INVALID_ADDRESS, size));
+                       }
+                       if (numKeys * 2 > numBuckets) {
+                               resize();
+                       }
+               } else {
+                       // there are already values for this key, put the new address at the front of them.
+                       dataSegment.putLong(currentPositionInSegment, toAddrAndLen(currAddress, size));
+                       segment.putLong(segOffset + 8, address);
+               }
+       }
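+
+       // Note: updateIndex keeps the effective load factor at or below 0.5 (resize() is triggered
+       // once numKeys * 2 > numBuckets), which matches the +1 linear probing trade-off described in
+       // the TODO above.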
+
+       private void resize() throws IOException {
+               MemorySegment[] oldBuckets = this.buckets;
+               int oldNumBuckets = numBuckets;
+               int newNumSegs = oldBuckets.length * 2;
+               int newNumBuckets = MathUtils.roundDownToPowerOf2(newNumSegs * segmentSize / 16);
+
+               // request new buckets.
+               MemorySegment[] newBuckets = new MemorySegment[newNumSegs];
+               for (int i = 0; i < newNumSegs; i++) {
+                       MemorySegment seg = longTable.getNextBuffer();
+                       if (seg == null) {
+                               final int spilledPart = longTable.spillPartition();
+                               if (spilledPart == partitionNum) {
+                                       // this bucket is no longer in-memory
+                                       // free new segments.
+                                       longTable.returnAll(Arrays.asList(newBuckets));
+                                       return;
+                               }
+                               seg = longTable.getNextBuffer();
+                               if (seg == null) {
+                                       throw new RuntimeException(
+                                                       "Bug in HybridHashJoin: No memory became available after spilling a partition.");
+                               }
+                       }
+                       newBuckets[i] = seg;
+               }
+
+               setNewBuckets(newBuckets, newNumBuckets);
+               reHash(oldBuckets, oldNumBuckets);
+       }
+
+       private void reHash(MemorySegment[] oldBuckets, int oldNumBuckets) throws IOException {
+               long reHashStartTime = System.currentTimeMillis();
+               int bucketOffset = 0;
+               MemorySegment segment = oldBuckets[bucketOffset];
+               int segOffset = 0;
+               for (int i = 0; i < oldNumBuckets; i++) {
+                       long address = segment.getLong(segOffset + 8);
+                       if (address != INVALID_ADDRESS) {
+                               long key = segment.getLong(segOffset);
+                               // size/dataSegment/currentPositionInSegment should never be used.
+                               updateIndex(key, hashLong(key, recursionLevel), address, 0, null, 0);
+                       }
+
+                       // not last bucket, move to next.
+                       if (i != oldNumBuckets - 1) {
+                               if (segOffset + 16 < segmentSize) {
+                                       segOffset += 16;
+                               } else {
+                                       segment = oldBuckets[++bucketOffset];
+                                       segOffset = 0;
+                               }
+                       }
+               }
+
+               longTable.returnAll(Arrays.asList(oldBuckets));
+               LOG.info("The rehash took {} ms for {} buckets", (System.currentTimeMillis() - reHashStartTime), numBuckets);
+       }
+
+       public MemorySegment[] getBuckets() {
+               return buckets;
+       }
+
+       int getBuildSideBlockCount() {
+               return this.partitionBuffers == null ? this.buildSideWriteBuffer.getBlockCount()
+                               : this.partitionBuffers.length;
+       }
+
+       int getProbeSideBlockCount() {
+               return this.probeSideBuffer == null ? -1 : this.probeSideBuffer.getBlockCount();
+       }
+
+       BlockChannelWriter<MemorySegment> getBuildSideChannel() {
+               return this.buildSideChannel;
+       }
+
+       FileIOChannel.ID getProbeSideChannelID() {
+               return probeSideBuffer.getChannel().getChannelID();
+       }
+
+       int getPartitionNumber() {
+               return this.partitionNum;
+       }
+
+       MemorySegment[] getPartitionBuffers() {
+               return partitionBuffers;
+       }
+
+       int getRecursionLevel() {
+               return this.recursionLevel;
+       }
+
+       int getNumOccupiedMemorySegments() {
+               // either the number of memory segments, or one for spilling
+               final int numPartitionBuffers = this.partitionBuffers != null ?
+                               this.partitionBuffers.length
+                               : this.buildSideWriteBuffer.getNumOccupiedMemorySegments();
+               return numPartitionBuffers + buckets.length;
+       }
+
+       int spillPartition(IOManager ioAccess, FileIOChannel.ID targetChannel,
+                       LinkedBlockingQueue<MemorySegment> bufferReturnQueue) throws IOException {
+               // sanity checks
+               if (!isInMemory()) {
+                       throw new RuntimeException("Bug in Hybrid Hash Join: " +
+                                       "Request to spill a partition that has already been spilled.");
+               }
+               if (getNumOccupiedMemorySegments() < 2) {
+                       throw new RuntimeException("Bug in Hybrid Hash Join: " +
+                                       "Request to spill a partition with less than two buffers.");
+               }
+
+               // create the channel block writer and spill the current buffers,
+               // but keep the build side buffer's current block, as it is most likely not full yet;
+               // we return the number of blocks that become available
+               this.buildSideChannel = FileChannelUtil.createBlockChannelWriter(
+                               ioAccess,
+                               targetChannel,
+                               bufferReturnQueue,
+                               longTable.compressionEnable(),
+                               longTable.compressionCodecFactory(),
+                               longTable.compressionBlockSize(),
+                               segmentSize);
+               return this.buildSideWriteBuffer.spill(this.buildSideChannel);
+       }
+
+       /**
+        * Called after the build phase.
+        *
+        * @return the build spill return buffer count: if the partition has spilled, it returns the
+        * current write buffer, because that buffer was in use for the whole build phase and can only
+        * be returned at this time.
+        */
+       int finalizeBuildPhase(
+                       IOManager ioAccess,
+                       FileIOChannel.Enumerator probeChannelEnumerator) throws IOException {
+               this.finalBufferLimit = this.buildSideWriteBuffer.getCurrentPositionInSegment();
+               this.partitionBuffers = this.buildSideWriteBuffer.close();
+
+               if (!isInMemory()) {
+                       // close the channel.
+                       this.buildSideChannel.close();
+
+                       this.probeSideBuffer = FileChannelUtil.createOutputView(
+                                       ioAccess,
+                                       probeChannelEnumerator.next(),
+                                       longTable.compressionEnable(),
+                                       longTable.compressionCodecFactory(),
+                                       longTable.compressionBlockSize(),
+                                       segmentSize);
+                       return 1;
+               } else {
+                       return 0;
+               }
+       }
+
+       void finalizeProbePhase(List<LongHashPartition> spilledPartitions) throws IOException {
+               if (isInMemory()) {
+                       releaseBuckets();
+                       longTable.returnAll(Arrays.asList(partitionBuffers));
+                       this.partitionBuffers = null;
+               } else {
+                       if (this.probeSideRecordCounter == 0) {
+                               // delete the spill files
+                               this.probeSideBuffer.close();
+                               this.buildSideChannel.deleteChannel();
+                               this.probeSideBuffer.getChannel().deleteChannel();
+                       } else {
+                               // flush the last probe side buffer and register this partition as pending
+                               probeNumBytesInLastSeg = this.probeSideBuffer.close();
+                               spilledPartitions.add(this);
+                       }
+               }
+       }
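+
+       // Spill bookkeeping: a spilled partition whose probe side saw at least one record is added to
+       // spilledPartitions for later processing, while a spilled partition with an empty probe side
+       // has its build and probe spill files deleted right away.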
+
+       final PartitionIterator newPartitionIterator() {
+               return new PartitionIterator();
+       }
+
+       final int getLastSegmentLimit() {
+               return this.finalBufferLimit;
+       }
+
+       // ------------------ PagedInputView for read --------------------
+
+       @Override
+       public void setReadPosition(long pointer) {
+               final int bufferNum = (int) (pointer >>> this.segmentSizeBits);
+               final int offset = (int) (pointer & segmentSizeMask);
+
+               this.currentBufferNum = bufferNum;
+
+               seekInput(this.partitionBuffers[bufferNum], offset,
+                               bufferNum < partitionBuffers.length - 1 ? segmentSize : finalBufferLimit);
+       }
+
+       @Override
+       protected MemorySegment nextSegment(MemorySegment current) throws IOException {
+               this.currentBufferNum++;
+               if (this.currentBufferNum < this.partitionBuffers.length) {
+                       return this.partitionBuffers[this.currentBufferNum];
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       protected int getLimitForSegment(MemorySegment segment) {
+               return segment == partitionBuffers[partitionBuffers.length - 1] ? finalBufferLimit : segmentSize;
+       }
+
+       boolean isInMemory() {
+               return buildSideChannel == null;
+       }
+
+       final void insertIntoProbeBuffer(
+                       BinaryRowSerializer probeSer,
+                       BinaryRow record) throws IOException {
+               probeSer.serialize(record, this.probeSideBuffer);
+               this.probeSideRecordCounter++;
+       }
+
+       long getBuildSideRecordCount() {
+               return buildSideRecordCounter;
+       }
+
+       long getMinKey() {
+               return minKey;
+       }
+
+       long getMaxKey() {
+               return maxKey;
+       }
+
+       private void updateMinMax(long key) {
+               if (key < minKey) {
+                       minKey = key;
+               }
+               if (key > maxKey) {
+                       maxKey = key;
+               }
+       }
+
+       void insertIntoBucket(long key, int hashCode, int size, long address) throws IOException {
+               this.buildSideRecordCounter++;
+               updateMinMax(key);
+
+               final int bufferNum = (int) (address >>> this.segmentSizeBits);
+               final int offset = (int) (address & (this.segmentSize - 1));
+               updateIndex(key, hashCode, address, size, partitionBuffers[bufferNum], offset);
+       }
+
+       void insertIntoTable(long key, int hashCode, BinaryRow row) throws IOException {
+               this.buildSideRecordCounter++;
+               updateMinMax(key);
+               int sizeInBytes = row.getSizeInBytes();
+               if (sizeInBytes >= (1 << SIZE_BITS)) {
+                       throw new UnsupportedOperationException("Does not support row that is larger than 256M");
+               }
+               if (isInMemory()) {
+                       checkWriteAdvance();
+                       // after advancing, we may have run out of memory and spilled this partition,
+                       // so check again whether it is still in memory
+                       if (isInMemory()) {
+                               updateIndex(
+                                               key,
+                                               hashCode,
+                                               buildSideWriteBuffer.getCurrentPointer(),
+                                               sizeInBytes,
+                                               buildSideWriteBuffer.getCurrentSegment(),
+                                               buildSideWriteBuffer.getCurrentPositionInSegment());
+                       } else {
+                               buildSideWriteBuffer.getCurrentSegment().putLong(
+                                               buildSideWriteBuffer.getCurrentPositionInSegment(),
+                                               toAddrAndLen(INVALID_ADDRESS, sizeInBytes));
+                       }
+
+                       buildSideWriteBuffer.skipBytesToWrite(8);
+                       if (row.getSegments().length == 1) {
+                               buildSideWriteBuffer.write(row.getSegments()[0], row.getOffset(), sizeInBytes);
+                       } else {
+                               buildSideSerializer.serializeToPagesWithoutLength(row, buildSideWriteBuffer);
+                       }
+               } else {
+                       serializeToPages(row);
+               }
+       }
+
+       public void serializeToPages(BinaryRow row) throws IOException {
+
+               int sizeInBytes = row.getSizeInBytes();
+               checkWriteAdvance();
+
+               buildSideWriteBuffer.getCurrentSegment().putLong(
+                               buildSideWriteBuffer.getCurrentPositionInSegment(),
+                               toAddrAndLen(INVALID_ADDRESS, row.getSizeInBytes()));
+               buildSideWriteBuffer.skipBytesToWrite(8);
+
+               if (row.getSegments().length == 1) {
+                       buildSideWriteBuffer.write(row.getSegments()[0], row.getOffset(), sizeInBytes);
+               } else {
+                       buildSideSerializer.serializeToPagesWithoutLength(row, buildSideWriteBuffer);
+               }
+       }
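+
+       // Each record in the data area is an 8-byte header (the next-row address and the row size,
+       // packed by toAddrAndLen) followed by the BinaryRow bytes, matching the layout described in
+       // the class-level comment; when writing for a spilled partition, the header's address part is
+       // simply INVALID_ADDRESS.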
+
+       void releaseBuckets() {
+               if (buckets != null) {
+                       longTable.returnAll(Arrays.asList(buckets));
+                       buckets = null;
+               }
+       }
+
+//     public void append(long key, BinaryRow row) throws IOException {
 
 Review comment:
   ok

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
