Ben-Zvi closed pull request #1480: DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty
URL: https://github.com/apache/drill/pull/1480
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 368bb5dc91b..21dcdcf83e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -81,7 +81,6 @@
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
-
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -101,7 +100,7 @@
  * processed individually (that Build partition should be smaller than the original, hence likely fit whole into
  * memory to allow probing; if not -- see below).
  * Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact,
- * the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is
+ * the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is
  * read and divided into new partitions, which in turn may spill again (and again...).
  * The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
  * indicating that the number of partitions chosen was too small.
@@ -116,6 +115,9 @@
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
+  private boolean joinIsLeftOrFull;
+  private boolean joinIsRightOrFull;
+  private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
   // Join conditions
   private final List<JoinCondition> conditions;
@@ -131,8 +133,6 @@
   private final Set<String> buildJoinColumns;
   // Fields used for partitioning
-
-  private long maxIncomingBatchSize;
   /**
    * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
@@ -264,6 +264,8 @@ protected void buildSchema() throws SchemaChangeException {
     buildSchema = right.getSchema();
     // position of the new "column" for keeping the hash values (after the real columns)
     rightHVColPosition = right.getContainer().getNumberOfColumns();
+    // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table
+    skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
     // We only need the hash tables if we have data on the build side.
     setupHashTable();
   }
@@ -447,12 +449,12 @@ public IterOutcome innerNext() {
     // Try to probe and project, or recursively handle a spilled partition
     if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
-        joinType != JoinRelType.INNER) { // or if this is a left/full outer join
+        joinIsLeftOrFull) { // or if this is a left/full outer join
       prefetchFirstProbeBatch();
       if (leftUpstream.isError() ||
-          ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) {
+          ( leftUpstream == NONE && ! joinIsRightOrFull )) {
         // A termination condition was reached while prefetching the first probe side data holding batch.
         // We need to terminate.
         return leftUpstream;
@@ -568,19 +570,7 @@ public IterOutcome innerNext() {
     } else {
       // Our build side is empty, we won't have any matches, clear the probe side
-      if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-        for (final VectorWrapper<?> wrapper : probeBatch) {
-          wrapper.getValueVector().clear();
-        }
-        probeBatch.kill(true);
-        leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-        while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-          for (final VectorWrapper<?> wrapper : probeBatch) {
-            wrapper.getValueVector().clear();
-          }
-          leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-        }
-      }
+      killAndDrainLeftUpstream();
     }
     // No more output records, clean up and return
@@ -596,10 +586,31 @@
     }
   }
+  /**
+   * In case an upstream data is no longer needed, send a kill and flush any remaining batch
+   *
+   * @param batch probe or build batch
+   * @param upstream which upstream
+   * @param isLeft is it the left or right
+   */
+  private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
+    batch.kill(true);
+    while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+      for (final VectorWrapper<?> wrapper : batch) {
+        wrapper.getValueVector().clear();
+      }
+      upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+    }
+  }
+  private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
+  private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
+
   private void setupHashTable() throws SchemaChangeException {
     final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
     conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
+    if ( skipHashTableBuild ) { return; }
+
     // Setup the hash table configuration object
     List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
@@ -819,6 +830,11 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
       return null;
     }
+    if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream
+      killAndDrainRightUpstream();
+      return null;
+    }
+
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
     boolean firstCycle = cycleNum == 0;
@@ -1013,7 +1029,7 @@ private void setupOutputContainerSchema() {
         final MajorType outputType;
         // If left or full outer join, then the output type must be nullable. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2197).
-        if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+        if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1034,7 +1050,7 @@ private void setupOutputContainerSchema() {
         // If right or full outer join then the output type should be optional. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
-        if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+        if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
            && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
        } else {
@@ -1074,6 +1090,8 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
     this.buildBatch = right;
     this.probeBatch = left;
     joinType = popConfig.getJoinType();
+    joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL;
+    joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
     conditions = popConfig.getConditions();
     this.popConfig = popConfig;
     rightExpr = new ArrayList<>(conditions.size());
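
For readers skimming the diff above: the heart of the change is the new skipHashTableBuild flag. The following is a minimal, self-contained sketch of that condition (illustrative only; the JoinRelType enum and SkipHashTableSketch class below are stand-ins, not Drill's actual classes): the build-side hash table is only needed when the probe side still has data, or when the join must emit unmatched build-side rows (RIGHT or FULL outer).

    // Illustrative sketch of the DRILL-6755 condition; not the Drill implementation.
    enum JoinRelType { INNER, LEFT, RIGHT, FULL }

    class SkipHashTableSketch {
      // Skip building the hash table when the probe side is already known to be empty
      // and the join does not need unmatched build-side rows (i.e. it is not RIGHT/FULL).
      static boolean skipHashTableBuild(boolean probeSideIsEmpty, JoinRelType joinType) {
        boolean joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
        return probeSideIsEmpty && !joinIsRightOrFull;
      }

      public static void main(String[] args) {
        // With an empty probe side: INNER and LEFT skip the build; RIGHT and FULL do not.
        for (JoinRelType type : JoinRelType.values()) {
          System.out.println(type + " -> skip hash table build: " + skipHashTableBuild(true, type));
        }
      }
    }

When the build is skipped, no further output can come from the build side for INNER/LEFT joins, so executeBuildPhase() only needs to drain the right upstream (killAndDrainRightUpstream above) and return.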
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index e7fa4e6b57b..486fb1e1a99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -113,7 +113,7 @@ protected boolean prefetchFirstBatchFromBothSides() {
     return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
   }
-  /*
+  /**
    * Checks for the operator specific early terminal condition.
    * @return true if the further processing can stop.
    *         false if the further processing is needed.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 43abd8e705d..67789465a59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -103,7 +103,7 @@ public synchronized void enqueue(final RawFragmentBatch batch) throws IOExceptio
   @Override
   public void close() {
     if (!isTerminated() && context.getExecutorState().shouldContinue()) {
-      final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount);
+      final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount);
       throw new IllegalStateException(msg);
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
index 349a295114f..5beb7cbdd98 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -209,4 +209,47 @@ public void testHashJoinNoneOutcomeUninitRightSide() {
   public void testHashJoinNoneOutcomeUninitLeftSide() {
     testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
   }
+
+  /**
+   * Testing for DRILL-6755: No Hash Table is built when the first probe batch is NONE
+   */
+  @Test
+  public void testHashJoinWhenProbeIsNONE() {
+
+    inputOutcomesLeft.add(RecordBatch.IterOutcome.NONE);
+
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.NONE);
+
+    // for the build side input - use multiple batches (to check that they are all cleared/drained)
+    final List<VectorContainer> buildSideinputContainer = new ArrayList<>(5);
+    buildSideinputContainer.add(emptyInputRowSetRight.container());
+    buildSideinputContainer.add(nonEmptyInputRowSetRight.container());
+    RowSet.SingleRowSet secondInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(456).build();
+    RowSet.SingleRowSet thirdInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(789).build();
+    buildSideinputContainer.add(secondInputRowSetRight.container());
+    buildSideinputContainer.add(thirdInputRowSetRight.container());
+
+    final MockRecordBatch mockInputBatchRight = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, buildSideinputContainer, inputOutcomesRight, batchSchemaRight);
+    final MockRecordBatch mockInputBatchLeft = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, inputContainerLeft, inputOutcomesLeft, batchSchemaLeft);
+
+    List<JoinCondition> conditions = Lists.newArrayList();
+
+    conditions.add(new JoinCondition(SqlKind.EQUALS.toString(), FieldReference.getWithQuotedRef("leftcol"), FieldReference.getWithQuotedRef("rightcol")));
+
+    HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, JoinRelType.INNER);
+
+    HashJoinBatch hjBatch = new HashJoinBatch(hjConf, operatorFixture.getFragmentContext(), mockInputBatchLeft, mockInputBatchRight);
+
+    RecordBatch.IterOutcome gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.NONE);
+
+    secondInputRowSetRight.clear();
+    thirdInputRowSetRight.clear();
+    buildSideinputContainer.clear();
+  }
 }
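
As an aside, the kill-and-drain pattern that this PR factors out of innerNext() can be sketched in isolation roughly as below (a hedged illustration only; Outcome, UpstreamBatch and KillAndDrainSketch are placeholders, not Drill APIs): signal the upstream to stop, then keep pulling and clearing batches until a terminal outcome is returned, so nothing already buffered upstream is leaked.

    // Illustrative sketch of the drain pattern; the types here are stand-ins, not Drill classes.
    import java.util.ArrayDeque;
    import java.util.Deque;

    enum Outcome { OK_NEW_SCHEMA, OK, NONE }

    // Placeholder for a record batch whose upstream may still hold buffered data.
    class UpstreamBatch {
      private final Deque<Outcome> pending = new ArrayDeque<>();
      UpstreamBatch(Outcome... outcomes) { for (Outcome o : outcomes) pending.add(o); }
      void kill()         { /* ask the upstream to stop producing new batches */ }
      void clearBuffers() { /* release the buffers held by the current batch */ }
      Outcome next()      { return pending.isEmpty() ? Outcome.NONE : pending.poll(); }
    }

    class KillAndDrainSketch {
      // Kill the upstream, then consume and clear whatever it had already buffered.
      static void killAndDrain(UpstreamBatch batch, Outcome current) {
        batch.kill();
        while (current == Outcome.OK_NEW_SCHEMA || current == Outcome.OK) {
          batch.clearBuffers();
          current = batch.next();
        }
      }

      public static void main(String[] args) {
        UpstreamBatch probe = new UpstreamBatch(Outcome.OK, Outcome.OK, Outcome.NONE);
        killAndDrain(probe, Outcome.OK_NEW_SCHEMA); // loops until NONE is returned
        System.out.println("upstream drained");
      }
    }

The test added above exercises exactly this behaviour on the build side: multiple right-side batches are queued and must all be cleared once the hash table build is skipped.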
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services