DRILL-578: Performance fixes in Hash Join. Also includes a few minor fixes for the case where hash join receives empty batches on the build or probe side.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5603a633 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5603a633 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5603a633 Branch: refs/heads/master Commit: 5603a633f1de15af05e011ab6216300c6b0bd775 Parents: beeaf9e Author: Mehant Baid <[email protected]> Authored: Wed Apr 23 16:12:04 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat May 3 18:48:08 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/config/HashJoinPOP.java | 22 ---- .../exec/physical/impl/join/HashJoinBatch.java | 115 +++++++++++++------ .../exec/physical/impl/join/HashJoinHelper.java | 38 +++--- .../exec/physical/impl/join/HashJoinProbe.java | 3 +- .../impl/join/HashJoinProbeTemplate.java | 6 +- .../exec/physical/impl/join/TestHashJoin.java | 26 +++++ .../src/test/resources/join/hj_exchanges.json | 96 ++++++++++++++++ 7 files changed, 231 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5603a633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java index 0718bff..c3e74cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java @@ -51,7 +51,6 @@ public class HashJoinPOP extends AbstractBase { private final PhysicalOperator right; private final List<JoinCondition> conditions; private final JoinRelType joinType; - private final 
HashTableConfig htConfig; @Override public OperatorCost getCost() { @@ -69,23 +68,6 @@ public class HashJoinPOP extends AbstractBase { this.right = right; this.conditions = conditions; this.joinType = joinType; - - int conditionsSize = conditions.size(); - - NamedExpression rightExpr[] = new NamedExpression[conditionsSize]; - NamedExpression leftExpr[] = new NamedExpression[conditionsSize]; - - for (int i = 0; i < conditionsSize; i++) { - rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i )); - leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)); - - // Hash join only supports equality currently. - assert conditions.get(i).getRelationship().equals("=="); - } - - this.htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, - HashTable.DEFAULT_LOAD_FACTOR, - rightExpr, leftExpr); } @Override @@ -125,10 +107,6 @@ public class HashJoinPOP extends AbstractBase { return conditions; } - public HashTableConfig getHtConfig() { - return htConfig; - } - public HashJoinPOP flipIfRight(){ if(joinType == JoinRelType.RIGHT){ List<JoinCondition> flippedConditions = Lists.newArrayList(conditions.size()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5603a633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 7ada651..5f0cc94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.physical.impl.join; +import org.apache.drill.common.expression.FieldReference; +import 
org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.exec.record.*; import org.eigenbase.rel.JoinRelType; @@ -56,8 +59,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // Join type, INNER, LEFT, RIGHT or OUTER private final JoinRelType joinType; - // hash table configuration, created in HashJoinPOP - private HashTableConfig htConfig; + // Join conditions + private final List<JoinCondition> conditions; // Runtime generated class implementing HashJoinProbe interface private HashJoinProbe hashJoinProbe = null; @@ -112,6 +115,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { "outgoing" /* write container */, PROJECT_PROBE, PROJECT_PROBE); + // indicates if we have previously returned an output batch + boolean firstOutputBatch = true; + + IterOutcome leftUpstream = IterOutcome.NONE; + @Override public int getRecordCount() { return outputRecords; @@ -136,7 +144,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { * as well, for the materialization to be successful. This batch will not be used * till we complete the build phase. */ - left.next(); + leftUpstream = left.next(); // Build the hash table, using the build side record batches. 
executeBuildPhase(); @@ -166,7 +174,14 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { v.getValueVector().getMutator().setValueCount(outputRecords); } - return IterOutcome.OK_NEW_SCHEMA; + // First output batch, return OK_NEW_SCHEMA + if (firstOutputBatch == true) { + firstOutputBatch = false; + return IterOutcome.OK_NEW_SCHEMA; + } + + // Not the first output batch + return IterOutcome.OK; } // No more output records, clean up and return @@ -183,12 +198,32 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException { - // Shouldn't be recreating the hash table, this should be done only once - assert hashTable == null; + // Setup the hash table configuration object + int conditionsSize = conditions.size(); + + NamedExpression rightExpr[] = new NamedExpression[conditionsSize]; + NamedExpression leftExpr[] = new NamedExpression[conditionsSize]; + + // Create named expressions from the conditions + for (int i = 0; i < conditionsSize; i++) { + rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i )); + leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)); + + // Hash join only supports equality currently. + assert conditions.get(i).getRelationship().equals("=="); + } + + // Set the left named expression to be null if the probe batch is empty. 
+ if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) { + leftExpr = null; + } + + HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr); + + // Create the chained hash table ChainedHashTable ht = new ChainedHashTable(htConfig, context, this.right, this.left, null); hashTable = ht.createAndSetupHashTable(null); - } public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException { @@ -198,8 +233,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { boolean moreData = true; - setupHashTable(); - while (moreData) { switch (rightUpstream) { @@ -213,6 +246,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { case OK_NEW_SCHEMA: if (rightSchema == null) { rightSchema = right.getSchema(); + setupHashTable(); } else { throw new SchemaChangeException("Hash join does not support schema changes"); } @@ -277,22 +311,25 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { JExpression buildIndex = JExpr.direct("buildIndex"); JExpression outIndex = JExpr.direct("outIndex"); g.rotateBlock(); - for(VectorWrapper<?> vv : hyperContainer) { - // Add the vector to our output container - ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator()); - container.add(v); - allocators.add(RemovingRecordBatch.getAllocator4(v)); + if (hyperContainer != null) { + for(VectorWrapper<?> vv : hyperContainer) { - JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false)); + // Add the vector to our output container + ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator()); + container.add(v); + allocators.add(RemovingRecordBatch.getAllocator4(v)); - 
g.getEvalBlock().add(outVV.invoke("copyFrom") - .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) - .arg(outIndex) - .arg(inVV.component(buildIndex.shrz(JExpr.lit(16))))); + JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false)); - fieldId++; + g.getEvalBlock().add(outVV.invoke("copyFrom") + .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) + .arg(outIndex) + .arg(inVV.component(buildIndex.shrz(JExpr.lit(16))))); + + fieldId++; + } } // Generate the code to project probe side records @@ -301,23 +338,29 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { int outputFieldId = fieldId; fieldId = 0; JExpression probeIndex = JExpr.direct("probeIndex"); - for (VectorWrapper<?> vv : left) { + int recordCount = 0; - ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator()); - container.add(v); - allocators.add(RemovingRecordBatch.getAllocator4(v)); + if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) { + for (VectorWrapper<?> vv : left) { - JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false)); + ValueVector v = TypeHelper.getNewVector(vv.getField(), context.getAllocator()); + container.add(v); + allocators.add(RemovingRecordBatch.getAllocator4(v)); - g.getEvalBlock().add(outVV.invoke("copyFrom").arg(probeIndex).arg(outIndex).arg(inVV)); + JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false)); - fieldId++; - 
outputFieldId++; + g.getEvalBlock().add(outVV.invoke("copyFrom").arg(probeIndex).arg(outIndex).arg(inVV)); + + fieldId++; + outputFieldId++; + } + recordCount = left.getRecordCount(); } HashJoinProbe hj = context.getImplementationClass(cg); - hj.setupHashJoinProbe(context, hyperContainer, left, this, hashTable, hjHelper, joinType); + + hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType); return hj; } @@ -332,7 +375,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { this.left = left; this.right = right; this.joinType = popConfig.getJoinType(); - this.htConfig = popConfig.getHtConfig(); + this.conditions = popConfig.getConditions(); } @Override @@ -346,10 +389,14 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { public void cleanup() { left.cleanup(); right.cleanup(); - hyperContainer.clear(); hjHelper.clear(); container.clear(); - hashTable.clear(); + + // If we didn't receive any data, hyperContainer may be null, check before clearing + if (hyperContainer != null) { + hyperContainer.clear(); + hashTable.clear(); + } super.cleanup(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5603a633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java index e0098b1..0728ac9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java @@ -187,25 +187,33 @@ public class HashJoinHelper { SelectionVector4 startIndex = startIndices.get(batchIdx); int linkIndex; - // If its the first value for this key + // If head of the list is empty, 
insert current index at this position if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) { startIndex.set(offsetIdx, batchIndex, recordIndex); } else { - /* we already have encountered a record with the same key - * use links to store this value + /* Head of this list is not empty, if the first link + * is empty insert there */ - SelectionVector4 link; - do { - //Traverse the links to get an empty slot to insert the current index - batchIdx = linkIndex >>> SHIFT_SIZE; - offsetIdx = linkIndex & Character.MAX_VALUE; - - // get the next link - link = buildInfoList.get(batchIdx).getLinks(); - } while ((linkIndex = link.get(offsetIdx)) != INDEX_EMPTY); - - // We have the correct batchIdx and offset within the batch to store the next link - link.set(offsetIdx, batchIndex, recordIndex); + batchIdx = linkIndex >>> SHIFT_SIZE; + offsetIdx = linkIndex & Character.MAX_VALUE; + + SelectionVector4 link = buildInfoList.get(batchIdx).getLinks(); + int firstLink = link.get(offsetIdx); + + if (firstLink == INDEX_EMPTY) { + link.set(offsetIdx, batchIndex, recordIndex); + } else { + /* Insert the current value as the first link and + * make the current first link as its next + */ + int firstLinkBatchIdx = firstLink >>> SHIFT_SIZE; + int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE; + + SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks(); + nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx); + + link.set(offsetIdx, batchIndex, recordIndex); + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5603a633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java index c99f2a6..0ffdf52 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -48,7 +48,8 @@ public interface HashJoinProbe { } public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, - RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, JoinRelType joinRelType); + int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, + JoinRelType joinRelType); public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; public abstract void projectBuildRecord(int buildIndex, int outIndex); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5603a633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index cc1a257..3d6f4d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -80,12 +80,12 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { @Override public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, - RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, - JoinRelType joinRelType) { + int probeRecordCount, RecordBatch outgoing, HashTable hashTable, + HashJoinHelper hjHelper, JoinRelType joinRelType) { 
this.probeBatch = probeBatch; this.joinType = joinRelType; - this.recordsToProcess = probeBatch.getRecordCount(); + this.recordsToProcess = probeRecordCount; this.hashTable = hashTable; this.hjHelper = hjHelper; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5603a633/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java index ba067e2..f98015b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java @@ -180,6 +180,32 @@ public class TestHashJoin extends PopUnitTestBase{ } @Test + public void hjWithExchange(@Injectable final DrillbitContext bitContext, + @Injectable UserServer.UserClientConnection connection) throws Throwable { + + // Function checks for casting from Float, Double to Decimal data types + try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + Drillbit bit = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + // run query. 
+ bit.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/join/hj_exchanges.json"), Charsets.UTF_8)); + + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + + System.out.println("Total records: " + count); + assertEquals(25, count); + } + } + + @Test public void multipleConditionJoin(@Injectable final DrillbitContext bitContext, @Injectable UserServer.UserClientConnection connection) throws Throwable { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5603a633/exec/java-exec/src/test/resources/join/hj_exchanges.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/join/hj_exchanges.json b/exec/java-exec/src/test/resources/join/hj_exchanges.json new file mode 100644 index 0000000..2ad1cb4 --- /dev/null +++ b/exec/java-exec/src/test/resources/join/hj_exchanges.json @@ -0,0 +1,96 @@ +{ + "head" : { + "version" : 1, + "generator" : { + "type" : "DefaultSqlHandler", + "info" : "" + }, + "type" : "APACHE_DRILL_PHYSICAL", + "resultMode" : "EXEC" + }, + "graph" : [ { + "pop" : "parquet-scan", + "@id" : 1, + "entries" : [ { + "path" : "/tpch/region.parquet" + } ], + "storage" : { + "type" : "file", + "connection" : "classpath:///", + "workspaces" : { + "default" : "/", + "home" : "/" + }, + "formats" : null + }, + "format" : { + "type" : "parquet" + } + }, { + "pop" : "hash-to-random-exchange", + "@id" : 2, + "child" : 1, + "expr" : "hash(`R_REGIONKEY`) " + }, { + "pop" : "parquet-scan", + "@id" : 3, + "entries" : [ { + "path" : "/tpch/nation.parquet" + } ], + "storage" : { + "type" : "file", + "connection" : "classpath:///", + "workspaces" : { + "default" : "/", + "home" : "/" + }, + "formats" : null + }, + "format" : { + "type" : "parquet" + } + }, { + "pop" : "hash-to-random-exchange", + "@id" : 4, + 
"child" : 3, + "expr" : "hash(`N_REGIONKEY`) " + }, { + "pop" : "project", + "@id" : 5, + "exprs" : [ { + "ref" : "`*0`", + "expr" : "`*`" + }, { + "ref" : "`N_REGIONKEY`", + "expr" : "`N_REGIONKEY`" + } ], + "child" : 4 + }, { + "pop" : "hash-join", + "@id" : 6, + "left" : 2, + "right" : 5, + "conditions" : [ { + "relationship" : "==", + "left" : "`R_REGIONKEY`", + "right" : "`N_REGIONKEY`" + } ], + "joinType" : "INNER" + }, { + "pop" : "project", + "@id" : 9, + "exprs" : [ { + "ref" : "`R_REGIONKEY`", + "expr" : "`R_REGIONKEY`" + } ], + "child" : 6 + }, { + "pop" : "union-exchange", + "@id" : 10, + "child" : 9 + }, { + "pop" : "screen", + "@id" : 11, + "child" : 10 + } ] +}
