http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java new file mode 100644 index 0000000..ec2aa04 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.FileTablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; +import static org.junit.Assert.*; + +public class TestFullOuterMergeJoinExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFullOuterMergeJoinExec"; + private TajoTestingCluster util; + private 
CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + private QueryContext defaultContext; + + private TableDesc dep3; + private TableDesc dep4; + private TableDesc job3; + private TableDesc emp3; + private TableDesc phone3; + + private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3"); + private final String DEP4_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep4"); + private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3"); + private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3"); + private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3"); + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + + conf = util.getConfiguration(); + + //----------------- dep3 ------------------------------ + // dep_id | dep_name | loc_id + //-------------------------------- + // 0 | dep_0 | 1000 + // 1 | dep_1 | 1001 + // 2 | dep_2 | 1002 + // 3 | dep_3 | 1003 + // 4 | dep_4 | 1004 + // 5 | dep_5 | 1005 + // 6 | dep_6 | 1006 + // 7 | dep_7 | 1007 + // 8 | dep_8 | 1008 + // 9 | dep_9 | 1009 + Schema dep3Schema = new Schema(); + dep3Schema.addColumn("dep_id", Type.INT4); + dep3Schema.addColumn("dep_name", Type.TEXT); + dep3Schema.addColumn("loc_id", Type.INT4); + + + TableMeta dep3Meta = CatalogUtil.newTableMeta("TEXT"); + Path dep3Path = new Path(testDir, "dep3.csv"); + Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); + appender1.init(); + VTuple tuple = new VTuple(dep3Schema.size()); + for (int i 
= 0; i < 10; i++) { + tuple.put(new Datum[] { DatumFactory.createInt4(i), + DatumFactory.createText("dept_" + i), + DatumFactory.createInt4(1000 + i) }); + appender1.addTuple(tuple); + } + + appender1.flush(); + appender1.close(); + dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path); + catalog.createTable(dep3); + + + //----------------- dep4 ------------------------------ + // dep_id | dep_name | loc_id + //-------------------------------- + // 0 | dep_0 | 1000 + // 1 | dep_1 | 1001 + // 2 | dep_2 | 1002 + // 3 | dep_3 | 1003 + // 4 | dep_4 | 1004 + // 5 | dep_5 | 1005 + // 6 | dep_6 | 1006 + // 7 | dep_7 | 1007 + // 8 | dep_8 | 1008 + // 9 | dep_9 | 1009 + // 10 | dep_10 | 1010 + Schema dep4Schema = new Schema(); + dep4Schema.addColumn("dep_id", Type.INT4); + dep4Schema.addColumn("dep_name", Type.TEXT); + dep4Schema.addColumn("loc_id", Type.INT4); + + + TableMeta dep4Meta = CatalogUtil.newTableMeta("TEXT"); + Path dep4Path = new Path(testDir, "dep4.csv"); + Appender appender4 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep4Meta, dep4Schema, dep4Path); + appender4.init(); + VTuple tuple4 = new VTuple(dep4Schema.size()); + for (int i = 0; i < 11; i++) { + tuple4.put(new Datum[] { DatumFactory.createInt4(i), + DatumFactory.createText("dept_" + i), + DatumFactory.createInt4(1000 + i) }); + appender4.addTuple(tuple4); + } + + appender4.flush(); + appender4.close(); + dep4 = CatalogUtil.newTableDesc(DEP4_NAME, dep4Schema, dep4Meta, dep4Path); + catalog.createTable(dep4); + + + + //----------------- job3 ------------------------------ + // job_id | job_title + // ---------------------- + // 101 | job_101 + // 102 | job_102 + // 103 | job_103 + + Schema job3Schema = new Schema(); + job3Schema.addColumn("job_id", Type.INT4); + job3Schema.addColumn("job_title", Type.TEXT); + + + TableMeta job3Meta = CatalogUtil.newTableMeta("TEXT"); + Path job3Path = new Path(testDir, "job3.csv"); + Appender appender2 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path); + appender2.init(); + VTuple tuple2 = new VTuple(job3Schema.size()); + for (int i = 1; i < 4; i++) { + int x = 100 + i; + tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i), + DatumFactory.createText("job_" + x) }); + appender2.addTuple(tuple2); + } + + appender2.flush(); + appender2.close(); + job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path); + catalog.createTable(job3); + + + + //---------------------emp3 -------------------- + // emp_id | first_name | last_name | dep_id | salary | job_id + // ------------------------------------------------------------ + // 11 | fn_11 | ln_11 | 1 | 123 | 101 + // 13 | fn_13 | ln_13 | 3 | 369 | 103 + // 15 | fn_15 | ln_15 | 5 | 615 | null + // 17 | fn_17 | ln_17 | 7 | 861 | null + // 19 | fn_19 | ln_19 | 9 | 1107 | null + // 21 | fn_21 | ln_21 | 1 | 123 | 101 + // 23 | fn_23 | ln_23 | 3 | 369 | 103 + + Schema emp3Schema = new Schema(); + emp3Schema.addColumn("emp_id", Type.INT4); + emp3Schema.addColumn("first_name", Type.TEXT); + emp3Schema.addColumn("last_name", Type.TEXT); + emp3Schema.addColumn("dep_id", Type.INT4); + emp3Schema.addColumn("salary", Type.FLOAT4); + emp3Schema.addColumn("job_id", Type.INT4); + + + TableMeta emp3Meta = CatalogUtil.newTableMeta("TEXT"); + Path emp3Path = new Path(testDir, "emp3.csv"); + Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path); + appender3.init(); + VTuple tuple3 = new VTuple(emp3Schema.size()); + + for (int i = 1; i < 4; i += 2) { + int x = 10 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i), + DatumFactory.createText("firstname_" + x), + DatumFactory.createText("lastname_" + x), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createInt4(100 + i) }); + appender3.addTuple(tuple3); + + int y = 20 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i), + 
DatumFactory.createText("firstname_" + y), + DatumFactory.createText("lastname_" + y), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createInt4(100 + i) }); + appender3.addTuple(tuple3); + } + + for (int i = 5; i < 10; i += 2) { + int x = 10 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i), + DatumFactory.createText("firstname_" + x), + DatumFactory.createText("lastname_" + x), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createNullDatum() }); + appender3.addTuple(tuple3); + } + + appender3.flush(); + appender3.close(); + emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path); + catalog.createTable(emp3); + + //---------------------phone3 -------------------- + // emp_id | phone_number + // ----------------------------------------------- + // this table is empty, no rows + + Schema phone3Schema = new Schema(); + phone3Schema.addColumn("emp_id", Type.INT4); + phone3Schema.addColumn("phone_number", Type.TEXT); + + + TableMeta phone3Meta = CatalogUtil.newTableMeta("TEXT"); + Path phone3Path = new Path(testDir, "phone3.csv"); + Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(phone3Meta, phone3Schema, phone3Path); + appender5.init(); + appender5.flush(); + appender5.close(); + phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path); + catalog.createTable(phone3); + + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + + defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + String[] QUERIES = { + // [0] no nulls + "select dep3.dep_id, dep_name, emp_id, salary from emp3 full outer join dep3 on dep3.dep_id = emp3.dep_id", + // [1] nulls on the left operand + "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 
on job3.job_id=emp3.job_id", + // [2] nulls on the right side + "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id", + // [3] no nulls, right continues after left + "select dep4.dep_id, dep_name, emp_id, salary from emp3 full outer join dep4 on dep4.dep_id = emp3.dep_id", + // [4] one operand is empty + "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id", + // [5] one operand is empty + "select emp3.emp_id, first_name, phone_number from phone3 full outer join emp3 on emp3.emp_id = phone3.emp_id", + }; + + @Test + public final void testFullOuterMergeJoin0() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[0]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); + + FileFragment[] emp3Frags = + FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] dep3Frags = + FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getUri()), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testFullOuterMergeJoin0"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + 
assertNull(exec.next()); + exec.close(); + assertEquals(12, count); + } + + + @Test + public final void testFullOuterMergeJoin1() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[1]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); + + FileFragment[] emp3Frags = + FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] job3Frags = + FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testFullOuterMergeJoin1"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); + + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(8, count); + } + + @Test + public final void testFullOuterMergeJoin2() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[2]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); + + FileFragment[] emp3Frags = + FileTablespace.splitNG(conf, EMP3_NAME, 
emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] job3Frags = + FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testFullOuterMergeJoin2"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); + + + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(8, count); + } + + @Test + public final void testFullOuterMergeJoin3() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[3]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); + + FileFragment[] emp3Frags = + FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] dep4Frags = + FileTablespace.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getUri()), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testFullOuterMergeJoin3"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + 
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + + // if it chose the hash join WITH REVERSED ORDER, convert to merge join exec + assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); + + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(13, count); + } + + + @Test + public final void testFullOuterMergeJoin4() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[4]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); + + FileFragment[] emp3Frags = + FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] phone3Frags = + FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getUri()), + Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testFullOuterMergeJoin4"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); + + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(7, count); + } + + + @Test + public final void 
testFullOuterMergeJoin5() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[5]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); + + FileFragment[] emp3Frags = + FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] phone3Frags = + FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getUri()), + Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testFullOuterMergeJoin5"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec); + + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(7, count); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java new file mode 100644 index 0000000..b9ba2de --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestHashAntiJoinExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestHashJoinExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private LogicalOptimizer optimizer; + private Path testDir; + private QueryContext queryContext; + + private TableDesc employee; + private TableDesc people; + + @Before + public void setUp() throws 
Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + Schema employeeSchema = new Schema(); + employeeSchema.addColumn("managerid", Type.INT4); + employeeSchema.addColumn("empid", Type.INT4); + employeeSchema.addColumn("memid", Type.INT4); + employeeSchema.addColumn("deptname", Type.TEXT); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, employeeSchema, employeePath); + appender.init(); + VTuple tuple = new VTuple(employeeSchema.size()); + + for (int i = 0; i < 10; i++) { + tuple.put(new Datum[] { + DatumFactory.createInt4(i), + DatumFactory.createInt4(i), // empid [0-9] + DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i) }); + appender.addTuple(tuple); + } + + appender.flush(); + appender.close(); + employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath); + catalog.createTable(employee); + + Schema peopleSchema = new Schema(); + peopleSchema.addColumn("empid", Type.INT4); + peopleSchema.addColumn("fk_memid", Type.INT4); + peopleSchema.addColumn("name", Type.TEXT); + peopleSchema.addColumn("age", Type.INT4); + TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT"); + Path peoplePath = new Path(testDir, "people.csv"); + appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(peopleMeta, peopleSchema, peoplePath); + appender.init(); + tuple = new VTuple(peopleSchema.size()); + for (int i = 1; i < 10; i += 2) { + tuple.put(new Datum[] { + DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9] + 
DatumFactory.createInt4(10 + i), + DatumFactory.createText("name_" + i), + DatumFactory.createInt4(30 + i) }); + appender.addTuple(tuple); + } + + appender.flush(); + appender.close(); + + queryContext = new QueryContext(conf); + people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); + catalog.createTable(people); + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + optimizer = new LogicalOptimizer(conf, catalog); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + + // relation descriptions + // employee (managerid, empid, memid, deptname) + // people (empid, fk_memid, name, age) + + String[] QUERIES = { + "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId" + }; + + @Test + public final void testHashAntiJoin() throws IOException, TajoException { + FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), + new Path(people.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testHashAntiJoin"); + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + // replace an equal join with an hash anti join. 
+ if (exec instanceof MergeJoinExec) { + MergeJoinExec join = (MergeJoinExec) exec; + ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild(); + ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild(); + SeqScanExec scanLeftChild = sortLeftChild.getChild(); + SeqScanExec scanRightChild = sortRightChild.getChild(); + + // 'people' should be outer table. So, the below code guarantees that people becomes the outer table. + if (scanLeftChild.getTableName().equals("default.people")) { + exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild); + } else { + exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild); + } + } else if (exec instanceof HashJoinExec) { + HashJoinExec join = (HashJoinExec) exec; + SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild(); + + // 'people' should be outer table. So, the below code guarantees that people becomes the outer table. + if (scanLeftChild.getTableName().equals("default.people")) { + exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild()); + } else { + exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild()); + } + } + + Tuple tuple; + int count = 0; + int i = 0; + exec.init(); + while ((tuple = exec.next()) != null) { + count++; + assertTrue(i == tuple.getInt4(0)); + assertTrue(i == tuple.getInt4(1)); // expected empid [0, 2, 4, 6, 8] + assertTrue(("dept_" + i).equals(tuple.getText(2))); + assertTrue(10 + i == tuple.getInt4(3)); + + i += 2; + } + exec.close(); + assertEquals(5 , count); // the expected result : [0, 2, 4, 6, 8] + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java new file mode 100644 index 0000000..4550db9 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.SessionVars; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; +import static org.junit.Assert.*; + +public class TestHashJoinExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestHashJoinExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + private QueryContext defaultContext; + + private TableDesc employee; + private TableDesc 
people; + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + Schema employeeSchema = new Schema(); + employeeSchema.addColumn("managerid", Type.INT4); + employeeSchema.addColumn("empid", Type.INT4); + employeeSchema.addColumn("memid", Type.INT4); + employeeSchema.addColumn("deptname", Type.TEXT); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, employeeSchema, employeePath); + appender.init(); + VTuple tuple = new VTuple(employeeSchema.size()); + for (int i = 0; i < 10; i++) { + tuple.put(new Datum[] { DatumFactory.createInt4(i), + DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i) }); + appender.addTuple(tuple); + } + + appender.flush(); + appender.close(); + employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath); + catalog.createTable(employee); + + Schema peopleSchema = new Schema(); + peopleSchema.addColumn("empid", Type.INT4); + peopleSchema.addColumn("fk_memid", Type.INT4); + peopleSchema.addColumn("name", Type.TEXT); + peopleSchema.addColumn("age", Type.INT4); + TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT"); + Path peoplePath = new Path(testDir, "people.csv"); + appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(peopleMeta, peopleSchema, peoplePath); + appender.init(); + tuple = new VTuple(peopleSchema.size()); + for (int i = 1; i < 10; i += 2) { + tuple.put(new Datum[] { DatumFactory.createInt4(i), 
+ DatumFactory.createInt4(10 + i), + DatumFactory.createText("name_" + i), + DatumFactory.createInt4(30 + i) }); + appender.addTuple(tuple); + } + + appender.flush(); + appender.close(); + + people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); + catalog.createTable(people); + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + String[] QUERIES = { + "select managerId, e.empId, deptName, e.memId from employee as e inner join " + + "people as p on e.empId = p.empId and e.memId = p.fk_memId" + }; + + @Test + public final void testHashInnerJoin() throws IOException, TajoException { + + Expr expr = analyzer.parse(QUERIES[0]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); + + FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), + new Path(people.getUri()), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testHashInnerJoin"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() 
instanceof HashJoinExec); + + Tuple tuple; + int count = 0; + int i = 1; + exec.init(); + while ((tuple = exec.next()) != null) { + count++; + assertTrue(i == tuple.getInt4(0)); + assertTrue(i == tuple.getInt4(1)); + assertTrue(("dept_" + i).equals(tuple.getText(2))); + assertTrue(10 + i == tuple.getInt4(3)); + + i += 2; + } + exec.close(); + assertEquals(10 / 2, count); + } + + @Test + public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[0]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); + + FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), + new Path(people.getUri()), Integer.MAX_VALUE); + FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testHashInnerJoin"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 100l); + PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof HashJoinExec); + HashJoinExec joinExec = proj.getChild(); + + assertCheckInnerJoinRelatedFunctions(ctx, phyPlanner, joinNode, joinExec); + } + + /** + * It checks inner-join related functions. It will return TRUE if left relations is smaller than right relations. 
+ * + * The below unit tests will work according to which side is smaller. In this unit tests, we use two tables: p and e. + * The table p is 75 bytes, and the table e is 140 bytes. Since we cannot expect that which side is smaller, + * we use some boolean variable <code>leftSmaller</code> to indicate which side is small. + */ + private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx, + PhysicalPlannerImpl phyPlanner, + JoinNode joinNode, BinaryPhysicalExec joinExec) throws + IOException { + + String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild()); + String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild()); + + boolean leftSmaller; + if (left[0].equals("default.p")) { + leftSmaller = true; + } else { + leftSmaller = false; + } + + long leftSize = phyPlanner.estimateSizeRecursive(ctx, left); + long rightSize = phyPlanner.estimateSizeRecursive(ctx, right); + + // The table p is 75 bytes, and the table e is 140 bytes. + if (leftSmaller) { // if left one is smaller + assertEquals(75, leftSize); + assertEquals(140, rightSize); + } else { // if right one is smaller + assertEquals(140, leftSize); + assertEquals(75, rightSize); + } + + if (leftSmaller) { + PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(), + joinExec.getRightChild()); + assertEquals(ordered[0], joinExec.getLeftChild()); + assertEquals(ordered[1], joinExec.getRightChild()); + + assertEquals("default.p", left[0]); + assertEquals("default.e", right[0]); + } else { + PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(), + joinExec.getRightChild()); + assertEquals(ordered[1], joinExec.getLeftChild()); + assertEquals(ordered[0], joinExec.getRightChild()); + + assertEquals("default.e", left[0]); + assertEquals("default.p", right[0]); + } + + if (leftSmaller) { + assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), 
true)); + assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false)); + } else { + assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true)); + assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false)); + } + + return leftSmaller; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java new file mode 100644 index 0000000..1146e85 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestHashPartitioner { + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public final void testGetPartition() { + VTuple tuple1 = new VTuple(3); + tuple1.put(new Datum[] { + DatumFactory.createInt4(1), + DatumFactory.createInt4(2), + DatumFactory.createInt4(3) + }); + VTuple tuple2 = new VTuple(3); + tuple2.put(new Datum[] { + DatumFactory.createInt4(1), + DatumFactory.createInt4(2), + DatumFactory.createInt4(4) + }); + VTuple tuple3 = new VTuple(3); + tuple3.put(new Datum[] { + DatumFactory.createInt4(1), + DatumFactory.createInt4(2), + DatumFactory.createInt4(5) + }); + VTuple tuple4 = new VTuple(3); + tuple4.put(new Datum[] { + DatumFactory.createInt4(2), + DatumFactory.createInt4(2), + DatumFactory.createInt4(3) + }); + VTuple tuple5 = new VTuple(3); + tuple5.put(new Datum[] { + DatumFactory.createInt4(2), + DatumFactory.createInt4(2), + DatumFactory.createInt4(4) + }); + + int [] partKeys = {0,1}; + Partitioner p = new HashPartitioner(partKeys, 2); + + int part1 = p.getPartition(tuple1); + assertEquals(part1, p.getPartition(tuple2)); + assertEquals(part1, p.getPartition(tuple3)); + + int part2 = p.getPartition(tuple4); + assertEquals(part2, p.getPartition(tuple5)); + } + + @Test + public final void testGetPartition2() { + // https://issues.apache.org/jira/browse/TAJO-976 + Random rand = new Random(); + String[][] data = new String[1000][]; + + for (int i = 0; i < 1000; i++) 
{ + data[i] = new String[]{ String.valueOf(rand.nextInt(1000)), String.valueOf(rand.nextInt(1000)), String.valueOf(rand.nextInt(1000))}; + } + + int[] testNumPartitions = new int[]{31, 62, 124, 32, 63, 125}; + for (int index = 0; index < testNumPartitions.length; index++) { + Partitioner p = new HashPartitioner(new int[]{0, 1, 2}, testNumPartitions[index]); + + Set<Integer> ids = new TreeSet<Integer>(); + for (int i = 0; i < data.length; i++) { + Tuple tuple = new VTuple( + new Datum[]{new TextDatum(data[i][0]), new TextDatum(data[i][1]), new TextDatum(data[i][2])}); + + ids.add(p.getPartition(tuple)); + } + + // The number of partitions isn't exactly matched. + assertTrue(ids.size() + 5 >= testNumPartitions[index]); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java new file mode 100644 index 0000000..0d996de --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestHashSemiJoinExec { + private TajoConf conf; + private 
final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestHashJoinExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private LogicalOptimizer optimizer; + private Path testDir; + private QueryContext queryContext; + + private TableDesc employee; + private TableDesc people; + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + Schema employeeSchema = new Schema(); + employeeSchema.addColumn("managerid", Type.INT4); + employeeSchema.addColumn("empid", Type.INT4); + employeeSchema.addColumn("memid", Type.INT4); + employeeSchema.addColumn("deptname", Type.TEXT); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, employeeSchema, employeePath); + appender.init(); + VTuple tuple = new VTuple(employeeSchema.size()); + + for (int i = 0; i < 10; i++) { + tuple.put(new Datum[] { + DatumFactory.createInt4(i), + DatumFactory.createInt4(i), // empid [0-9] + DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i) }); + appender.addTuple(tuple); + } + + appender.flush(); + appender.close(); + employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath); + catalog.createTable(employee); + + Schema peopleSchema = new Schema(); + peopleSchema.addColumn("empid", Type.INT4); + peopleSchema.addColumn("fk_memid", Type.INT4); + peopleSchema.addColumn("name", Type.TEXT); + peopleSchema.addColumn("age", 
Type.INT4); + TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT"); + Path peoplePath = new Path(testDir, "people.csv"); + appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(peopleMeta, peopleSchema, peoplePath); + appender.init(); + tuple = new VTuple(peopleSchema.size()); + // make 27 tuples + for (int i = 1; i < 10; i += 2) { + // make three duplicated tuples for each tuples + for (int j = 0; j < 3; j++) { + tuple.put(new Datum[] { + DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9] + DatumFactory.createInt4(10 + i), + DatumFactory.createText("name_" + i), + DatumFactory.createInt4(30 + i) }); + appender.addTuple(tuple); + } + } + + appender.flush(); + appender.close(); + + queryContext = new QueryContext(conf); + people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); + catalog.createTable(people); + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + optimizer = new LogicalOptimizer(conf, catalog); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + + // relation descriptions + // employee (managerid, empid, memid, deptname) + // people (empid, fk_memid, name, age) + + String[] QUERIES = { + "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId" + }; + + @Test + public final void testHashSemiJoin() throws IOException, TajoException { + FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), + new Path(people.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testHashSemiJoin"); + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, 
+ LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + // replace an equal join with an hash anti join. + if (exec instanceof MergeJoinExec) { + MergeJoinExec join = (MergeJoinExec) exec; + ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild(); + ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild(); + SeqScanExec scanLeftChild = sortLeftChild.getChild(); + SeqScanExec scanRightChild = sortRightChild.getChild(); + + // 'people' should be outer table. So, the below code guarantees that people becomes the outer table. + if (scanLeftChild.getTableName().equals("default.people")) { + exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild); + } else { + exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild); + } + } else if (exec instanceof HashJoinExec) { + HashJoinExec join = (HashJoinExec) exec; + SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild(); + + // 'people' should be outer table. So, the below code guarantees that people becomes the outer table. + if (scanLeftChild.getTableName().equals("default.people")) { + exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild()); + } else { + exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild()); + } + } + + Tuple tuple; + int count = 0; + int i = 1; + exec.init(); + // expect result without duplicated tuples. 
+ while ((tuple = exec.next()) != null) { + count++; + assertTrue(i == tuple.getInt4(0)); + assertTrue(i == tuple.getInt4(1)); + assertTrue(("dept_" + i).equals(tuple.getText(2))); + assertTrue(10 + i == tuple.getInt4(3)); + + i += 2; + } + exec.close(); + assertEquals(5 , count); // the expected result: [1, 3, 5, 7, 9] + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java new file mode 100644 index 0000000..76e38e4 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestLeftOuterHashJoinExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuterHashJoinExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + private QueryContext 
defaultContext; + + private TableDesc dep3; + private TableDesc job3; + private TableDesc emp3; + private TableDesc phone3; + + private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3"); + private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3"); + private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3"); + private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3"); + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + //----------------- dep3 ------------------------------ + // dep_id | dep_name | loc_id + //-------------------------------- + // 0 | dep_0 | 1000 + // 1 | dep_1 | 1001 + // 2 | dep_2 | 1002 + // 3 | dep_3 | 1003 + // 4 | dep_4 | 1004 + // 5 | dep_5 | 1005 + // 6 | dep_6 | 1006 + // 7 | dep_7 | 1007 + // 8 | dep_8 | 1008 + // 9 | dep_9 | 1009 + Schema dep3Schema = new Schema(); + dep3Schema.addColumn("dep_id", Type.INT4); + dep3Schema.addColumn("dep_name", Type.TEXT); + dep3Schema.addColumn("loc_id", Type.INT4); + + + TableMeta dep3Meta = CatalogUtil.newTableMeta("TEXT"); + Path dep3Path = new Path(testDir, "dep3.csv"); + Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); + appender1.init(); + VTuple tuple = new VTuple(dep3Schema.size()); + for (int i = 0; i < 10; i++) { + tuple.put(new Datum[] { DatumFactory.createInt4(i), + DatumFactory.createText("dept_" + i), + DatumFactory.createInt4(1000 + i) }); + appender1.addTuple(tuple); + } + + appender1.flush(); + appender1.close(); + dep3 = 
CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path); + catalog.createTable(dep3); + + //----------------- job3 ------------------------------ + // job_id | job_title + // ---------------------- + // 101 | job_101 + // 102 | job_102 + // 103 | job_103 + + Schema job3Schema = new Schema(); + job3Schema.addColumn("job_id", Type.INT4); + job3Schema.addColumn("job_title", Type.TEXT); + + + TableMeta job3Meta = CatalogUtil.newTableMeta("TEXT"); + Path job3Path = new Path(testDir, "job3.csv"); + Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(job3Meta, job3Schema, job3Path); + appender2.init(); + VTuple tuple2 = new VTuple(job3Schema.size()); + for (int i = 1; i < 4; i++) { + int x = 100 + i; + tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i), + DatumFactory.createText("job_" + x) }); + appender2.addTuple(tuple2); + } + + appender2.flush(); + appender2.close(); + job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path); + catalog.createTable(job3); + + + + //---------------------emp3 -------------------- + // emp_id | first_name | last_name | dep_id | salary | job_id + // ------------------------------------------------------------ + // 11 | fn_11 | ln_11 | 1 | 123 | 101 + // 13 | fn_13 | ln_13 | 3 | 369 | 103 + // 15 | fn_15 | ln_15 | 5 | 615 | null + // 17 | fn_17 | ln_17 | 7 | 861 | null + // 19 | fn_19 | ln_19 | 9 | 1107 | null + // 21 | fn_21 | ln_21 | 1 | 123 | 101 + // 23 | fn_23 | ln_23 | 3 | 369 | 103 + + Schema emp3Schema = new Schema(); + emp3Schema.addColumn("emp_id", Type.INT4); + emp3Schema.addColumn("first_name", Type.TEXT); + emp3Schema.addColumn("last_name", Type.TEXT); + emp3Schema.addColumn("dep_id", Type.INT4); + emp3Schema.addColumn("salary", Type.FLOAT4); + emp3Schema.addColumn("job_id", Type.INT4); + + + TableMeta emp3Meta = CatalogUtil.newTableMeta("TEXT"); + Path emp3Path = new Path(testDir, "emp3.csv"); + Appender appender3 = ((FileTablespace) 
TablespaceManager.getLocalFs()) + .getAppender(emp3Meta, emp3Schema, emp3Path); + appender3.init(); + VTuple tuple3 = new VTuple(emp3Schema.size()); + + for (int i = 1; i < 4; i += 2) { + int x = 10 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i), + DatumFactory.createText("firstname_" + x), + DatumFactory.createText("lastname_" + x), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createInt4(100 + i) }); + appender3.addTuple(tuple3); + + int y = 20 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i), + DatumFactory.createText("firstname_" + y), + DatumFactory.createText("lastname_" + y), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createInt4(100 + i) }); + appender3.addTuple(tuple3); + } + + for (int i = 5; i < 10; i += 2) { + int x = 10 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i), + DatumFactory.createText("firstname_" + x), + DatumFactory.createText("lastname_" + x), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createNullDatum() }); + appender3.addTuple(tuple3); + } + + appender3.flush(); + appender3.close(); + emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path); + catalog.createTable(emp3); + + //---------------------phone3 -------------------- + // emp_id | phone_number + // ----------------------------------------------- + // this table is empty, no rows + + Schema phone3Schema = new Schema(); + phone3Schema.addColumn("emp_id", Type.INT4); + phone3Schema.addColumn("phone_number", Type.TEXT); + + + TableMeta phone3Meta = CatalogUtil.newTableMeta("TEXT"); + Path phone3Path = new Path(testDir, "phone3.csv"); + Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(phone3Meta, phone3Schema, phone3Path); + appender5.init(); + + appender5.flush(); + appender5.close(); + phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path); 
+ catalog.createTable(phone3); + + + + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + String[] QUERIES = { + // [0] no nulls + "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id", + // [1] nulls on the right operand + "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id", + // [2] nulls on the left side + "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id", + // [3] one operand is empty + "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id", + // [4] one operand is empty + "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id" + }; + + @Test + public final void testLeftOuterHashJoinExec0() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[0]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); + + FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), + new Path(dep3.getUri()), Integer.MAX_VALUE); + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), + new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuterHashJoinExec0"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + 
LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof HashLeftOuterJoinExec); + + int count = 0; + exec.init(); + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + exec.close(); + assertEquals(12, count); + } + + + @Test + public final void testLeftOuter_HashJoinExec1() throws IOException, TajoException { + FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), + new Path(job3.getUri()), Integer.MAX_VALUE); + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), + new Path(emp3.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_HashJoinExec1"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[1]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + Tuple tuple; + int count = 0; + int i = 1; + exec.init(); + + while ((tuple = exec.next()) != null) { + //TODO check contents + count = count + 1; + } + exec.close(); + assertEquals(5, count); + } + + @Test + public final void testLeftOuter_HashJoinExec2() throws IOException, TajoException { + + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), + new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] job3Frags = FileTablespace.splitNG(conf, 
JOB3_NAME, job3.getMeta(), + new Path(job3.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + + "/TestLeftOuter_HashJoinExec2"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[2]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + Tuple tuple; + int count = 0; + int i = 1; + exec.init(); + + while ((tuple = exec.next()) != null) { + //TODO check contents + count = count + 1; + } + exec.close(); + assertEquals(7, count); + } + + + @Test + public final void testLeftOuter_HashJoinExec3() throws IOException, TajoException { + + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), + new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), + new Path(phone3.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_HashJoinExec3"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[3]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + Tuple tuple; + int count = 
0; + int i = 1; + exec.init(); + + while ((tuple = exec.next()) != null) { + //TODO check contents + count = count + 1; + } + exec.close(); + assertEquals(7, count); + } + + + @Test + public final void testLeftOuter_HashJoinExec4() throws IOException, TajoException { + + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, "default.emp3", emp3.getMeta(), + new Path(emp3.getUri()), Integer.MAX_VALUE); + FileFragment[] phone3Frags = FileTablespace.splitNG(conf, "default.phone3", phone3.getMeta(), + new Path(phone3.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_HashJoinExec4"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[4]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + exec.close(); + assertEquals(0, count); + } + + + +} + //--camelia http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java new file mode 100644 index 0000000..2c26cc2 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java @@ -0,0 +1,197 @@ 
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verifies that an inner join on two equality keys, executed with the MERGE_JOIN
+ * algorithm enforced, is planned as a {@link MergeJoinExec} and returns the expected rows.
+ */
+public class TestMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  /**
+   * Starts the catalog cluster and creates two TEXT tables in the default database.
+   *
+   * <p>{@code employee}: one row for each i in 0..9 and for each odd i in 11..19, with
+   * managerid = i, empid = i, memid = 10 + i, deptname = "dept_" + i.
+   *
+   * <p>{@code people}: one row for each odd i in 1..9 and for each i in 10..19, with
+   * empid = i, fk_memid = 10 + i, name = "name_" + i, age = 30 + i.
+   */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, employeeSchema, employeePath);
+    appender.init();
+    VTuple tuple = new VTuple(employeeSchema.size());
+    // empid 0..9
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+    // odd empid 11..19
+    for (int i = 11; i < 20; i+=2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT");
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    // odd empid 1..9
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+    // empid 10..19
+    for (int i = 10; i < 20; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+  }
+
+  /** Shuts down the catalog cluster started in {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e inner join " +
+          "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+  };
+
+  /**
+   * Plans {@code QUERIES[0]} with MERGE_JOIN enforced and checks both the chosen physical
+   * operator and every output row.
+   *
+   * <p>The empids present in both tables are the odd values 1, 3, ..., 19, and for each of
+   * them memid == fk_memid == 10 + empid, so exactly 10 rows are expected. The loop below
+   * expects them in ascending empid order.
+   */
+  @Test
+  public final void testMergeInnerJoin() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+    LogicalNode root = plan.getRootBlock().getRoot();
+
+    // Force the merge-join algorithm for the single join node in the plan.
+    JoinNode joinNode = PlannerUtil.findTopNode(root, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    // NOTE(review): fragments are named after the query aliases (e, p) rather than the table
+    // names, presumably so the physical planner can bind them to the scans — confirm.
+    FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(),
+        new Path(people.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testMergeInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, root);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeJoinExec);
+
+    Tuple tuple;
+    int count = 0;
+    int expectedId = 1;
+    exec.init();
+    try {
+      while ((tuple = exec.next()) != null) {
+        count++;
+        assertEquals(expectedId, tuple.getInt4(0));            // managerId
+        assertEquals(expectedId, tuple.getInt4(1));            // e.empId
+        assertEquals("dept_" + expectedId, tuple.getText(2));  // deptName
+        assertEquals(10 + expectedId, tuple.getInt4(3));       // e.memId
+        expectedId += 2;
+      }
+    } finally {
+      // Release the executor even when an assertion above fails.
+      exec.close();
+    }
+    // Odd empids 1..9 (5 rows) plus odd empids 11..19 (5 rows).
+    assertEquals(10, count);
+  }
+}
