sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality URL: https://github.com/apache/drill/pull/1330#discussion_r198942743
########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java ########## @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.columnreaders.batchsizing; + +import io.netty.buffer.DrillBuf; +import java.util.List; +import java.util.Map; +import org.apache.drill.common.map.CaseInsensitiveMap; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition; +import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry; +import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer; +import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition; +import org.apache.drill.exec.vector.UInt1Vector; +import org.apache.drill.exec.vector.UInt4Vector; + +/** + * Field overflow SERDE utility; note that overflow data is serialized as a way to minimize + * memory usage. 
This information is deserialized back to ValueVectors when it is needed in + * the next batch. + * + * <p><b>NOTE -</b>We use a specialized implementation for overflow SERDE (instead of reusing + * existing ones) because of the following reasons: + * <ul> + * <li>We want to only serialize a subset of the VV data + * <li>Other SERDE methods will not copy the data contiguously and instead rely on the + * RPC layer to write the drill buffers in the correct order so that they are + * de-serialized as a single contiguous drill buffer + * </ul> + */ +final class OverflowSerDeUtil { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class); + + /** + * Serializes a collection of overflow fields into a memory buffer: + * <ul> + * <li>Serialization logic can handle a subset of values (should be contiguous) + * <li>Serialized data is copied into a single DrillBuf + * <li>Currently, only variable length data is supported + * </ul> + * + * @param fieldOverflowEntries input collection of field overflow entries + * @param allocator buffer allocator + * @return record overflow container; null if the input buffer is empty + */ + static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries, + BufferAllocator allocator) { + + if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) { + return null; + } + + // We need to: + // - Construct a map of VLVectorSerDe for each overflow field + // - Compute the total space required for efficient serialization of all overflow data + final Map<String, VLVectorSerializer> fieldSerDeMap = CaseInsensitiveMap.newHashMap(); + int bufferLength = 0; + + for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) { + final VLVectorSerializer fieldVLSerDe = new VLVectorSerializer(fieldOverflowEntry); + fieldSerDeMap.put(fieldOverflowEntry.vector.getField().getName(), fieldVLSerDe); + + bufferLength += fieldVLSerDe.getBytesUsed(fieldOverflowEntry.firstValueIdx, 
fieldOverflowEntry.numValues); + } + assert bufferLength >= 0; + + // Allocate the required memory to serialize the overflow fields + final DrillBuf buffer = allocator.buffer(bufferLength); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Allocated a buffer of length %d to handle overflow", bufferLength)); + } + + // Create the result object + final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer(); + final RecordOverflowDefinition recordOverflowDef = recordOverflowContainer.recordOverflowDef; + + // Now serialize field overflow into the drill buffer + int bufferOffset = 0; + FieldSerializerContainer fieldSerializerContainer = new FieldSerializerContainer(); + + for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) { + fieldSerializerContainer.clear(); + + // Serialize the field overflow data into the buffer + VLVectorSerializer fieldSerDe = fieldSerDeMap.get(fieldOverflowEntry.vector.getField().getName()); + assert fieldSerDe != null; + + fieldSerDe.copyValueVector(fieldOverflowEntry.firstValueIdx, + fieldOverflowEntry.numValues, + buffer, + bufferOffset, + fieldSerializerContainer); + + // Create a view DrillBuf for isolating this field overflow data + DrillBuf fieldBuffer = buffer.slice(bufferOffset, fieldSerializerContainer.totalByteLength); + fieldBuffer.retain(1); // Increase the reference count + fieldBuffer.writerIndex(fieldSerializerContainer.totalByteLength); + + // Enqueue a field overflow definition object for the current field + FieldOverflowDefinition fieldOverflowDef = new FieldOverflowDefinition( + fieldOverflowEntry.vector.getField(), + fieldOverflowEntry.numValues, + fieldSerializerContainer.dataByteLen, + fieldBuffer); + + recordOverflowDef.getFieldOverflowDefs().put(fieldOverflowEntry.vector.getField().getName(), fieldOverflowDef); + + // Update this drill buffer current offset + bufferOffset += fieldSerializerContainer.totalByteLength; + } + + // Finally, release the original buffer + 
boolean isReleased = buffer.release(); + assert !isReleased; // the reference count should still be higher than zero + + return recordOverflowContainer; + } + + /** Disabling object instantiation */ + private OverflowSerDeUtil() { + // NOOP + } + +// ---------------------------------------------------------------------------- +// Internal Data Structure +// ---------------------------------------------------------------------------- + + /** Container class to store the result of field overflow serialization */ + private static final class FieldSerializerContainer { + /** Data byte length */ + int dataByteLen; + /** Total byte length */ + int totalByteLength; + + void clear() { + dataByteLen = 0; + totalByteLength = 0; + } + } + + /** + * Helper class for handling variable length {@link ValueVector} overflow serialization logic + */ + private static final class VLVectorSerializer { + private static final int BYTE_VALUE_WIDTH = UInt1Vector.VALUE_WIDTH; + private static final int INT_VALUE_WIDTH = UInt4Vector.VALUE_WIDTH; + + /** Field overflow entry */ + private final FieldOverflowEntry fieldOverflowEntry; + + /** Set of DrillBuf's that make up the underlying ValueVector. Only nullable + * (VarChar or VarBinary) vectors have three entries. The format is + * ["bits-vector"] "offsets-vector" "values-vector" + */ + private final DrillBuf[] buffers; + + /** + * Constructor. + * @param fieldOverflowEntry field overflow entry + */ + private VLVectorSerializer(FieldOverflowEntry fieldOverflowEntry) { + this.fieldOverflowEntry = fieldOverflowEntry; + this.buffers = this.fieldOverflowEntry.vector.getBuffers(false); + } + + /** + * The number of bytes used (by the {@link ValueVector}) to store a value range delimited + * by the parameters "firstValueIdx" and "numValues". 
+ * + * @param firstValueIdx start index of the first value to copy + * @param numValues number of values to copy + */ + private int getBytesUsed(int firstValueIdx, int numValues) { Review comment: We need to improve the current ValueVector (VV) API, as I need only a partial segment of the VV. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services