Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1107#discussion_r167283942

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class AbstractRecordBatchMemoryManager {
+  private int outgoingRowWidth;
+  private int outputRowCount = ValueVector.MAX_ROW_COUNT;
+  protected static final int OFFSET_VECTOR_WIDTH = 4;
+  protected static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
+  protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+  protected static final int MIN_NUM_ROWS = 1;
+
+  public void update(int inputIndex) {};
+
+  public void update() {};
+
+  public int getOutputRowCount() {
+    return outputRowCount;
+  }
+
+  /**
+   * Given batchSize and rowWidth, this will set output rowCount taking into account
+   * the min and max that is allowed.
+   */
+  public void setOutputRowCount(long batchSize, int rowWidth) {
+    this.outputRowCount = Math.min(ValueVector.MAX_ROW_COUNT,
+      Math.max(RecordBatchSizer.safeDivide(batchSize/WORST_CASE_FRAGMENTATION_FACTOR, rowWidth), MIN_NUM_ROWS));
--- End diff --

I suspect the math may be off here. Is `batchSize` the actual memory used by the batch? If so, it already includes real internal fragmentation which, depending on the source, can be 25% (for "good" operators), 50% (if from the network), or 95% (pathological Parquet cases). Applying the fragmentation factor on top of that value only makes the estimate worse.

A better way is to do:

```
rawBatchSize = rowWidth * rowCount;
actualBatchSize = rawBatchSize * FRAG_FACTOR;
```

Or, to compute the output size:

```
targetOutputSize = // some value you set
targetDataSize = targetOutputSize / FRAG_FACTOR
targetRowCount = targetDataSize / rowWidth
```
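
For illustration only, a self-contained sketch of how the second suggestion could be wired up, keeping the min/max clamping from the patch. The class name, `TARGET_OUTPUT_SIZE`, and the 64K literal standing in for `ValueVector.MAX_ROW_COUNT` are assumptions for this example, not part of the patch or of Drill's API:

```
// Illustrative sketch; not the code under review.
public class OutputRowCountSketch {
  // Assumed limits, mirroring the constants in the diff.
  static final int MAX_NUM_ROWS = 64 * 1024;           // stand-in for ValueVector.MAX_ROW_COUNT
  static final int MIN_NUM_ROWS = 1;
  static final int FRAG_FACTOR = 2;                    // worst-case fragmentation factor
  static final long TARGET_OUTPUT_SIZE = 16_000_000L;  // "some value you set"

  // Back out the fragmentation overhead from the target size first,
  // then divide by the row width, then clamp to the allowed row-count range.
  static int targetRowCount(int rowWidth) {
    if (rowWidth <= 0) {
      return MAX_NUM_ROWS;                             // nothing known about the rows yet
    }
    long targetDataSize = TARGET_OUTPUT_SIZE / FRAG_FACTOR;
    long targetRowCount = targetDataSize / rowWidth;
    return (int) Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS, targetRowCount));
  }

  public static void main(String[] args) {
    // 100-byte rows: 8,000,000 / 100 = 80,000 rows, clamped to 65,536.
    System.out.println(targetRowCount(100));
  }
}
```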
---