DRILL-1108: Fix the GeneratorMapping used for run-time code generation when a join condition is a compound expression containing constants.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a4c0ba72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a4c0ba72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a4c0ba72 Branch: refs/heads/master Commit: a4c0ba72defc87871ce4613260e94f726f8049a8 Parents: 2ab3dfa Author: Jinfeng Ni <j...@maprtech.com> Authored: Wed Jun 25 15:09:46 2014 -0700 Committer: Jinfeng Ni <j...@maprtech.com> Committed: Sun Jul 20 22:20:38 2014 -0700 ---------------------------------------------------------------------- .../physical/impl/common/ChainedHashTable.java | 28 ++-- .../exec/physical/impl/join/HashJoinBatch.java | 16 ++- .../exec/physical/impl/join/MergeJoinBatch.java | 12 +- .../exec/physical/impl/join/TestHashJoin.java | 23 +++ .../exec/physical/impl/join/TestMergeJoin.java | 21 +++ .../src/test/resources/join/hashJoinExpr.json | 109 ++++++++++++++ .../src/test/resources/join/mergeJoinExpr.json | 143 +++++++++++++++++++ 7 files changed, 334 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a4c0ba72/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 7522488..91d2037 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -82,15 +82,25 @@ public class ChainedHashTable { GeneratorMapping.create("setupInterior" /* setup method */, "outputRecordKeys" /* eval method */, null /* reset 
*/, null /* cleanup */) ; - private final MappingSet KeyMatchIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, KEY_MATCH_BUILD, KEY_MATCH_BUILD); - private final MappingSet KeyMatchIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, KEY_MATCH_PROBE, KEY_MATCH_PROBE); - private final MappingSet KeyMatchHtableMapping = new MappingSet("htRowIdx", null, "htContainer", null, KEY_MATCH_BUILD, KEY_MATCH_BUILD); - private final MappingSet KeyMatchHtableProbeMapping = new MappingSet("htRowIdx", null, "htContainer", null, KEY_MATCH_PROBE, KEY_MATCH_PROBE); - private final MappingSet GetHashIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, GET_HASH_BUILD, GET_HASH_BUILD); - private final MappingSet GetHashIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, GET_HASH_PROBE, GET_HASH_PROBE); - private final MappingSet SetValueMapping = new MappingSet("incomingRowIdx" /* read index */, "htRowIdx" /* write index */, "incomingBuild" /* read container */, "htContainer" /* write container */, SET_VALUE, SET_VALUE); - - private final MappingSet OutputRecordKeysMapping = new MappingSet("htRowIdx" /* read index */, "outRowIdx" /* write index */, "htContainer" /* read container */, "outgoing" /* write container */, OUTPUT_KEYS, OUTPUT_KEYS); + // GM for putting constant expression into method "setupInterior" + private static final GeneratorMapping SETUP_INTERIOR_CONSTANT = + GeneratorMapping.create("setupInterior" /* setup method */, "setupInterior" /* eval method */, + null /* reset */, null /* cleanup */); + + // GM for putting constant expression into method "doSetup" + private static final GeneratorMapping DO_SETUP_CONSTANT = + GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, + null /* reset */, null /* cleanup */); + + private final MappingSet KeyMatchIncomingBuildMapping = new MappingSet("incomingRowIdx", null, 
"incomingBuild", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_BUILD); + private final MappingSet KeyMatchIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_PROBE); + private final MappingSet KeyMatchHtableMapping = new MappingSet("htRowIdx", null, "htContainer", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_BUILD); + private final MappingSet KeyMatchHtableProbeMapping = new MappingSet("htRowIdx", null, "htContainer", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_PROBE); + private final MappingSet GetHashIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, DO_SETUP_CONSTANT, GET_HASH_BUILD); + private final MappingSet GetHashIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, DO_SETUP_CONSTANT, GET_HASH_PROBE); + private final MappingSet SetValueMapping = new MappingSet("incomingRowIdx" /* read index */, "htRowIdx" /* write index */, "incomingBuild" /* read container */, "htContainer" /* write container */, SETUP_INTERIOR_CONSTANT, SET_VALUE); + + private final MappingSet OutputRecordKeysMapping = new MappingSet("htRowIdx" /* read index */, "outRowIdx" /* write index */, "htContainer" /* read container */, "outgoing" /* write container */, SETUP_INTERIOR_CONSTANT, OUTPUT_KEYS); private HashTableConfig htConfig; private final FragmentContext context; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a4c0ba72/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 5fc3125..46f7d51 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -111,26 +111,36 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { private boolean done = false; // Generator mapping for the build side + // Generator mapping for the build side : scalar private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */, "projectBuildRecord" /* eval method */, null /* reset */, null /* cleanup */); + // Generator mapping for the build side : constant + private static final GeneratorMapping PROJECT_BUILD_CONSTANT = GeneratorMapping.create("doSetup"/* setup method */, + "doSetup" /* eval method */, + null /* reset */, null /* cleanup */); - // Generator mapping for the probe side + // Generator mapping for the probe side : scalar private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */, "projectProbeRecord" /* eval method */, null /* reset */, null /* cleanup */); + // Generator mapping for the probe side : constant + private static final GeneratorMapping PROJECT_PROBE_CONSTANT = GeneratorMapping.create("doSetup" /* setup method */, + "doSetup" /* eval method */, + null /* reset */, null /* cleanup */); + // Mapping set for the build side private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */, "buildBatch" /* read container */, "outgoing" /* write container */, - PROJECT_BUILD, PROJECT_BUILD); + PROJECT_BUILD_CONSTANT, PROJECT_BUILD); // Mapping set for the probe side private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */, "probeBatch" /* read container */, "outgoing" /* write container */, - PROJECT_PROBE, PROJECT_PROBE); + PROJECT_PROBE_CONSTANT, PROJECT_PROBE); // indicates if we have previously returned an output batch boolean firstOutputBatch = true; 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a4c0ba72/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 6943d1a..0c6657c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -79,27 +79,27 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { GM("doSetup", "doSetup", null, null)); public final MappingSet copyLeftMapping = new MappingSet("leftIndex", "outIndex", - GM("doSetup", "doCopyLeft", null, null), + GM("doSetup", "doSetup", null, null), GM("doSetup", "doCopyLeft", null, null)); public final MappingSet copyRightMappping = new MappingSet("rightIndex", "outIndex", - GM("doSetup", "doCopyRight", null, null), + GM("doSetup", "doSetup", null, null), GM("doSetup", "doCopyRight", null, null)); public final MappingSet compareMapping = new MappingSet("leftIndex", "rightIndex", - GM("doSetup", "doCompare", null, null), + GM("doSetup", "doSetup", null, null), GM("doSetup", "doCompare", null, null)); public final MappingSet compareRightMapping = new MappingSet("rightIndex", "null", - GM("doSetup", "doCompare", null, null), + GM("doSetup", "doSetup", null, null), GM("doSetup", "doCompare", null, null)); public final MappingSet compareLeftMapping = new MappingSet("leftIndex", "null", - GM("doSetup", "doCompareNextLeftKey", null, null), + GM("doSetup", "doSetup", null, null), GM("doSetup", "doCompareNextLeftKey", null, null)); public final MappingSet compareNextLeftMapping = new MappingSet("nextLeftIndex", "null", - GM("doSetup", "doCompareNextLeftKey", null, null), + GM("doSetup", 
"doSetup", null, null), GM("doSetup", "doCompareNextLeftKey", null, null)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a4c0ba72/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java index e24426e..b7bd9e8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java @@ -267,4 +267,27 @@ public class TestHashJoin extends PopUnitTestBase{ assertEquals(272, count); } } + + + @Test + public void testHashJoinExprInCondition() throws Exception { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + + bit1.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/join/hashJoinExpr.json"), Charsets.UTF_8)); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + b.release(); + } + assertEquals(10, count); + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a4c0ba72/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java index 
4c1ce93..7afef56 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java @@ -372,4 +372,25 @@ public class TestMergeJoin extends PopUnitTestBase { } } + @Test + public void testMergeJoinExprInCondition() throws Exception { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + + bit1.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/join/mergeJoinExpr.json"), Charsets.UTF_8)); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + b.release(); + } + assertEquals(10, count); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a4c0ba72/exec/java-exec/src/test/resources/join/hashJoinExpr.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/join/hashJoinExpr.json b/exec/java-exec/src/test/resources/join/hashJoinExpr.json new file mode 100644 index 0000000..386d90e --- /dev/null +++ b/exec/java-exec/src/test/resources/join/hashJoinExpr.json @@ -0,0 +1,109 @@ +{ + "head" : { + "version" : 1, + "generator" : { + "type" : "DefaultSqlHandler", + "info" : "" + }, + "type" : "APACHE_DRILL_PHYSICAL", + "options" : [ ], + "queue" : 0, + "resultMode" : "EXEC" + }, + "graph" : [ { + "pop" : "parquet-scan", + "@id" : 5, + "entries" : [ { + "path" : "/tpch/region.parquet" + } ], + "storage" : { + "type" : "file", + "enabled" : true, + "connection" : "classpath:///", + "workspaces" : null, + "formats" : null + }, + "format" : { + "type" : "parquet" + }, + "columns" : [ "`r_regionkey`" ], 
+ "selectionRoot" : "/tpch/region.parquet", + "cost" : 5.0 + }, { + "pop" : "project", + "@id" : 3, + "exprs" : [ { + "ref" : "`$f2`", + "expr" : "`r_regionkey`" + } ], + "child" : 5, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 5.0 + }, { + "pop" : "parquet-scan", + "@id" : 6, + "entries" : [ { + "path" : "/tpch/nation.parquet" + } ], + "storage" : { + "type" : "file", + "enabled" : true, + "connection" : "classpath:///", + "workspaces" : null, + "formats" : null + }, + "format" : { + "type" : "parquet" + }, + "columns" : [ "`n_nationkey`", "`n_regionkey`" ], + "selectionRoot" : "/tpch/nation.parquet", + "cost" : 25.0 + }, { + "pop" : "project", + "@id" : 4, + "exprs" : [ { + "ref" : "`n_nationkey`", + "expr" : "`n_nationkey`" + }, { + "ref" : "`$f3`", + "expr" : "add(`n_regionkey`, 1) " + } ], + "child" : 6, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + }, { + "pop" : "hash-join", + "@id" : 2, + "left" : 4, + "right" : 3, + "conditions" : [ { + "relationship" : "==", + "left" : "add(`$f3`, 2)", + "right" : "`$f2`" + } ], + "joinType" : "INNER", + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + }, { + "pop" : "project", + "@id" : 1, + "exprs" : [ { + "ref" : "`n_nationkey`", + "expr" : "`n_nationkey`" + } ], + "child" : 2, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + }, { + "pop" : "screen", + "@id" : 0, + "child" : 1, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + } ] +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a4c0ba72/exec/java-exec/src/test/resources/join/mergeJoinExpr.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/join/mergeJoinExpr.json b/exec/java-exec/src/test/resources/join/mergeJoinExpr.json new file mode 100644 index 0000000..1c5111b --- /dev/null +++ 
b/exec/java-exec/src/test/resources/join/mergeJoinExpr.json @@ -0,0 +1,143 @@ +{ + "head" : { + "version" : 1, + "generator" : { + "type" : "DefaultSqlHandler", + "info" : "" + }, + "type" : "APACHE_DRILL_PHYSICAL", + "options" : [ { + "name" : "planner.enable_hashjoin", + "kind" : "BOOLEAN", + "type" : "SESSION", + "bool_val" : false + } ], + "queue" : 0, + "resultMode" : "EXEC" + }, + "graph" : [ { + "pop" : "parquet-scan", + "@id" : 9, + "entries" : [ { + "path" : "/tpch/nation.parquet" + } ], + "storage" : { + "type" : "file", + "enabled" : true, + "connection" : "classpath:///", + "workspaces" : null, + "formats" : null + }, + "format" : { + "type" : "parquet" + }, + "columns" : [ "`n_nationkey`", "`n_regionkey`" ], + "selectionRoot" : "/tpch/nation.parquet", + "cost" : 25.0 + }, { + "pop" : "project", + "@id" : 8, + "exprs" : [ { + "ref" : "`n_nationkey`", + "expr" : "`n_nationkey`" + }, { + "ref" : "`$f3`", + "expr" : "add(`n_regionkey`, 1) " + } ], + "child" : 9, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + }, { + "pop" : "external-sort", + "@id" : 6, + "child" : 8, + "orderings" : [ { + "order" : "ASC", + "expr" : "`$f3`", + "nullDirection" : "UNSPECIFIED" + } ], + "reverse" : false, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + }, { + "pop" : "selection-vector-remover", + "@id" : 4, + "child" : 6, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + }, { + "pop" : "parquet-scan", + "@id" : 7, + "entries" : [ { + "path" : "/tpch/region.parquet" + } ], + "storage" : { + "type" : "file", + "enabled" : true, + "connection" : "classpath:///", + "workspaces" : null, + "formats" : null + }, + "format" : { + "type" : "parquet" + }, + "columns" : [ "`r_regionkey`" ], + "selectionRoot" : "/tpch/region.parquet", + "cost" : 5.0 + }, { + "pop" : "external-sort", + "@id" : 5, + "child" : 7, + "orderings" : [ { + "order" : "ASC", + "expr" : "`r_regionkey`", + 
"nullDirection" : "UNSPECIFIED" + } ], + "reverse" : false, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 5.0 + }, { + "pop" : "selection-vector-remover", + "@id" : 3, + "child" : 5, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 5.0 + }, { + "pop" : "merge-join", + "@id" : 2, + "left" : 4, + "right" : 3, + "conditions" : [ { + "relationship" : "==", + "left" : "add(`$f3`, 2)", + "right" : "`r_regionkey`" + } ], + "joinType" : "INNER", + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + }, { + "pop" : "project", + "@id" : 1, + "exprs" : [ { + "ref" : "`n_nationkey`", + "expr" : "`n_nationkey`" + } ], + "child" : 2, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + }, { + "pop" : "screen", + "@id" : 0, + "child" : 1, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 25.0 + } ] +}