sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199036789
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
 ##########
 @@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * This class is tasked with managing all aspects of the flat Parquet reader's record batch sizing logic.
+ * Currently, a record batch size is constrained by two parameters: number of rows and memory usage.
+ */
+public final class RecordBatchSizerManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
+  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
+  /** Minimum column memory size */
+  private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize();
+  /** Default memory per batch */
+  private static final int DEFAULT_MEMORY_SZ_PER_BATCH = 16 * 1024 * 1024;
+  /** Default records per batch */
+  private static final int DEFAULT_RECORDS_PER_BATCH = 32 * 1024 - 1;
+
+  /** Parquet schema object */
+  private final ParquetSchema schema;
+  /** Total records to read */
+  private final long totalRecordsToRead;
+  /** Logic to minimize overflow occurrences */
+  private final BatchOverflowOptimizer overflowOptimizer;
+
+  /** Configured Parquet records per batch */
+  private final int configRecordsPerBatch;
+  /** Configured Parquet memory size per batch */
+  private final int configMemorySizePerBatch;
+  /** An upper bound on the Parquet records per batch based on the configured value and schema */
+  private int maxRecordsPerBatch;
+  /** An upper bound on the Parquet memory size per batch based on the configured value and schema */
+  private int maxMemorySizePerBatch;
+  /** The current number of records per batch, which can be dynamically optimized */
+  private int recordsPerBatch;
+
+  /** List of fixed columns */
+  private final List<ColumnMemoryInfo> fixedLengthColumns = new ArrayList<ColumnMemoryInfo>();
+  /** List of variable columns */
+  private final List<ColumnMemoryInfo> variableLengthColumns = new ArrayList<ColumnMemoryInfo>();
+  /** Field to column memory information map */
+  private final Map<String, ColumnMemoryInfo> columnMemoryInfoMap = CaseInsensitiveMap.newHashMap();
+  /** Flag set when the precision of one or more columns changes */
+  private boolean columnPrecisionChanged;
+
+  /**
+   * Field overflow map; this information is stored within this class for two reasons:
+   * a) centralization simplifies resource deallocation (overflow data is backed by direct memory)
+   * b) overflow results from the batch-constraints enforcement that this class manages
+   */
+  private Map<String, FieldOverflowStateContainer> fieldOverflowMap = CaseInsensitiveMap.newHashMap();
+
+  /**
+   * Constructor.
+   *
+   * @param options drill options
+   * @param schema current reader schema
+   * @param totalRecordsToRead total number of rows to read
+   */
+  public RecordBatchSizerManager(OptionManager options,
+    ParquetSchema schema,
+    long totalRecordsToRead) {
+
+    this.schema = schema;
+    this.totalRecordsToRead = totalRecordsToRead;
+    this.configRecordsPerBatch = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS);
+    this.configMemorySizePerBatch = getConfiguredMaxBatchMemory(options);
+    this.maxMemorySizePerBatch = this.configMemorySizePerBatch;
+    this.maxRecordsPerBatch = this.configRecordsPerBatch;
+    this.recordsPerBatch = this.configRecordsPerBatch;
+    this.overflowOptimizer = new BatchOverflowOptimizer(columnMemoryInfoMap);
+  }
+
+  /**
+   * Tunes record batch parameters based on configuration and schema.
+   */
+  public void setup() {
+
+    // Normalize batch parameters
+    this.maxMemorySizePerBatch = normalizeMemorySizePerBatch();
+    this.maxRecordsPerBatch = normalizeNumRecordsPerBatch();
+
+    // Let's load the column metadata
+    loadColumnsPrecisionInfo();
+
+    if (getNumColumns() == 0) {
+      return; // there are cases where downstream operators don't select any columns;
+              // in such a case, Parquet will return the pseudo column _DEFAULT_COL_TO_READ_
+    }
+
+    // We need to divide the overall memory pool amongst all columns
+    assignColumnsBatchMemory();
+
+    // Initialize the overflow optimizer
+    overflowOptimizer.setup();
+  }
+
+  /**
+   * @return the schema
+   */
+  public ParquetSchema getSchema() {
+    return schema;
+  }
+
+  /**
+   * Allocates value vectors for the current batch.
+   *
+   * @param vectorMap a collection of value vectors keyed by their field names
+   * @throws OutOfMemoryException
+   */
+  public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
+
+    if (columnPrecisionChanged) {
+      // We need to divide the overall memory pool amongst all columns
+      assignColumnsBatchMemory();
+    }
+
+    try {
+      for (final ValueVector v : vectorMap.values()) {
+        ColumnMemoryInfo columnMemoryInfo = columnMemoryInfoMap.get(v.getField().getName());
+
+        if (columnMemoryInfo != null) {
+          AllocationHelper.allocate(v, recordsPerBatch, columnMemoryInfo.columnPrecision, 0);
+        } else {
+          // This column was found in another Parquet file but not the current one, so we inject
+          // a null value. At this time, we do not account for such columns. Why? The right design is
+          // to create a ZERO-byte, all-nulls value vector to handle such columns (there could be hundreds of these).
+          AllocationHelper.allocate(v, recordsPerBatch, 0, 0); // the helper will still use a precision of 1
+        }
+      }
+    } catch (NullPointerException e) {
+      throw new OutOfMemoryException();
 
 Review comment:
   This is the original code, which I moved from the Schema class into this one.
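
   For context, here is a rough sketch of how the reader is expected to drive this class. The enclosing method, its parameters, and the vector-map construction are my own illustration and are not code from this PR; only the constructor, `setup()`, and `allocate()` calls come from the class shown above:

```java
// Hedged sketch (assumed wiring, not from this PR): the enclosing reader supplies
// "options", "schema", "totalRecordsToRead" and the per-field "vectorMap".
void setupAndAllocate(OptionManager options, ParquetSchema schema,
                      long totalRecordsToRead, Map<String, ValueVector> vectorMap)
    throws OutOfMemoryException {
  RecordBatchSizerManager sizerManager =
      new RecordBatchSizerManager(options, schema, totalRecordsToRead);

  // One-time tuning: normalizes the configured memory / record limits against the
  // schema and divides the memory budget across fixed- and variable-length columns.
  sizerManager.setup();

  // Per batch: size each value vector according to its column's memory quota;
  // allocation failures surface as OutOfMemoryException.
  sizerManager.allocate(vectorMap);
}
```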

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
