This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 71c6c68 DRILL-6566: Reduce Hash Agg Batch size and estimate when low
available memory (#1438)
71c6c68 is described below
commit 71c6c689a083e7496f06e99b4d253f11866ee741
Author: Boaz Ben-Zvi <[email protected]>
AuthorDate: Wed Aug 22 11:28:06 2018 -0700
DRILL-6566: Reduce Hash Agg Batch size and estimate when low available
memory (#1438)
DRILL-6566: Reduce Hash Agg Batch size and estimate when mem available is
low
---
.../exec/physical/impl/aggregate/HashAggBatch.java | 18 ++++++++++++++++++
.../exec/physical/impl/aggregate/HashAggTemplate.java | 16 +++++++++++-----
.../drill/exec/physical/impl/join/HashJoinBatch.java | 2 +-
.../impl/join/HashJoinHelperSizeCalculatorImpl.java | 2 +-
.../impl/join/HashJoinMemoryCalculatorImpl.java | 4 ++--
.../impl/validate/IteratorValidatorBatchIterator.java | 4 ++--
.../exec/physical/impl/xsort/ExternalSortBatch.java | 2 +-
.../java/org/apache/drill/exec/record/RecordBatch.java | 4 ++--
.../impl/join/TestBuildSidePartitioningImpl.java | 16 ++++++++--------
.../join/TestHashJoinHelperSizeCalculatorImpl.java | 2 +-
.../physical/impl/join/TestLateralJoinCorrectness.java | 4 ++--
11 files changed, 49 insertions(+), 25 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index ba928ae..72837a8 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -48,6 +48,7 @@ import
org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -185,6 +186,23 @@ public class HashAggBatch extends
AbstractRecordBatch<HashAggregate> {
// get the output batch size from config.
int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+
+ // If needed - reduce the size to allow enough batches in the available
memory
+ long memAvail = oContext.getAllocator().getLimit();
+ long minBatchesPerPartition =
context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
+ long minBatchesNeeded = 2 * minBatchesPerPartition; // 2 - to cover
overheads, etc.
+ boolean is2ndPhase = popConfig.getAggPhase() ==
AggPrelBase.OperatorPhase.PHASE_2of2;
+ boolean fallbackEnabled =
context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
+ if ( is2ndPhase && !fallbackEnabled ) {
+ minBatchesNeeded *= 2; // 2nd phase (w/o fallback) needs at least 2
partitions
+ }
+ if ( configuredBatchSize > memAvail / minBatchesNeeded ) { // no cast -
memAvail may be bigger than max-int
+ int reducedBatchSize = (int)(memAvail / minBatchesNeeded);
+ logger.trace("Reducing configured batch size from: {} to: {}, due to Mem
limit: {}",
+ configuredBatchSize, reducedBatchSize, memAvail);
+ configuredBatchSize = reducedBatchSize;
+ }
+
hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize);
logger.debug("BATCH_STATS, configured output batch size: {}",
configuredBatchSize);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1954c79..65ca829 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -80,7 +80,7 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
-import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE;
+import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
public abstract class HashAggTemplate implements HashAggregator {
protected static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
@@ -562,9 +562,15 @@ public abstract class HashAggTemplate implements
HashAggregator {
else { estValuesRowWidth += fieldSize; }
}
// multiply by the max number of rows in a batch to get the final
estimated max size
- estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
+ long estimatedMaxWidth = Math.max(estRowWidth, estInputRowWidth);
+ estMaxBatchSize = estimatedMaxWidth * MAX_BATCH_ROW_COUNT;
+ // estimated batch size should not exceed the configuration given size
+ int configuredBatchSize =
outgoing.getRecordBatchMemoryManager().getOutputBatchSize();
+ estMaxBatchSize = Math.min(estMaxBatchSize, configuredBatchSize);
+ // work back the number of rows (may have been reduced from
MAX_BATCH_ROW_COUNT)
+ long rowsInBatch = estMaxBatchSize / estimatedMaxWidth;
// (When there are no aggr functions, use '1' as later code relies on this
size being non-zero)
- estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
+ estValuesBatchSize = Math.max(estValuesRowWidth, 1) * rowsInBatch;
estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
logger.trace("{} phase. Estimated internal row width: {} Values row width:
{} batch size: {} memory limit: {} max column width: {}",
@@ -1490,13 +1496,13 @@ public abstract class HashAggTemplate implements
HashAggregator {
long maxMemoryNeeded = 0;
if ( !forceSpill ) { // need to check the memory in order to decide
// calculate the (max) new memory needed now; plan ahead for at least
MIN batches
- maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) *
(estMaxBatchSize + MAX_BATCH_SIZE * (4 + 4 /* links + hash-values */));
+ maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) *
(estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4 /* links + hash-values */));
// Add the (max) size of the current hash table, in case it will double
int maxSize = 1;
for (int insp = 0; insp < numPartitions; insp++) {
maxSize = Math.max(maxSize, batchHolders[insp].size());
}
- maxMemoryNeeded += MAX_BATCH_SIZE * 2 * 2 * 4 * maxSize; // 2 - double,
2 - max when %50 full, 4 - Uint4
+ maxMemoryNeeded += MAX_BATCH_ROW_COUNT * 2 * 2 * 4 * maxSize; // 2 -
double, 2 - max when %50 full, 4 - Uint4
// log a detailed debug message explaining why a spill may be needed
logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to
add to partition {} with {} batches. " + "Max memory needed {}, Est batch size
{}, mem limit {}",
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0bd6fe6..477e30c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -737,7 +737,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> {
{
// Initializing build calculator
// Limit scope of these variables to this block
- int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_SIZE:
RECORDS_PER_BATCH;
+ int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT :
RECORDS_PER_BATCH;
boolean hasProbeData = leftUpstream != IterOutcome.NONE;
boolean doMemoryCalculation = canSpill && hasProbeData;
HashJoinMemoryCalculator calc = getCalculatorImpl();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
index a17ea2f..f804277 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
@@ -34,7 +34,7 @@ public class HashJoinHelperSizeCalculatorImpl implements
HashJoinHelperSizeCalcu
Preconditions.checkArgument(!partitionStat.isSpilled());
// Account for the size of the SV4 in a hash join helper
- long joinHelperSize = IntVector.VALUE_WIDTH * RecordBatch.MAX_BATCH_SIZE;
+ long joinHelperSize = IntVector.VALUE_WIDTH *
RecordBatch.MAX_BATCH_ROW_COUNT;
// Account for the SV4 for each batch that holds links for each batch
for (HashJoinMemoryCalculator.BatchStat batchStat:
partitionStat.getInMemoryBatches()) {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index a351cbc..2ab42e5 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -66,9 +66,9 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
final HashTableSizeCalculator hashTableSizeCalculator;
if
(hashTableCalculatorType.equals(HashTableSizeCalculatorLeanImpl.TYPE)) {
- hashTableSizeCalculator = new
HashTableSizeCalculatorLeanImpl(RecordBatch.MAX_BATCH_SIZE,
hashTableDoublingFactor);
+ hashTableSizeCalculator = new
HashTableSizeCalculatorLeanImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
hashTableDoublingFactor);
} else if
(hashTableCalculatorType.equals(HashTableSizeCalculatorConservativeImpl.TYPE)) {
- hashTableSizeCalculator = new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
hashTableDoublingFactor);
+ hashTableSizeCalculator = new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
hashTableDoublingFactor);
} else {
throw new IllegalArgumentException("Invalid calc type: " +
hashTableCalculatorType);
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 378c980..88f4c7d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -307,12 +307,12 @@ public class IteratorValidatorBatchIterator implements
CloseableRecordBatch {
}
// It's legal for a batch to have zero field. For instance, a
relational table could have
// zero columns. Querying such table requires execution operator to
process batch with 0 field.
- if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
+ if (incoming.getRecordCount() > MAX_BATCH_ROW_COUNT) {
throw new IllegalStateException(
String.format(
"Incoming batch [#%d, %s] has size %d, which is beyond the"
+ " limit of %d",
- instNum, batchTypeName, incoming.getRecordCount(),
MAX_BATCH_SIZE
+ instNum, batchTypeName, incoming.getRecordCount(),
MAX_BATCH_ROW_COUNT
));
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0f3f8a3..cf7763e 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -512,7 +512,7 @@ public class ExternalSortBatch extends
AbstractRecordBatch<ExternalSort> {
estimatedRecordSize += 50;
}
}
- targetRecordCount = Math.min(MAX_BATCH_SIZE, Math.max(1,
COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
+ targetRecordCount = Math.min(MAX_BATCH_ROW_COUNT, Math.max(1,
COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
int count = copier.next(targetRecordCount);
container.buildSchema(SelectionVectorMode.NONE);
container.setRecordCount(count);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index f0cab26..b44a362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -38,8 +38,8 @@ import org.apache.drill.exec.vector.ValueVector;
*/
public interface RecordBatch extends VectorAccessible {
- /** max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */
- int MAX_BATCH_SIZE = ValueVector.MAX_ROW_COUNT;
+ /** max num of rows in a batch, limited by 2-byte length in SV2: 65536 =
2^16 */
+ int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
/**
* Describes the outcome of incrementing RecordBatch forward by a call to
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index ceebc81..af943ec 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -33,7 +33,7 @@ public class TestBuildSidePartitioningImpl {
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
- new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
safetyFactor);
@@ -78,7 +78,7 @@ public class TestBuildSidePartitioningImpl {
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
- new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
safetyFactor);
@@ -123,7 +123,7 @@ public class TestBuildSidePartitioningImpl {
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
- new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
safetyFactor);
@@ -171,7 +171,7 @@ public class TestBuildSidePartitioningImpl {
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
- new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
safetyFactor);
@@ -206,7 +206,7 @@ public class TestBuildSidePartitioningImpl {
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
- new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
safetyFactor);
@@ -252,7 +252,7 @@ public class TestBuildSidePartitioningImpl {
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
- new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
safetyFactor);
@@ -298,7 +298,7 @@ public class TestBuildSidePartitioningImpl {
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
- new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
safetyFactor);
@@ -349,7 +349,7 @@ public class TestBuildSidePartitioningImpl {
final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
- new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
safetyFactor);
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
index ed25d78..b9ae58d 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
@@ -30,7 +30,7 @@ public class TestHashJoinHelperSizeCalculatorImpl {
((long)
TypeHelper.getSize(TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.INT).build()));
// Account for the overhead of a selection vector
- long expected = intSize * RecordBatch.MAX_BATCH_SIZE;
+ long expected = intSize * RecordBatch.MAX_BATCH_ROW_COUNT;
// Account for sv4 vector for batches
expected += intSize * 3500;
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 6626176..36c9eec 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -1275,7 +1275,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
* Temporary test to validate LATERAL handling output batch getting filled
without consuming full output from left
* and right batch join.
* <p>
- * For this test we are updating {@link LateralJoinBatch#MAX_BATCH_SIZE} by
making it public, which might not expected
+ * For this test we are updating {@link
LateralJoinBatch#MAX_BATCH_ROW_COUNT} by making it public, which might not
expected
* after including the BatchSizing logic
* TODO: Update the test after incorporating the BatchSizing change.
*
@@ -1943,7 +1943,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
* This test generates an operator tree for multi level LATERAL by stacking
2 LATERAL and finally an UNNEST pair
* (using MockRecord Batch) as left and right child of lower level LATERAL.
Then we call next() on top level
* LATERAL to simulate the operator tree and compare the outcome and record
count generated with expected values.
- * This test also changes the MAX_BATCH_SIZE to simulate the output being
produced in multiple batches.
+ * This test also changes the MAX_BATCH_ROW_COUNT to simulate the output
being produced in multiple batches.
* @throws Exception
*/
@Test