DRILL-1167: Split large runtime-generated functions into a chain of functions
+ Modified a few operators' (CopyUtil, TopNBatch, ExternalSortBatch) runtime code generators to start a code block for each value vector. + With this change, we are able to project 2000+ expressions, sort on 500+, group by on 200+, filter on 150+ and join on 100+ columns. + The test cases added with this patch add 3 minutes to the test run, so I have excluded them from the default run. They can be enabled using the 'largeTests' Maven profile. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3a73b3eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3a73b3eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3a73b3eb Branch: refs/heads/master Commit: 3a73b3eb1050e9f58f8cb324e897a51226f448bb Parents: 97a9a4c Author: Aditya Kishore <adi...@maprtech.com> Authored: Mon Jul 21 22:45:45 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Fri Jul 25 14:34:24 2014 -0700 ---------------------------------------------------------------------- exec/java-exec/pom.xml | 46 +++++++ .../apache/drill/exec/expr/ClassGenerator.java | 57 ++++++--- .../exec/physical/impl/TopN/TopNBatch.java | 2 + .../physical/impl/xsort/ExternalSortBatch.java | 4 + .../org/apache/drill/exec/vector/CopyUtil.java | 7 +- .../java/org/apache/drill/BaseTestQuery.java | 15 +++ .../exec/compile/TestLargeFileCompilation.java | 122 +++++++++++++++---- 7 files changed, 208 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3a73b3eb/exec/java-exec/pom.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index 30dff55..b1023f7 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -325,6 +325,32 @@ </dependency> </dependencies> </profile> + <profile> + <id>largeTests</id> 
<activation><activeByDefault>false</activeByDefault></activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>include-large-tests</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <includes> + <include>**/TestLargeFileCompilation.java</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> </profiles> <build> @@ -512,6 +538,26 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <!-- we override the default test execution to exclude tests that would + take unusually long time to run (> 1 minute) and run such tests in a different profile --> + <id>default-test</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <excludes> + <exclude>**/TestLargeFileCompilation.java</exclude> + </excludes> + </configuration> + </execution> + </executions> + </plugin> </plugins> <pluginManagement> <plugins> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3a73b3eb/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java index 0f5d1fd..4d4fc99 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java @@ -63,6 +63,8 @@ public class ClassGenerator<T>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassGenerator.class); public static enum BlockType {SETUP, EVAL, RESET, CLEANUP}; + private static final int 
MAX_BLOCKS_IN_FUNCTION = 50; + private final SignatureHolder sig; private final EvaluationVisitor evaluationVisitor; private final Map<ValueVectorSetup, JVar> vvDeclaration = Maps.newHashMap(); @@ -227,26 +229,53 @@ public class ClassGenerator<T>{ } } - void flushCode(){ - int i =0; - for(CodeGeneratorMethod method : sig){ - JMethod m = clazz.method(JMod.PUBLIC, model._ref(method.getReturnType()), method.getMethodName()); - for(CodeGeneratorArgument arg : method){ - m.param(arg.getType(), arg.getName()); + void flushCode() { + int i = 0; + for(CodeGeneratorMethod method : sig) { + JMethod outer = clazz.method(JMod.PUBLIC, model._ref(method.getReturnType()), method.getMethodName()); + for(CodeGeneratorArgument arg : method) { + outer.param(arg.getType(), arg.getName()); } - for(Class<?> c : method.getThrowsIterable()){ - m._throws(model.ref(c)); + for(Class<?> c : method.getThrowsIterable()) { + outer._throws(model.ref(c)); } - m._throws(SchemaChangeException.class); - - for(JBlock b : blocks[i++]){ - if(!b.isEmpty()) m.body().add(b); + outer._throws(SchemaChangeException.class); + + int methodIndex = 0; + int blocksInMethod = 0; + boolean isVoidMethod = method.getReturnType() == void.class; + for(JBlock b : blocks[i++]) { + if(!b.isEmpty()) { + if (blocksInMethod > MAX_BLOCKS_IN_FUNCTION) { + JMethod inner = clazz.method(JMod.PRIVATE, model._ref(method.getReturnType()), method.getMethodName() + methodIndex); + JInvocation methodCall = JExpr.invoke(inner); + for(CodeGeneratorArgument arg : method){ + inner.param(arg.getType(), arg.getName()); + methodCall.arg(JExpr.direct(arg.getName())); + } + for(Class<?> c : method.getThrowsIterable()){ + inner._throws(model.ref(c)); + } + inner._throws(SchemaChangeException.class); + + if (isVoidMethod) { + outer.body().add(methodCall); + } else { + outer.body()._return(methodCall); + } + outer = inner; + blocksInMethod = 0; + ++methodIndex; + } + outer.body().add(b); + ++blocksInMethod; + } } } - for(ClassGenerator<T> child : 
innerClasses.values()){ + for(ClassGenerator<T> child : innerClasses.values()) { child.flushCode(); - } + } } public JCodeModel getModel() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3a73b3eb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 77be4ef..0132e85 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -280,8 +280,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { }else{ jc._then()._return(out.getValue().minus()); } + g.rotateBlock(); } + g.rotateBlock(); g.getEvalBlock()._return(JExpr.lit(0)); PriorityQueue q = context.getImplementationClass(cg); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3a73b3eb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index e5da896..08219a1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -474,8 +474,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { }else{ jc._then()._return(out.getValue().minus()); } + g.rotateBlock(); } + g.rotateBlock(); g.getEvalBlock()._return(JExpr.lit(0)); return context.getImplementationClass(cg); @@ -517,8 +519,10 @@ public 
class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { }else{ jc._then()._return(out.getValue().minus()); } + g.rotateBlock(); } + g.rotateBlock(); g.getEvalBlock()._return(JExpr.lit(0)); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3a73b3eb/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java index 1f09792..c68f62f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java @@ -32,8 +32,8 @@ public class CopyUtil { JExpression inIndex = JExpr.direct("inIndex"); JExpression outIndex = JExpr.direct("outIndex"); - g.rotateBlock(); - for(VectorWrapper<?> vv : batch){ + for(VectorWrapper<?> vv : batch) { + g.rotateBlock(); JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId)); JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId)); @@ -55,10 +55,9 @@ public class CopyUtil { g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE); } - + g.rotateBlock(); fieldId++; } - g.rotateBlock(); g.getEvalBlock()._return(JExpr.TRUE); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3a73b3eb/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index b1c1ec8..9aa76ec 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -159,6 +159,21 @@ public class BaseTestQuery extends ExecTest{ client.runQuery(type, query, resultListener); } + protected void testNoResult(String query, Object... args) throws Exception { + testNoResult(1, query, args); + } + + protected void testNoResult(int interation, String query, Object... args) throws Exception { + query = String.format(query, args); + logger.debug("Running query:\n--------------\n"+query); + for (int i = 0; i < interation; i++) { + List<QueryResultBatch> results = client.runQuery(QueryType.SQL, query); + for (QueryResultBatch queryResultBatch : results) { + queryResultBatch.release(); + } + } + } + protected void test(String query) throws Exception{ String[] queries = query.split(";"); for(String q : queries){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3a73b3eb/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java index f6f06ad..c825586 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java @@ -17,53 +17,121 @@ */ package org.apache.drill.exec.compile; -import java.util.List; - import org.apache.drill.BaseTestQuery; -import org.apache.drill.exec.proto.UserBitShared.QueryType; -import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.common.util.TestTools; +import org.apache.drill.exec.ExecConstants; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestRule; public class TestLargeFileCompilation extends BaseTestQuery { + @Rule public final TestRule TIMEOUT = 
TestTools.getTimeoutRule(120000); + + private static final String LARGE_QUERY_GROUP_BY; + + private static final String LARGE_QUERY_ORDER_BY; + + private static final String LARGE_QUERY_ORDER_BY_WITH_LIMIT; + + private static final String LARGE_QUERY_FILTER; - private static final String LARGE_QUERY; + private static final String LARGE_QUERY_WRITER; private static final int ITERATION_COUNT = Integer.valueOf(System.getProperty("TestLargeFileCompilation.iteration", "1")); + private static final int NUM_PROJECT_COULMNS = 2000; + + private static final int NUM_ORDERBY_COULMNS = 500; + + private static final int NUM_GROUPBY_COULMNS = 225; + + private static final int NUM_FILTER_COULMNS = 150; + static { - StringBuilder sb = new StringBuilder("select \n"); - for (int i = 0; i < 300; i++) { - sb.append("\temployee_id+").append(i).append(" as col").append(i).append(",\n"); + StringBuilder sb = new StringBuilder("select\n\t"); + for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) { + sb.append("c").append(i).append(", "); } - sb.append("\tfull_name\nfrom cp.`employee.json` limit 1"); - LARGE_QUERY = sb.toString(); + sb.append("full_name\nfrom (select\n\t"); + for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) { + sb.append("employee_id+").append(i).append(" as c").append(i).append(", "); + } + sb.append("full_name\nfrom cp.`employee.json`)\ngroup by\n\t"); + for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) { + sb.append("c").append(i).append(", "); + } + LARGE_QUERY_GROUP_BY = sb.append("full_name").toString(); + } + + static { + StringBuilder sb = new StringBuilder("select\n\t"); + for (int i = 0; i < NUM_PROJECT_COULMNS; i++) { + sb.append("employee_id+").append(i).append(" as col").append(i).append(", "); + } + sb.append("full_name\nfrom cp.`employee.json`\norder by\n\t"); + for (int i = 0; i < NUM_ORDERBY_COULMNS; i++) { + sb.append(" col").append(i).append(", "); + } + LARGE_QUERY_ORDER_BY = sb.append("full_name").toString(); + LARGE_QUERY_ORDER_BY_WITH_LIMIT = sb.append("\nlimit 
1").toString(); + } + + static { + StringBuilder sb = new StringBuilder("select *\n") + .append("from cp.`employee.json`\n") + .append("where"); + for (int i = 0; i < NUM_FILTER_COULMNS; i++) { + sb.append(" employee_id+").append(i).append(" < employee_id ").append(i%2==0?"OR":"AND"); + } + LARGE_QUERY_FILTER = sb.append(" true") .toString(); + } + + static { + StringBuilder sb = new StringBuilder("create table %s as (select \n"); + for (int i = 0; i < NUM_PROJECT_COULMNS; i++) { + sb.append("employee_id+").append(i).append(" as col").append(i).append(", "); + } + LARGE_QUERY_WRITER = sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString(); + } + + @Test + public void testTEXT_WRITER() throws Exception { + testNoResult("alter session set `%s`='JDK'", QueryClassLoader.JAVA_COMPILER_OPTION); + testNoResult("use dfs_test.tmp"); + testNoResult("alter session set `%s`='csv'", ExecConstants.OUTPUT_FORMAT_OPTION); + testNoResult(LARGE_QUERY_WRITER, "wide_table_csv"); } @Test - public void testWithJDK() throws Exception { - test(String.format("alter session set `%s`='JDK'", QueryClassLoader.JAVA_COMPILER_OPTION)); - runTest(); + public void testPARQUET_WRITER() throws Exception { + testNoResult("alter session set `%s`='JDK'", QueryClassLoader.JAVA_COMPILER_OPTION); + testNoResult("use dfs_test.tmp"); + testNoResult("alter session set `%s`='parquet'", ExecConstants.OUTPUT_FORMAT_OPTION); + testNoResult(ITERATION_COUNT, LARGE_QUERY_WRITER, "wide_table_parquet"); } @Test - public void testWithDEFAULT() throws Exception { - test(String.format("alter session set `%s`='DEFAULT'", QueryClassLoader.JAVA_COMPILER_OPTION)); - runTest(); + public void testGROUP_BY() throws Exception { + testNoResult("alter session set `%s`='JDK'", QueryClassLoader.JAVA_COMPILER_OPTION); + testNoResult(ITERATION_COUNT, LARGE_QUERY_GROUP_BY); } - @Test(expected=org.apache.drill.exec.rpc.RpcException.class) - public void testWithJanino() throws Exception { - test(String.format("alter session 
set `%s`='JANINO'", QueryClassLoader.JAVA_COMPILER_OPTION)); - runTest(); + @Test + public void testEXTERNAL_SORT() throws Exception { + testNoResult("alter session set `%s`='JDK'", QueryClassLoader.JAVA_COMPILER_OPTION); + testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY); } - private void runTest() throws Exception { - for (int i = 0; i < ITERATION_COUNT; i++) { - List<QueryResultBatch> results = client.runQuery(QueryType.SQL, LARGE_QUERY); - for (QueryResultBatch queryResultBatch : results) { - queryResultBatch.release(); - } - } + @Test + public void testTOP_N_SORT() throws Exception { + testNoResult("alter session set `%s`='JDK'", QueryClassLoader.JAVA_COMPILER_OPTION); + testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY_WITH_LIMIT); + } + + @Test + public void testFILTER() throws Exception { + testNoResult("alter session set `%s`='JDK'", QueryClassLoader.JAVA_COMPILER_OPTION); + testNoResult(ITERATION_COUNT, LARGE_QUERY_FILTER); } }