Repository: tajo Updated Branches: refs/heads/master 008940ccb -> 7481050e3
TAJO-1525: Implement INTERSECT [ALL] physical operator. (Contributed Keuntae Park, committed by hyunsik) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7481050e Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7481050e Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7481050e Branch: refs/heads/master Commit: 7481050e3d75eb860590f3237aca210fdd98190a Parents: 008940c Author: Hyunsik Choi <[email protected]> Authored: Mon Jul 20 21:15:11 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Jul 20 21:15:11 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../planner/physical/SetTupleComparator.java | 68 +++++ .../planner/physical/SortIntersectExec.java | 91 ++++++ .../planner/physical/TestSortIntersectExec.java | 294 +++++++++++++++++++ 4 files changed, 456 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7481050e/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 0ec1699..a07643a 100644 --- a/CHANGES +++ b/CHANGES @@ -384,6 +384,9 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1525: Implement INTERSECT [ALL] physical operator. (Contributed + Keuntae Park, committed by hyunsik) + TAJO-1684: CREATE EXTERNAL TABLE should allows just a path. (hyunsik) TAJO-1670: Refactor client errors and exceptions. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/7481050e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SetTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SetTupleComparator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SetTupleComparator.java new file mode 100644 index 0000000..ed60271 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SetTupleComparator.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import com.google.common.base.Preconditions; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.Tuple; + +import java.util.Comparator; + +public class SetTupleComparator implements Comparator<Tuple> { + private int numCompKey; + + private Datum outer; + private Datum inner; + private int compVal; + + public SetTupleComparator(Schema leftschema, Schema rightschema) { + Preconditions.checkArgument(leftschema.size() == rightschema.size(), + "The size of both side schema must be equals, but they are different: " + + leftschema.size() + " and " + rightschema.size()); + + this.numCompKey = leftschema.size(); // because it is guaranteed that the size of both schemas are the same + } + + @Override + public int compare(Tuple outerTuple, Tuple innerTuple) { + for (int i = 0; i < numCompKey; i++) { + outer = (outerTuple == null) ? NullDatum.get() : outerTuple.asDatum(i); + inner = (innerTuple == null) ? NullDatum.get() : innerTuple.asDatum(i); + + if (outer.isNull()) { + // NullDatum can handle comparison with all types of Datum + compVal = outer.compareTo(inner); + } else if(inner.isNull()) { + // NullDatum is greater than any non NullDatums in Tajo + compVal = -1; + } else { + // Both tuple are not NullDatum + compVal = outer.compareTo(inner); + } + + if (compVal != 0) { + return compVal; + } + } + return 0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7481050e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectExec.java new file mode 100644 index 0000000..db77f85 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectExec.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.plan.InvalidQueryException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.Arrays; + +public class SortIntersectExec extends BinaryPhysicalExec { + SetTupleComparator comparator; + Tuple lastReturned = null; + boolean isDistinct = false; + public SortIntersectExec(TaskAttemptContext context, PhysicalExec left, PhysicalExec right, boolean isDistinct) { + super(context, left.getSchema(), right.getSchema(), left, right); + TajoDataTypes.DataType[] leftTypes = SchemaUtil.toDataTypes(left.getSchema()); + TajoDataTypes.DataType[] rightTypes = SchemaUtil.toDataTypes(right.getSchema()); + if (!CatalogUtil.isMatchedFunction(Arrays.asList(leftTypes), Arrays.asList(rightTypes))) { + throw new InvalidQueryException( + "The both schemas are not compatible"); + } + comparator = new SetTupleComparator(left.getSchema(), right.getSchema()); + this.isDistinct = isDistinct; + } + + @Override + public Tuple next() throws IOException { + if (!context.isStopped()) { + Tuple leftTuple = leftChild.next(); + Tuple rightTuple = rightChild.next(); + if (leftTuple == null || rightTuple == null) { + return null; + } + + // handling routine for INTERSECT without ALL + // it eliminates duplicated return of the same row values + if (isDistinct && lastReturned != null) { + while (comparator.compare(leftTuple, lastReturned) == 0) { + leftTuple = leftChild.next(); + if (leftTuple == null) + return null; + } + } + + // At this point, Both Tuples are not null + do { + int compVal = comparator.compare(leftTuple, rightTuple); + + if (compVal > 0) { + rightTuple = rightChild.next(); + } else if (compVal < 0) { + leftTuple = leftChild.next(); + } else { + lastReturned = leftTuple; + return leftTuple; + } + } while (leftTuple != null && rightTuple != null); + + return null; + } + return null; + } + + @Override + public void rescan() throws IOException { + super.rescan(); + + lastReturned = null; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7481050e/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java new file mode 100644 index 0000000..3372651 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.PlanningException; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.SortNode; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSortIntersectExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestSortIntersectExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private LogicalOptimizer optimizer; + private Path testDir; + + private TableDesc employee1; + private TableDesc employee2; + + private int[] leftNum = new int[] {1, 2, 3, 3, 9, 9, 3, 0, 3}; + private int[] rightNum = new int[] {3, 7, 3, 5}; + private int[] answerAllNum = new int[] {3, 3}; // this should be set as leftNum intersect all rightNum + order by + private int[] answerDistinctNum = new int[] {3}; // this should be set as leftNum intersect rightNum + order by + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + Schema employeeSchema1 = new Schema(); + employeeSchema1.addColumn("managerid", TajoDataTypes.Type.INT4); + employeeSchema1.addColumn("empid", TajoDataTypes.Type.INT4); + employeeSchema1.addColumn("memid", TajoDataTypes.Type.INT4); + employeeSchema1.addColumn("deptname", TajoDataTypes.Type.TEXT); + + TableMeta employeeMeta1 = CatalogUtil.newTableMeta("TEXT"); + Path employeePath1 = new Path(testDir, "employee1.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()). + getAppender(employeeMeta1, employeeSchema1, employeePath1); + appender.init(); + Tuple tuple = new VTuple(employeeSchema1.size()); + + for (int i : leftNum) { + tuple.put(new Datum[] { + DatumFactory.createInt4(i), + DatumFactory.createInt4(i), // empid [0-8] + DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i) }); + appender.addTuple(tuple); + } + + appender.flush(); + appender.close(); + employee1 = CatalogUtil.newTableDesc("default.employee1", employeeSchema1, employeeMeta1, employeePath1); + catalog.createTable(employee1); + + Schema employeeSchema2 = new Schema(); + employeeSchema2.addColumn("managerid", TajoDataTypes.Type.INT4); + employeeSchema2.addColumn("empid", TajoDataTypes.Type.INT4); + employeeSchema2.addColumn("memid", TajoDataTypes.Type.INT4); + employeeSchema2.addColumn("deptname", TajoDataTypes.Type.TEXT); + + TableMeta employeeMeta2 = CatalogUtil.newTableMeta("TEXT"); + Path employeePath2 = new Path(testDir, "employee2.csv"); + Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()). + getAppender(employeeMeta2, employeeSchema2, employeePath2); + appender2.init(); + Tuple tuple2 = new VTuple(employeeSchema2.size()); + + for (int i : rightNum) { + tuple2.put(new Datum[]{ + DatumFactory.createInt4(i), + DatumFactory.createInt4(i), // empid [1-9] + DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i)}); + appender2.addTuple(tuple2); + } + + appender2.flush(); + appender2.close(); + employee2 = CatalogUtil.newTableDesc("default.employee2", employeeSchema2, employeeMeta2, employeePath2); + catalog.createTable(employee2); + + + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + optimizer = new LogicalOptimizer(conf); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + + // relation descriptions + // employee1 (managerid, empid, memid, deptname) + // employee2 (managerid, empid, memid, deptname) + + String[] QUERIES = { + "select * from employee1 as e1, employee2 as e2 where e1.empId = e2.empId" + }; + + @Test + public final void testSortIntersectAll() throws IOException, PlanningException { + FileFragment[] empFrags1 = ((FileTablespace) TablespaceManager.getLocalFs()). + splitNG(conf, "default.e1", employee1.getMeta(), + new Path(employee1.getUri()), Integer.MAX_VALUE); + FileFragment[] empFrags2 = ((FileTablespace) TablespaceManager.getLocalFs()) + .splitNG(conf, "default.e2", employee2.getMeta(), + new Path(employee2.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(empFrags1, empFrags2); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortIntersectAll"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + // replace an equal join with sort intersect all . + if (exec instanceof MergeJoinExec) { + MergeJoinExec join = (MergeJoinExec) exec; + exec = new SortIntersectExec(ctx, join.getLeftChild(), join.getRightChild(), false); + } else if (exec instanceof HashJoinExec) { + // we need to sort the results from both left and right children + HashJoinExec join = (HashJoinExec) exec; + SortSpec[] sortSpecsLeft = PlannerUtil.schemaToSortSpecs(join.getLeftChild().getSchema()); + SortSpec[] sortSpecsRight = PlannerUtil.schemaToSortSpecs(join.getRightChild().getSchema()); + + SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + leftSortNode.setSortSpecs(sortSpecsLeft); + leftSortNode.setInSchema(join.getLeftChild().getSchema()); + leftSortNode.setOutSchema(join.getLeftChild().getSchema()); + ExternalSortExec leftSort = new ExternalSortExec(ctx, leftSortNode, join.getLeftChild()); + + SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + rightSortNode.setSortSpecs(sortSpecsRight); + rightSortNode.setInSchema(join.getRightChild().getSchema()); + rightSortNode.setOutSchema(join.getRightChild().getSchema()); + ExternalSortExec rightSort = new ExternalSortExec(ctx, rightSortNode, join.getRightChild()); + + exec = new SortIntersectExec(ctx, leftSort, rightSort, false); + } + + Tuple tuple; + int count = 0; + int i = 0; + exec.init(); + + while ((tuple = exec.next()) != null) { + count++; + int answer = answerAllNum[i]; + assertTrue(answer == tuple.asDatum(0).asInt4()); + assertTrue(answer == tuple.asDatum(1).asInt4()); + assertTrue(10 + answer == tuple.asDatum(2).asInt4()); + assertTrue(("dept_" + answer).equals(tuple.asDatum(3).asChars())); + + i++; + } + exec.close(); + assertEquals(answerAllNum.length , count); + } + + @Test + public final void testSortIntersect() throws IOException, PlanningException { + FileFragment[] empFrags1 = ((FileTablespace) TablespaceManager.getLocalFs()) + .splitNG(conf, "default.e1", employee1.getMeta(), + new Path(employee1.getUri()), Integer.MAX_VALUE); + FileFragment[] empFrags2 = ((FileTablespace) TablespaceManager.getLocalFs()) + .splitNG(conf, "default.e2", employee2.getMeta(), + new Path(employee2.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(empFrags1, empFrags2); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortIntersect"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + // replace an equal join with sort intersect all . + if (exec instanceof MergeJoinExec) { + MergeJoinExec join = (MergeJoinExec) exec; + exec = new SortIntersectExec(ctx, join.getLeftChild(), join.getRightChild(), true); + } else if (exec instanceof HashJoinExec) { + // we need to sort the results from both left and right children + HashJoinExec join = (HashJoinExec) exec; + SortSpec[] sortSpecsLeft = PlannerUtil.schemaToSortSpecs(join.getLeftChild().getSchema()); + SortSpec[] sortSpecsRight = PlannerUtil.schemaToSortSpecs(join.getRightChild().getSchema()); + + SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + leftSortNode.setSortSpecs(sortSpecsLeft); + leftSortNode.setInSchema(join.getLeftChild().getSchema()); + leftSortNode.setOutSchema(join.getLeftChild().getSchema()); + ExternalSortExec leftSort = new ExternalSortExec(ctx, leftSortNode, join.getLeftChild()); + + SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + rightSortNode.setSortSpecs(sortSpecsRight); + rightSortNode.setInSchema(join.getRightChild().getSchema()); + rightSortNode.setOutSchema(join.getRightChild().getSchema()); + ExternalSortExec rightSort = new ExternalSortExec(ctx, rightSortNode, join.getRightChild()); + + exec = new SortIntersectExec(ctx, leftSort, rightSort, true); + } + + Tuple tuple; + int count = 0; + int i = 0; + exec.init(); + + while ((tuple = exec.next()) != null) { + count++; + int answer = answerDistinctNum[i]; + assertTrue(answer == tuple.asDatum(0).asInt4()); + assertTrue(answer == tuple.asDatum(1).asInt4()); + assertTrue(10 + answer == tuple.asDatum(2).asInt4()); + assertTrue(("dept_" + answer).equals(tuple.asDatum(3).asChars())); + + i++; + } + exec.close(); + assertEquals(answerDistinctNum.length , count); + } +}
