Repository: drill Updated Branches: refs/heads/master 862ab91e9 -> a53e12336
DRILL-2591: In UnionAllRecordBatch, the mechanism to detect schema change is corrected Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a53e1233 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a53e1233 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a53e1233 Branch: refs/heads/master Commit: a53e12336c29b421f1df51da480af9a65d70bb72 Parents: 862ab91 Author: Hsuan-Yi Chu <[email protected]> Authored: Fri Mar 27 11:37:07 2015 -0700 Committer: Aman Sinha <[email protected]> Committed: Sun Apr 5 21:43:41 2015 -0700 ---------------------------------------------------------------------- .../impl/union/UnionAllRecordBatch.java | 31 ++++++++-- .../physical/visitor/FinalColumnReorderer.java | 20 ++++--- .../java/org/apache/drill/TestUnionAll.java | 63 +++++++++++++++++++- .../src/test/resources/store/json/dateData.json | 12 ++++ .../test/resources/store/json/timeStmpData.json | 14 +++++ .../testframework/testUnionAllQueries/q18.tsv | 15 +++++ 6 files changed, 141 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 806104a..61de3a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -284,6 +284,11 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { private IterOutcome upstream = IterOutcome.NOT_YET; private boolean leftIsFinish
= false; + // These two schemas are obtained from the first record batches of the left and right inputs + // They are used to check if the schema is changed between recordbatches + private BatchSchema leftSchema; + private BatchSchema rightSchema; + public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) { this.unionAllRecordBatch = unionAllRecordBatch; leftSide = new OneSideInput(left); @@ -321,13 +326,20 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { upstream = iterOutcome; return upstream; + case OK_NEW_SCHEMA: + if(!rightSide.getRecordBatch().getSchema().equals(rightSchema)) { + throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported"); + } + + upstream = IterOutcome.OK; + // fall through case OK: unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch()); upstream = iterOutcome; return upstream; default: - throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported"); + throw new IllegalStateException(String.format("Unknown state %s.", upstream)); } } else { IterOutcome iterOutcome = leftSide.nextBatch(); @@ -338,7 +350,14 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { upstream = iterOutcome; return upstream; - case OK: + case OK_NEW_SCHEMA: + if(!leftSide.getRecordBatch().getSchema().equals(leftSchema)) { + throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported"); + } + + upstream = IterOutcome.OK; + // fall through + case OK: unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch()); upstream = iterOutcome; return upstream; @@ -350,7 +369,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { return upstream; default: - throw new SchemaChangeException("Schema change detected in the left input of Union-All. 
This is not currently supported"); + throw new IllegalStateException(String.format("Unknown state %s.", upstream)); } } } @@ -360,8 +379,10 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { // where the output type is chosen based on DRILL's implicit casting rules private void inferOutputFields() { outputFields = Lists.newArrayList(); - Iterator<MaterializedField> leftIter = leftSide.getRecordBatch().getSchema().iterator(); - Iterator<MaterializedField> rightIter = rightSide.getRecordBatch().getSchema().iterator(); + leftSchema = leftSide.getRecordBatch().getSchema(); + rightSchema = rightSide.getRecordBatch().getSchema(); + Iterator<MaterializedField> leftIter = leftSchema.iterator(); + Iterator<MaterializedField> rightIter = rightSchema.iterator(); int index = 1; while(leftIter.hasNext() && rightIter.hasNext()) { http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java index 1aa033b..375d69f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java @@ -45,14 +45,10 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc @Override public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException { Prel newChild = ((Prel) prel.getChild()).accept(this, value); - return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild))); + return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) 
addTrivialOrderedProjectPrel(newChild, true))); } private Prel addTrivialOrderedProjectPrel(Prel prel) { - if (!prel.needsFinalColumnReordering()) { - return prel; - } - RelDataType t = prel.getRowType(); RexBuilder b = prel.getCluster().getRexBuilder(); @@ -64,16 +60,24 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc return prel; } - for (int i =0; i < projectCount; i++) { + for (int i = 0; i < projectCount; i++) { projections.add(b.makeInputRef(prel, i)); } return new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, prel.getRowType()); } + private Prel addTrivialOrderedProjectPrel(Prel prel, boolean checkNecessity) { + if(checkNecessity && !prel.needsFinalColumnReordering()) { + return prel; + } else { + return addTrivialOrderedProjectPrel(prel); + } + } + @Override public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException { Prel newChild = ((Prel) prel.getChild()).accept(this, null); - return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild))); + return prel.copy(prel.getTraitSet(), Collections.singletonList( (RelNode) addTrivialOrderedProjectPrel(newChild, true))); } @Override @@ -105,7 +109,7 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc boolean needProjectBelowUnion = !(p instanceof ProjectPrel); if(needProjectBelowUnion) { - child = addTrivialOrderedProjectPrel(child); + child = addTrivialOrderedProjectPrel(child, false); } children.add(child); http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java index fcf5c9f..fee1d6a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java @@ -25,7 +25,7 @@ import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException; import org.junit.Test; public class TestUnionAll extends BaseTestQuery{ -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class); @Test // Simple Union-All over two scans public void testUnionAll1() throws Exception { @@ -368,4 +368,65 @@ public class TestUnionAll extends BaseTestQuery{ test(query); } + + @Test // see DRILL-2591 + public void testDateAndTimestampJson() throws Exception { + String rootDate = FileUtils.getResourceAsFile("/store/json/dateData.json").toURI().toString(); + String rootTimpStmp = FileUtils.getResourceAsFile("/store/json/timeStmpData.json").toURI().toString(); + + String query = String.format( + "(select max(key) as key from dfs_test.`%s` " + + "union all " + + "select key from dfs_test.`%s`)", rootDate, rootTimpStmp); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .csvBaselineFile("testframework/testUnionAllQueries/q18.tsv") + .baselineTypes(TypeProtos.MinorType.VARCHAR) + .baselineColumns("key") + .build().run(); + } + + @Test // see DRILL-2637 + public void testUnionAllOneInputContainsAggFunction() throws Exception { + String root = FileUtils.getResourceAsFile("/multilevel/csv/1994/Q1/orders_94_q1.csv").toURI().toString(); + String query1 = String.format("select * from ((select count(c1) as ct from (select columns[0] c1 from dfs.`%s`)) \n" + + "union all \n" + + "(select columns[0] c2 from dfs.`%s`)) order by ct limit 3", root, root); + + String query2 = String.format("select * from ((select columns[0] ct from dfs.`%s`)\n" + + "union all \n" + + "(select count(c1) as c2 from (select columns[0] c1 from dfs.`%s`))) order by ct limit 3", root, root); + + String query3 = String.format("select * from ((select count(c1) as ct from 
(select columns[0] c1 from dfs.`%s`))\n" + + "union all \n" + + "(select count(c1) as c2 from (select columns[0] c1 from dfs.`%s`))) order by ct", root, root); + + testBuilder() + .sqlQuery(query1) + .ordered() + .baselineColumns("ct") + .baselineValues((long) 10) + .baselineValues((long) 66) + .baselineValues((long) 99) + .build().run(); + + testBuilder() + .sqlQuery(query2) + .ordered() + .baselineColumns("ct") + .baselineValues((long) 10) + .baselineValues((long) 66) + .baselineValues((long) 99) + .build().run(); + + testBuilder() + .sqlQuery(query3) + .ordered() + .baselineColumns("ct") + .baselineValues((long) 10) + .baselineValues((long) 10) + .build().run(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/store/json/dateData.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/store/json/dateData.json b/exec/java-exec/src/test/resources/store/json/dateData.json new file mode 100644 index 0000000..d15d9bc --- /dev/null +++ b/exec/java-exec/src/test/resources/store/json/dateData.json @@ -0,0 +1,12 @@ +{"key":"2009-03-03"} +{"key":"2001-08-27"} +{"key":"2011-07-26"} +{"key":"1970-09-02"} +{"key":"1983-04-24"} +{"key":"2007-02-01"} +{"key":"1977-08-03"} +{"key":"1962-05-14"} +{"key":"1950-02-16"} +{"key":"1983-09-05"} +{"key":"2000-09-09"} +{"key":"1960-08-18"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/store/json/timeStmpData.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/store/json/timeStmpData.json b/exec/java-exec/src/test/resources/store/json/timeStmpData.json new file mode 100644 index 0000000..8c150dd --- /dev/null +++ b/exec/java-exec/src/test/resources/store/json/timeStmpData.json @@ -0,0 +1,14 @@ +{"key":"2015-03-26 19:04:55.542"} +{"key":"2015-03-26 
19:04:55.542"} +{"key":"2015-03-26 19:04:55.542"} +{"key":"2015-03-26 19:04:55.543"} +{"key":"2015-03-26 19:04:55.543"} +{"key":"2015-03-26 19:04:55.543"} +{"key":"2015-03-26 19:04:55.543"} +{"key":"2015-03-26 19:04:55.543"} +{"key":"2015-03-26 19:04:55.543"} +{"key":"2015-03-26 19:04:55.544"} +{"key":"2015-03-26 19:04:55.544"} +{"key":"2015-03-26 19:04:55.544"} +{"key":"2015-03-26 19:04:55.544"} +{"key":"2015-03-26 19:04:55.544"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a53e1233/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv new file mode 100644 index 0000000..ccf0d35 --- /dev/null +++ b/exec/java-exec/src/test/resources/testframework/testUnionAllQueries/q18.tsv @@ -0,0 +1,15 @@ +2011-07-26 +2015-03-26 19:04:55.542 +2015-03-26 19:04:55.542 +2015-03-26 19:04:55.542 +2015-03-26 19:04:55.543 +2015-03-26 19:04:55.543 +2015-03-26 19:04:55.543 +2015-03-26 19:04:55.543 +2015-03-26 19:04:55.543 +2015-03-26 19:04:55.543 +2015-03-26 19:04:55.544 +2015-03-26 19:04:55.544 +2015-03-26 19:04:55.544 +2015-03-26 19:04:55.544 +2015-03-26 19:04:55.544 \ No newline at end of file
