sachouche commented on a change in pull request #1330: DRILL-6147: Adding 
Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198942743
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
 ##########
 @@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import io.netty.buffer.DrillBuf;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.memory.BufferAllocator;
+import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry;
+import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer;
+import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
+
+/**
+ * Field overflow SERDE utility; note that overflow data is serialized as a way to minimize
+ * memory usage. This information is deserialized back to ValueVectors when it is needed in
+ * the next batch.
+ *
+ * <p><b>NOTE -</b> We use a specialized implementation for overflow SERDE (instead of reusing
+ * existing ones) because of the following reasons:
+ * <ul>
+ * <li>We want to only serialize a subset of the VV data
+ * <li>Other SERDE methods will not copy the data contiguously and instead rely on the
+ *     RPC layer to write the drill buffers in the correct order so that they are
+ *     de-serialized as a single contiguous drill buffer
+ * </ul>
+ */
+final class OverflowSerDeUtil {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class);
+
+  /**
+   * Serializes a collection of overflow fields into a memory buffer:
+   * <ul>
+   * <li>Serialization logic can handle a subset of values (should be contiguous)
+   * <li>Serialized data is copied into a single DrillBuf
+   * <li>Currently, only variable length data is supported
+   * </ul>
+   *
+   * <p>Every field gets its own slice of the shared buffer. Each slice is retained once and the
+   * reference obtained at allocation time is released before returning, so the memory is
+   * expected to stay alive for as long as the returned field definitions hold their slices.
+   *
+   * @param fieldOverflowEntries input collection of field overflow entries
+   * @param allocator buffer allocator
+   * @return record overflow container; null if the input collection is null or empty
+   */
+  static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries,
+    BufferAllocator allocator) {
+
+    if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) {
+      return null;
+    }
+
+    // We need to:
+    // - Construct a map of VLVectorSerializer for each overflow field
+    // - Compute the total space required for efficient serialization of all overflow data
+    final Map<String, VLVectorSerializer> fieldSerDeMap = CaseInsensitiveMap.newHashMap();
+    int bufferLength = 0;
+
+    for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) {
+      final VLVectorSerializer fieldVLSerDe = new VLVectorSerializer(fieldOverflowEntry);
+      fieldSerDeMap.put(fieldOverflowEntry.vector.getField().getName(), fieldVLSerDe);
+
+      bufferLength += fieldVLSerDe.getBytesUsed(fieldOverflowEntry.firstValueIdx,
+        fieldOverflowEntry.numValues);
+    }
+    // A negative total would indicate an integer overflow while summing the field sizes
+    assert bufferLength >= 0;
+
+    // Allocate the required memory to serialize the overflow fields
+    final DrillBuf buffer = allocator.buffer(bufferLength);
+
+    // Parameterized logging: the message is only built when the debug level is enabled
+    logger.debug("Allocated a buffer of length {} to handle overflow", bufferLength);
+
+    // Create the result object
+    final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer();
+    final RecordOverflowDefinition recordOverflowDef = recordOverflowContainer.recordOverflowDef;
+
+    // Now serialize field overflow into the drill buffer
+    int bufferOffset = 0;
+    final FieldSerializerContainer fieldSerializerContainer = new FieldSerializerContainer();
+
+    for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) {
+      // The container is reused across fields; reset it before each copy
+      fieldSerializerContainer.clear();
+
+      final String fieldName = fieldOverflowEntry.vector.getField().getName();
+
+      // Serialize the field overflow data into the buffer
+      final VLVectorSerializer fieldSerDe = fieldSerDeMap.get(fieldName);
+      assert fieldSerDe != null;
+
+      fieldSerDe.copyValueVector(fieldOverflowEntry.firstValueIdx,
+        fieldOverflowEntry.numValues,
+        buffer,
+        bufferOffset,
+        fieldSerializerContainer);
+
+      // Create a view DrillBuf for isolating this field overflow data
+      final DrillBuf fieldBuffer = buffer.slice(bufferOffset,
+        fieldSerializerContainer.totalByteLength);
+      fieldBuffer.retain(1); // Increase the reference count
+      fieldBuffer.writerIndex(fieldSerializerContainer.totalByteLength);
+
+      // Enqueue a field overflow definition object for the current field
+      final FieldOverflowDefinition fieldOverflowDef = new FieldOverflowDefinition(
+        fieldOverflowEntry.vector.getField(),
+        fieldOverflowEntry.numValues,
+        fieldSerializerContainer.dataByteLen,
+        fieldBuffer);
+
+      recordOverflowDef.getFieldOverflowDefs().put(fieldName, fieldOverflowDef);
+
+      // Update this drill buffer current offset
+      bufferOffset += fieldSerializerContainer.totalByteLength;
+    }
+
+    // Finally, release the original buffer
+    final boolean isReleased = buffer.release();
+    assert !isReleased; // the reference count should still be higher than zero
+
+    return recordOverflowContainer;
+  }
+
+  /**
+   * Private constructor: disables object instantiation.
+   */
+  private OverflowSerDeUtil() {
+    // Intentionally empty
+  }
+
+// ----------------------------------------------------------------------------
+// Internal Data Structure
+// ----------------------------------------------------------------------------
+
+  /**
+   * Holder for the byte lengths produced when a field's overflow data is serialized.
+   */
+  private static final class FieldSerializerContainer {
+    /** Byte length of the data portion */
+    int dataByteLen;
+    /** Byte length of the whole serialized entry */
+    int totalByteLength;
+
+    /** Resets both byte lengths to zero. */
+    void clear() {
+      dataByteLen = totalByteLength = 0;
+    }
+  }
+
+  /**
+   * Helper class for handling variable length {@link ValueVector} overflow serialization logic
+   */
+  private static final class VLVectorSerializer {
+    private static final int BYTE_VALUE_WIDTH = UInt1Vector.VALUE_WIDTH;
+    private static final int INT_VALUE_WIDTH  = UInt4Vector.VALUE_WIDTH;
+
+    /** Field overflow entry */
+    private final FieldOverflowEntry fieldOverflowEntry;
+
+    /** Set of DrillBuf's that make up the underlying ValueVector. Only nullable
+     * (VarChar or VarBinary) vectors have three entries. The format is
+     * ["bits-vector"] "offsets-vector" "values-vector"
+     */
+    private final DrillBuf[] buffers;
+
+    /**
+     * Creates a serializer for the given overflow entry and captures the buffers returned by
+     * the entry vector's {@code getBuffers(false)} call.
+     *
+     * @param fieldOverflowEntry field overflow entry
+     */
+    private VLVectorSerializer(FieldOverflowEntry fieldOverflowEntry) {
+      this.fieldOverflowEntry = fieldOverflowEntry;
+      this.buffers = fieldOverflowEntry.vector.getBuffers(false);
+    }
+
+    /**
+     * The number of bytes used (by the {@link ValueVector}) to store a value range delimited
+     * by the parameters "firstValueIdx" and "numValues".
+     *
+     * @param firstValueIdx start index of the first value to copy
+     * @param numValues number of values to copy
+     */
+    private int getBytesUsed(int firstValueIdx, int numValues) {
 
 Review comment:
   We need to improve the current VV API as I need a partial segment of the VV.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to