Ben-Zvi commented on a change in pull request #1408: DRILL-6453: Resolve
deadlock when reading from build and probe sides simultaneously in HashJoin
URL: https://github.com/apache/drill/pull/1408#discussion_r208401852
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
##########
@@ -248,95 +257,134 @@ protected void buildSchema() throws
SchemaChangeException {
}
}
- @Override
- protected boolean prefetchFirstBatchFromBothSides() {
- if (leftUpstream != IterOutcome.NONE) {
- // We can only get data if there is data available
- leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
- }
-
- if (rightUpstream != IterOutcome.NONE) {
- // We can only get data if there is data available
- rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
- }
-
- buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
-
- if (verifyOutcomeToSetBatchState(leftUpstream, rightUpstream)) {
- // For build side, use aggregate i.e. average row width across batches
- batchMemoryManager.update(LEFT_INDEX, 0);
- batchMemoryManager.update(RIGHT_INDEX, 0, true);
-
- logger.debug("BATCH_STATS, incoming left: {}",
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
- logger.debug("BATCH_STATS, incoming right: {}",
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
-
- // Got our first batche(s)
- state = BatchState.FIRST;
- return true;
- } else {
- return false;
- }
- }
/**
* Sniffs all data necessary to construct a schema.
* @return True if all the data necessary to construct a schema has been
retrieved. False otherwise.
*/
private boolean sniffNewSchemas() {
+ leftUpstream = sniffNewSchema(LEFT_INDEX,
+ left,
+ () -> probeSchema = left.getSchema());
+
+ rightUpstream = sniffNewSchema(RIGHT_INDEX,
+ right,
+ () -> {
+ // We need to have the schema of the build side even when the build
side is empty
+ buildSchema = right.getSchema();
+ // position of the new "column" for keeping the hash values (after the
real columns)
+ rightHVColPosition = right.getContainer().getNumberOfColumns();
+ });
+
+ // Left and right sides must return a valid response and both sides cannot
be NONE.
+ return (!leftUpstream.isError() && !rightUpstream.isError()) &&
+ (leftUpstream != IterOutcome.NONE && rightUpstream != IterOutcome.NONE);
+ }
+
+ private IterOutcome sniffNewSchema(final int index,
+ final RecordBatch batch,
+ final Runnable schemaSetter) {
Review comment:
The use of lambda function as a parameter just makes this code harder to
read, and adds no real value.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services