http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java new file mode 100644 index 0000000..d7968fe --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestNLJoinExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestNLJoinExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + + private TableDesc employee; + private TableDesc people; + + private MasterPlan masterPlan; + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + catalog = 
util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + Schema schema = new Schema(); + schema.addColumn("managerid", Type.INT4); + schema.addColumn("empid", Type.INT4); + schema.addColumn("memid", Type.INT4); + schema.addColumn("deptname", Type.TEXT); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, schema, employeePath); + appender.init(); + VTuple tuple = new VTuple(schema.size()); + for (int i = 0; i < 50; i++) { + tuple.put(new Datum[] { + DatumFactory.createInt4(i), + DatumFactory.createInt4(i), + DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i)}); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + employee = CatalogUtil.newTableDesc("default.employee", schema, employeeMeta, employeePath); + catalog.createTable(employee); + + Schema peopleSchema = new Schema(); + peopleSchema.addColumn("empid", Type.INT4); + peopleSchema.addColumn("fk_memid", Type.INT4); + peopleSchema.addColumn("name", Type.TEXT); + peopleSchema.addColumn("age", Type.INT4); + TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT"); + Path peoplePath = new Path(testDir, "people.csv"); + appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(peopleMeta, peopleSchema, peoplePath); + appender.init(); + tuple = new VTuple(peopleSchema.size()); + for (int i = 1; i < 50; i += 2) { + tuple.put(new Datum[] { + DatumFactory.createInt4(i), + DatumFactory.createInt4(10 + i), + DatumFactory.createText("name_" + i), + DatumFactory.createInt4(30 + i)}); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + people = 
CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); + catalog.createTable(people); + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + + masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + String[] QUERIES = { + "select managerId, e.empId, deptName, e.memId from employee as e, people p", + "select managerId, e.empId, deptName, e.memId from employee as e inner join people as p on " + + "e.empId = p.empId and e.memId = p.fk_memId" + }; + + @Test + public final void testNLCrossJoin() throws IOException, TajoException { + FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), + new Path(people.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testNLCrossJoin"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), + expr).getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + int i = 0; + exec.init(); + while (exec.next() != null) { + i++; + } + exec.close(); + assertEquals(50*50/2, i); // expected 10 * 5 + } + + @Test + public final void testNLInnerJoin() throws IOException, TajoException { + FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), + new Path(employee.getUri()), 
Integer.MAX_VALUE); + FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), + new Path(people.getUri()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testNLInnerJoin"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[1]); + LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), + expr).getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + Tuple tuple; + int i = 1; + int count = 0; + exec.init(); + while ((tuple = exec.next()) != null) { + count++; + assertTrue(i == tuple.getInt4(0)); + assertTrue(i == tuple.getInt4(1)); + assertTrue(("dept_" + i).equals(tuple.getText(2))); + assertTrue(10 + i == tuple.getInt4(3)); + i += 2; + } + exec.close(); + assertEquals(50 / 2, count); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java new file mode 100644 index 0000000..69b36c5 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -0,0 +1,1144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.*; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.function.FunctionLoader; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.planner.global.DataChannel; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.*; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.expr.AggregationFunctionCallEval; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.PlanProto.ShuffleType; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.session.Session; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import 
java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.SortEnforce.SortAlgorithm; +import static org.junit.Assert.*; + +public class TestPhysicalPlanner { + private static TajoTestingCluster util; + private static TajoConf conf; + private static CatalogService catalog; + private static SQLAnalyzer analyzer; + private static LogicalPlanner planner; + private static LogicalOptimizer optimizer; + private static FileTablespace sm; + private static Path testDir; + private static Session session = LocalTajoTestingUtility.createDummySession(); + private static QueryContext defaultContext; + + private static TableDesc employee = null; + private static TableDesc score = null; + private static TableDesc largeScore = null; + + private static MasterPlan masterPlan; + + @BeforeClass + public static void setUp() throws Exception { + util = new TajoTestingCluster(); + + util.startCatalogCluster(); + conf = util.getConfiguration(); + testDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPhysicalPlanner"); + sm = TablespaceManager.getLocalFs(); + catalog = util.getMiniCatalogCluster().getCatalog(); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + for (FunctionDesc funcDesc : FunctionLoader.findLegacyFunctions()) { + catalog.createFunction(funcDesc); + } + + Schema employeeSchema = new Schema(); + employeeSchema.addColumn("name", Type.TEXT); + employeeSchema.addColumn("empid", Type.INT4); + employeeSchema.addColumn("deptname", Type.TEXT); + + Schema scoreSchema = new Schema(); + 
scoreSchema.addColumn("deptname", Type.TEXT); + scoreSchema.addColumn("class", Type.TEXT); + scoreSchema.addColumn("score", Type.INT4); + scoreSchema.addColumn("nullable", Type.TEXT); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); + + + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = sm.getAppender(employeeMeta, employeeSchema, employeePath); + appender.init(); + VTuple tuple = new VTuple(employeeSchema.size()); + for (int i = 0; i < 100; i++) { + tuple.put(new Datum[] {DatumFactory.createText("name_" + i), + DatumFactory.createInt4(i), DatumFactory.createText("dept_" + i)}); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + employee = new TableDesc( + CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "employee"), employeeSchema, employeeMeta, + employeePath.toUri()); + catalog.createTable(employee); + + Path scorePath = new Path(testDir, "score"); + TableMeta scoreMeta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet()); + appender = sm.getAppender(scoreMeta, scoreSchema, scorePath); + appender.init(); + score = new TableDesc( + CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score"), scoreSchema, scoreMeta, + scorePath.toUri()); + tuple = new VTuple(scoreSchema.size()); + int m = 0; + for (int i = 1; i <= 5; i++) { + for (int k = 3; k < 5; k++) { + for (int j = 1; j <= 3; j++) { + tuple.put( + new Datum[] { + DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 5) + DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2) + DatumFactory.createInt4(j), // 1 ~ 3 + m % 3 == 1 ? 
DatumFactory.createText("one") : NullDatum.get()}); + appender.addTuple(tuple); + m++; + } + } + } + appender.flush(); + appender.close(); + + defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + catalog.createTable(score); + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + optimizer = new LogicalOptimizer(conf, catalog); + masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); + + createLargeScoreTable(); + } + + public static void createLargeScoreTable() throws IOException, TajoException { + + // Preparing a large table + Path scoreLargePath = new Path(testDir, "score_large"); + CommonTestingUtil.cleanupTestDir(scoreLargePath.toString()); + + Schema scoreSchmea = score.getSchema(); + TableMeta scoreLargeMeta = CatalogUtil.newTableMeta("RAW", new KeyValueSet()); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(scoreLargeMeta, scoreSchmea, scoreLargePath); + appender.enableStats(); + appender.init(); + largeScore = new TableDesc( + CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score_large"), scoreSchmea, scoreLargeMeta, + scoreLargePath.toUri()); + + VTuple tuple = new VTuple(scoreSchmea.size()); + int m = 0; + for (int i = 1; i <= 40000; i++) { + for (int k = 3; k < 5; k++) { // |{3,4}| = 2 + for (int j = 1; j <= 3; j++) { // |{1,2,3}| = 3 + tuple.put( + new Datum[] { + DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 5) + DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2) + DatumFactory.createInt4(j), // 1 ~ 3 + m % 3 == 1 ? 
DatumFactory.createText("one") : NullDatum.get()}); + appender.addTuple(tuple); + m++; + } + } + } + appender.flush(); + appender.close(); + largeScore.setStats(appender.getStats()); + catalog.createTable(largeScore); + } + + @AfterClass + public static void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + private String[] QUERIES = { + "select name, empId, deptName from employee", // 0 + "select name, empId, e.deptName, manager from employee as e, dept as dp", // 1 + "select name, empId, e.deptName, manager, score from employee as e, dept, score", // 2 + "select p.deptName, sum(score) from dept as p, score group by p.deptName having sum(score) > 30", // 3 + "select p.deptName, score from dept as p, score order by score asc", // 4 + "select name from employee where empId = 100", // 5 + "select deptName, class, score from score", // 6 + "select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 7 + "select count(*), max(score), min(score) from score", // 8 + "select count(deptName) from score", // 9 + "select managerId, empId, deptName from employee order by managerId, empId desc", // 10 + "select deptName, nullable from score group by deptName, nullable", // 11 + "select 3 < 4 as ineq, 3.5 * 2 as score", // 12 + "select (1 > 0) and 3 > 1", // 13 + "select sum(score), max(score), min(score) from score", // 14 + "select deptname, sum(score), max(score), min(score) from score group by deptname", // 15 + "select name from employee where empid >= 0", // 16 + }; + + @Test + public final void testCreateScanPlan() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateScanPlan"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + 
LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + LogicalNode rootNode =plan.getRootBlock().getRoot(); + optimizer.optimize(plan); + + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + Tuple tuple; + int i = 0; + exec.init(); + while ((tuple = exec.next()) != null) { + assertTrue(tuple.contains(0)); + assertTrue(tuple.contains(1)); + assertTrue(tuple.contains(2)); + i++; + } + exec.close(); + assertEquals(100, i); + } + + @Test + public final void testCreateScanWithFilterPlan() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateScanWithFilterPlan"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[16]); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + LogicalNode rootNode =plan.getRootBlock().getRoot(); + optimizer.optimize(plan); + + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + Tuple tuple; + int i = 0; + exec.init(); + while ((tuple = exec.next()) != null) { + assertTrue(tuple.contains(0)); + i++; + } + exec.close(); + assertEquals(100, i); + } + + @Test + public final void testGroupByPlan() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByPlan"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(QUERIES[7]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + int i = 0; + Tuple tuple; + exec.init(); + while ((tuple = exec.next()) != null) { + assertEquals(6, tuple.getInt4(2)); // sum + assertEquals(3, tuple.getInt4(3)); // max + assertEquals(1, tuple.getInt4(4)); // min + i++; + } + exec.close(); + assertEquals(10, i); + } + + @Test + public final void testHashGroupByPlanWithALLField() throws IOException, TajoException { + // TODO - currently, this query does not use hash-based group operator. 
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + + "/testHashGroupByPlanWithALLField"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[15]); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + LogicalNode rootNode = optimizer.optimize(plan); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + int i = 0; + Tuple tuple; + exec.init(); + while ((tuple = exec.next()) != null) { + assertEquals(12, tuple.getInt4(1)); // sum + assertEquals(3, tuple.getInt4(2)); // max + assertEquals(1, tuple.getInt4(3)); // min + i++; + } + exec.close(); + assertEquals(5, i); + } + + @Test + public final void testSortGroupByPlan() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortGroupByPlan"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[]{frags[0]}, workDir); + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(QUERIES[7]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + optimizer.optimize(plan); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan.getRootBlock().getRoot()); + + /*HashAggregateExec hashAgg = (HashAggregateExec) exec; + + SeqScanExec scan = (SeqScanExec) hashAgg.getSubOp(); + + Column [] 
grpColumns = hashAgg.getAnnotation().getGroupingColumns(); + QueryBlock.SortSpec [] specs = new QueryBlock.SortSpec[grpColumns.length]; + for (int i = 0; i < grpColumns.length; i++) { + specs[i] = new QueryBlock.SortSpec(grpColumns[i], true, false); + } + SortNode annotation = new SortNode(specs); + annotation.setInSchema(scan.getSchema()); + annotation.setOutSchema(scan.getSchema()); + SortExec sort = new SortExec(annotation, scan); + exec = new SortAggregateExec(hashAgg.getAnnotation(), sort);*/ + + int i = 0; + Tuple tuple; + exec.init(); + while ((tuple = exec.next()) != null) { + assertEquals(6, tuple.getInt4(2)); // sum + assertEquals(3, tuple.getInt4(3)); // max + assertEquals(1, tuple.getInt4(4)); // min + i++; + } + assertEquals(10, i); + + exec.rescan(); + i = 0; + while ((tuple = exec.next()) != null) { + assertEquals(6, tuple.getInt4(2)); // sum + assertEquals(3, tuple.getInt4(3)); // max + assertEquals(1, tuple.getInt4(4)); // min + i++; + } + exec.close(); + assertEquals(10, i); + } + + private String[] CreateTableAsStmts = { + "create table grouped1 as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 0 + "create table grouped2 using rcfile as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 1 + "create table grouped3 partition by column (dept text, class text) as select sum(score), max(score), min(score), deptName, class from score group by deptName, class", // 2, + "create table score_large_output as select * from score_large", // 4 + "CREATE TABLE score_part (deptname text, score int4, nullable text) PARTITION BY COLUMN (class text) " + + "AS SELECT deptname, score, nullable, class from score_large" // 5 + }; + + @Test + public final void testStorePlan() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + ctx.setOutputPath(new Path(workDir, "grouped1")); + + Expr context = analyzer.parse(CreateTableAsStmts[0]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + TableMeta outputMeta = CatalogUtil.newTableMeta("TEXT"); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()) + .getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath()); + scanner.init(); + Tuple tuple; + int i = 0; + while ((tuple = scanner.next()) != null) { + assertEquals(6, tuple.getInt4(2)); // sum + assertEquals(3, tuple.getInt4(3)); // max + assertEquals(1, tuple.getInt4(4)); // min + i++; + } + assertEquals(10, i); + scanner.close(); + + // Examine the statistics information + assertEquals(10, ctx.getResultStats().getNumRows().longValue()); + } + + @Test + public final void testStorePlanWithMaxOutputFileSize() throws IOException, TajoException, + CloneNotSupportedException { + + TableStats stats = largeScore.getStats(); + assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB); + + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score_large", largeScore.getMeta(), + new Path(largeScore.getUri()), Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlanWithMaxOutputFileSize"); + + QueryContext queryContext = new QueryContext(conf, session); + queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1); + + TaskAttemptContext ctx 
= new TaskAttemptContext( + queryContext, + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + ctx.setOutputPath(new Path(workDir, "maxOutput")); + + Expr context = analyzer.parse(CreateTableAsStmts[3]); + + LogicalPlan plan = planner.createPlan(queryContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + // executing StoreTableExec + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + // checking the number of punctuated files + int expectedFileNum = (int) (stats.getNumBytes() / (float) StorageUnit.MB); + FileSystem fs = ctx.getOutputPath().getFileSystem(conf); + FileStatus [] statuses = fs.listStatus(ctx.getOutputPath().getParent()); + assertEquals(expectedFileNum, statuses.length); + + // checking the file contents + long totalNum = 0; + for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) { + Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner( + CatalogUtil.newTableMeta("TEXT"), + rootNode.getOutSchema(), + status.getPath()); + + scanner.init(); + while ((scanner.next()) != null) { + totalNum++; + } + scanner.close(); + } + assertTrue(totalNum == ctx.getResultStats().getNumRows()); + } + + @Test + public final void testStorePlanWithRCFile() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlanWithRCFile"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + ctx.setOutputPath(new Path(workDir, "grouped2")); + + Expr 
context = analyzer.parse(CreateTableAsStmts[1]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + TableMeta outputMeta = CatalogUtil.newTableMeta("RCFILE"); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner( + outputMeta, rootNode.getOutSchema(), ctx.getOutputPath()); + scanner.init(); + Tuple tuple; + int i = 0; + while ((tuple = scanner.next()) != null) { + assertEquals(6, tuple.getInt4(2)); // sum + assertEquals(3, tuple.getInt4(3)); // max + assertEquals(1, tuple.getInt4(4)); // min + i++; + } + assertEquals(10, i); + scanner.close(); + + // Examine the statistics information + assertEquals(10, ctx.getResultStats().getNumRows().longValue()); + } + + @Test + public final void testEnforceForDefaultColumnPartitionStorePlan() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + ctx.setOutputPath(new Path(workDir, "grouped3")); + + Expr context = analyzer.parse(CreateTableAsStmts[2]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + assertTrue(exec instanceof SortBasedColPartitionStoreExec); + } + + @Test + public final void 
testEnforceForHashBasedColumnPartitionStorePlan() throws IOException, TajoException { + + Expr context = analyzer.parse(CreateTableAsStmts[2]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalRootNode rootNode = (LogicalRootNode) optimizer.optimize(plan); + CreateTableNode createTableNode = rootNode.getChild(); + Enforcer enforcer = new Enforcer(); + enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.HASH_PARTITION); + + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(enforcer); + ctx.setOutputPath(new Path(workDir, "grouped4")); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + assertTrue(exec instanceof HashBasedColPartitionStoreExec); + } + + @Test + public final void testEnforceForSortBasedColumnPartitionStorePlan() throws IOException, TajoException { + + Expr context = analyzer.parse(CreateTableAsStmts[2]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalRootNode rootNode = (LogicalRootNode) optimizer.optimize(plan); + CreateTableNode createTableNode = rootNode.getChild(); + Enforcer enforcer = new Enforcer(); + enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.SORT_PARTITION); + + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan"); + TaskAttemptContext ctx = new 
TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(enforcer); + ctx.setOutputPath(new Path(workDir, "grouped5")); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + assertTrue(exec instanceof SortBasedColPartitionStoreExec); + } + + @Test + public final void testPartitionedStorePlan() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), id, new FileFragment[] { frags[0] }, + CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testPartitionedStorePlan")); + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(QUERIES[7]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + + int numPartitions = 3; + Column key1 = new Column("default.score.deptname", Type.TEXT); + Column key2 = new Column("default.score.class", Type.TEXT); + DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(), + ShuffleType.HASH_SHUFFLE, numPartitions); + dataChannel.setShuffleKeys(new Column[]{key1, key2}); + ctx.setDataChannel(dataChannel); + LogicalNode rootNode = optimizer.optimize(plan); + + TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType()); + + FileSystem fs = sm.getFileSystem(); + QueryId queryId = id.getTaskId().getExecutionBlockId().getQueryId(); + ExecutionBlockId ebId = id.getTaskId().getExecutionBlockId(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + 
ctx.getHashShuffleAppenderManager().close(ebId); + + String executionBlockBaseDir = queryId.toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; + Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir); + FileStatus [] list = fs.listStatus(queryLocalTmpDir); + + List<Fragment> fragments = new ArrayList<Fragment>(); + for (FileStatus status : list) { + assertTrue(status.isDirectory()); + FileStatus [] files = fs.listStatus(status.getPath()); + for (FileStatus eachFile: files) { + fragments.add(new FileFragment("partition", eachFile.getPath(), 0, eachFile.getLen())); + } + } + + assertEquals(numPartitions, fragments.size()); + Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments)); + scanner.init(); + + Tuple tuple; + int i = 0; + while ((tuple = scanner.next()) != null) { + assertEquals(6, tuple.getInt4(2)); // sum + assertEquals(3, tuple.getInt4(3)); // max + assertEquals(1, tuple.getInt4(4)); // min + i++; + } + assertEquals(10, i); + scanner.close(); + + // Examine the statistics information + assertEquals(10, ctx.getResultStats().getNumRows().longValue()); + + fs.delete(queryLocalTmpDir, true); + } + + @Test + public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, TajoException { + + // Preparing working dir and input fragments + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score_large", largeScore.getMeta(), + new Path(largeScore.getUri()), Integer.MAX_VALUE); + TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testPartitionedStorePlanWithMaxFileSize"); + + // Setting session variables + QueryContext queryContext = new QueryContext(conf, session); + queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1); + + // Preparing task context + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, 
id, new FileFragment[] { frags[0] }, workDir); + ctx.setOutputPath(new Path(workDir, "part-01-000000")); + // SortBasedColumnPartitionStoreExec will be chosen by default. + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(CreateTableAsStmts[4]); + LogicalPlan plan = planner.createPlan(queryContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + // Executing CREATE TABLE PARTITION BY + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + FileSystem fs = sm.getFileSystem(); + FileStatus [] list = fs.listStatus(workDir); + // checking the number of partitions + assertEquals(2, list.length); + + List<Fragment> fragments = Lists.newArrayList(); + int i = 0; + for (FileStatus status : list) { + assertTrue(status.isDirectory()); + + long fileVolumSum = 0; + FileStatus [] fileStatuses = fs.listStatus(status.getPath()); + for (FileStatus fileStatus : fileStatuses) { + fileVolumSum += fileStatus.getLen(); + fragments.add(new FileFragment("partition", fileStatus.getPath(), 0, fileStatus.getLen())); + } + assertTrue("checking the meaningfulness of test", fileVolumSum > StorageUnit.MB && fileStatuses.length > 1); + + long expectedFileNum = (long) Math.ceil(fileVolumSum / (float)StorageUnit.MB); + assertEquals(expectedFileNum, fileStatuses.length); + } + TableMeta outputMeta = CatalogUtil.newTableMeta("TEXT"); + Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments)); + scanner.init(); + + long rowNum = 0; + while (scanner.next() != null) { + rowNum++; + } + + // checking the number of all written rows + assertTrue(largeScore.getStats().getNumRows() == rowNum); + + scanner.close(); + } + + @Test + public final void testPartitionedStorePlanWithEmptyGroupingSet() + throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", 
score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + + "/testPartitionedStorePlanWithEmptyGroupingSet"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + id, new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[14]); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + int numPartitions = 1; + DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(), + ShuffleType.HASH_SHUFFLE, numPartitions); + dataChannel.setShuffleKeys(new Column[]{}); + ctx.setDataChannel(dataChannel); + optimizer.optimize(plan); + + TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType()); + + FileSystem fs = sm.getFileSystem(); + QueryId queryId = id.getTaskId().getExecutionBlockId().getQueryId(); + ExecutionBlockId ebId = id.getTaskId().getExecutionBlockId(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + ctx.getHashShuffleAppenderManager().close(ebId); + + String executionBlockBaseDir = queryId.toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; + Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir); + FileStatus [] list = fs.listStatus(queryLocalTmpDir); + + List<Fragment> fragments = new ArrayList<Fragment>(); + for (FileStatus status : list) { + assertTrue(status.isDirectory()); + FileStatus [] files = fs.listStatus(status.getPath()); + for (FileStatus eachFile: files) { + fragments.add(new FileFragment("partition", eachFile.getPath(), 0, eachFile.getLen())); + } + } + + assertEquals(numPartitions, 
fragments.size()); + + Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments)); + scanner.init(); + Tuple tuple; + int i = 0; + while ((tuple = scanner.next()) != null) { + assertEquals(60, tuple.getInt4(0)); // sum + assertEquals(3, tuple.getInt4(1)); // max + assertEquals(1, tuple.getInt4(2)); // min + i++; + } + assertEquals(1, i); + scanner.close(); + + // Examine the statistics information + assertEquals(1, ctx.getResultStats().getNumRows().longValue()); + fs.delete(queryLocalTmpDir, true); + } + + @Test + public final void testAggregationFunction() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testAggregationFunction"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(QUERIES[8]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + // Set all aggregation functions to the first phase mode + GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY); + for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) { + function.setFirstPhase(); + } + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + exec.init(); + Tuple tuple = exec.next(); + assertEquals(30, tuple.getInt8(0)); + assertEquals(3, tuple.getInt4(1)); + assertEquals(1, tuple.getInt4(2)); + assertNull(exec.next()); + exec.close(); + } + + @Test + public final void testCountFunction() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, 
"default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCountFunction"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(QUERIES[9]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + // Set all aggregation functions to the first phase mode + GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY); + for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) { + function.setFirstPhase(); + } + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + Tuple tuple = exec.next(); + assertEquals(30, tuple.getInt8(0)); + assertNull(exec.next()); + exec.close(); + } + + @Test + public final void testGroupByWithNullValue() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByWithNullValue"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(QUERIES[11]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + int count = 0; + exec.init(); + while(exec.next() != null) { + count++; + } 
+ exec.close(); + assertEquals(10, count); + } + + @Test + public final void testUnionPlan() throws IOException, TajoException, CloneNotSupportedException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testUnionPlan"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + LogicalRootNode root = (LogicalRootNode) rootNode; + UnionNode union = plan.createNode(UnionNode.class); + union.setLeftChild((LogicalNode) root.getChild().clone()); + union.setRightChild((LogicalNode) root.getChild().clone()); + root.setChild(union); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, root); + + int count = 0; + exec.init(); + while(exec.next() != null) { + count++; + } + exec.close(); + assertEquals(200, count); + } + + @Test + public final void testEvalExpr() throws IOException, TajoException { + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testEvalExpr"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] { }, workDir); + Expr expr = analyzer.parse(QUERIES[12]); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + LogicalNode rootNode = optimizer.optimize(plan); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + Tuple tuple; + exec.init(); + tuple = exec.next(); + exec.close(); + 
assertEquals(true, tuple.getBool(0)); + assertTrue(7.0d == tuple.getFloat8(1)); + + expr = analyzer.parse(QUERIES[13]); + plan = planner.createPlan(defaultContext, expr); + rootNode = optimizer.optimize(plan); + + phyPlanner = new PhysicalPlannerImpl(conf); + exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + tuple = exec.next(); + exec.close(); + assertEquals(DatumFactory.createBool(true), tuple.asDatum(0)); + } + + public final String [] createIndexStmt = { + "create index idx_employee on employee using TWO_LEVEL_BIN_TREE (name null first, empId desc)" + }; + + @Test + public final void testCreateIndex() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateIndex"); + Path indexPath = StorageUtil.concatPath(TajoConf.getWarehouseDir(conf), "default/idx_employee"); + if (sm.getFileSystem().exists(indexPath)) { + sm.getFileSystem().delete(indexPath, true); + } + + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] {frags[0]}, workDir); + ctx.setEnforcer(new Enforcer()); + Expr context = analyzer.parse(createIndexStmt[0]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + LogicalNode rootNode = optimizer.optimize(plan); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + while (exec.next() != null) { + } + exec.close(); + + FileStatus[] list = sm.getFileSystem().listStatus(indexPath); + assertEquals(2, list.length); + } + + final static String [] duplicateElimination = { + "select distinct deptname from score", + }; + + @Test + public final void testDuplicateEliminate() throws IOException, TajoException { + FileFragment[] frags = 
FileTablespace.splitNG(conf, "default.score", score.getMeta(), + new Path(score.getUri()), Integer.MAX_VALUE); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testDuplicateEliminate"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] {frags[0]}, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(duplicateElimination[0]); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + LogicalNode rootNode = optimizer.optimize(plan); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + Tuple tuple; + + int cnt = 0; + Set<String> expected = Sets.newHashSet( + "name_1", "name_2", "name_3", "name_4", "name_5"); + exec.init(); + while ((tuple = exec.next()) != null) { + assertTrue(expected.contains(tuple.getText(0))); + cnt++; + } + exec.close(); + assertEquals(5, cnt); + } + + public String [] SORT_QUERY = { + "select name, empId from employee order by empId" + }; + + @Test + public final void testSortEnforcer() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortEnforcer"); + Expr context = analyzer.parse(SORT_QUERY[0]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT); + + Enforcer enforcer = new Enforcer(); + enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new 
FileFragment[] {frags[0]}, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + assertTrue(exec instanceof MemSortExec); + + context = analyzer.parse(SORT_QUERY[0]); + plan = planner.createPlan(defaultContext, context); + optimizer.optimize(plan); + rootNode = plan.getRootBlock().getRoot(); + + sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT); + + enforcer = new Enforcer(); + enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.MERGE_SORT); + ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] {frags[0]}, workDir); + ctx.setEnforcer(enforcer); + + phyPlanner = new PhysicalPlannerImpl(conf); + exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + assertTrue(exec instanceof ExternalSortExec); + } + + @Test + public final void testGroupByEnforcer() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()), + Integer.MAX_VALUE); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByEnforcer"); + Expr context = analyzer.parse(QUERIES[7]); + LogicalPlan plan = planner.createPlan(defaultContext, context); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + GroupbyNode groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY); + + Enforcer enforcer = new Enforcer(); + enforcer.enforceHashAggregation(groupByNode.getPID()); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] {frags[0]}, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec 
exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + assertNotNull(PhysicalPlanUtil.findExecutor(exec, HashAggregateExec.class)); + + context = analyzer.parse(QUERIES[7]); + plan = planner.createPlan(defaultContext, context); + optimizer.optimize(plan); + rootNode = plan.getRootBlock().getRoot(); + + groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY); + + enforcer = new Enforcer(); + enforcer.enforceSortAggregation(groupByNode.getPID(), null); + ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(masterPlan), + new FileFragment[] {frags[0]}, workDir); + ctx.setEnforcer(enforcer); + + phyPlanner = new PhysicalPlannerImpl(conf); + exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + exec.next(); + exec.close(); + + assertTrue(exec instanceof SortAggregateExec); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java new file mode 100644 index 0000000..d79d292 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +import static junit.framework.Assert.assertNotNull; +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestProgressExternalSortExec { + private TajoConf 
conf; + private TajoTestingCluster util; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestProgressExternalSortExec"; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + + private final int numTuple = 100000; + private Random rnd = new Random(System.currentTimeMillis()); + + private TableDesc employee; + + private TableStats testDataStats; + @Before + public void setUp() throws Exception { + this.conf = new TajoConf(); + util = new TajoTestingCluster(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString()); + + Schema schema = new Schema(); + schema.addColumn("managerid", TajoDataTypes.Type.INT4); + schema.addColumn("empid", TajoDataTypes.Type.INT4); + schema.addColumn("deptname", TajoDataTypes.Type.TEXT); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW"); + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, schema, employeePath); + appender.enableStats(); + appender.init(); + VTuple tuple = new VTuple(schema.size()); + for (int i = 0; i < numTuple; i++) { + tuple.put(new Datum[] { + DatumFactory.createInt4(rnd.nextInt(50)), + DatumFactory.createInt4(rnd.nextInt(100)), + DatumFactory.createText("dept_" + i), + }); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + testDataStats = appender.getStats(); + employee = new TableDesc( + CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, employeeMeta, + employeePath.toUri()); + catalog.createTable(employee); + analyzer = new SQLAnalyzer(); + planner = new 
LogicalPlanner(catalog, TablespaceManager.getInstance()); + } + + @After + public void tearDown() throws Exception { + CommonTestingUtil.cleanupTestDir(TEST_PATH); + util.shutdownCatalogCluster(); + } + + String[] QUERIES = { + "select managerId, empId from employee order by managerId, empId" + }; + + @Test + public void testExternalSortExecProgressWithMemTableScanner() throws Exception { + testProgress(testDataStats.getNumBytes().intValue() * 20); //multiply 20 for memory fit + } + + @Test + public void testExternalSortExecProgressWithPairWiseMerger() throws Exception { + testProgress(testDataStats.getNumBytes().intValue()); + } + + private void testProgress(int sortBufferBytesNum) throws Exception { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + Path workDir = new Path(testDir, TestExternalSortExec.class.getName()); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + ProjectionExec proj = (ProjectionExec) exec; + + // TODO - should be planed with user's optimization hint + if (!(proj.getChild() instanceof ExternalSortExec)) { + UnaryPhysicalExec sortExec = proj.getChild(); + SeqScanExec scan = sortExec.getChild(); + + ExternalSortExec extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan); + + extSort.setSortBufferBytesNum(sortBufferBytesNum); + proj.setChild(extSort); + } else { + ((ExternalSortExec)proj.getChild()).setSortBufferBytesNum(sortBufferBytesNum); + } + + Tuple tuple; + 
Tuple preVal = null; + Tuple curVal; + int cnt = 0; + exec.init(); + BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(), + new SortSpec[]{ + new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)), + new SortSpec(new Column("empid", TajoDataTypes.Type.INT4)) + }); + + float initProgress = 0.0f; + while ((tuple = exec.next()) != null) { + if (cnt == 0) { + initProgress = exec.getProgress(); + assertTrue(initProgress > 0.5f && initProgress < 1.0f); + } + + if (cnt == testDataStats.getNumRows() / 2) { + float progress = exec.getProgress(); + + assertTrue(progress > initProgress); + } + curVal = tuple; + if (preVal != null) { + assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0); + } + preVal = curVal; + cnt++; + } + + assertEquals(1.0f, exec.getProgress(), 0); + assertEquals(numTuple, cnt); + + TableStats tableStats = exec.getInputStats(); + assertNotNull(tableStats); + assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue()); + assertEquals(cnt, testDataStats.getNumRows().longValue()); + assertEquals(cnt, tableStats.getNumRows().longValue()); + assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue()); + + // for rescan test + preVal = null; + exec.rescan(); + + cnt = 0; + while ((tuple = exec.next()) != null) { + curVal = tuple; + if (preVal != null) { + assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0); + } + preVal = curVal; + cnt++; + } + assertEquals(1.0f, exec.getProgress(), 0); + assertEquals(numTuple, cnt); + exec.close(); + assertEquals(1.0f, exec.getProgress(), 0); + + tableStats = exec.getInputStats(); + assertNotNull(tableStats); + assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue()); + assertEquals(cnt, testDataStats.getNumRows().longValue()); + assertEquals(cnt, tableStats.getNumRows().longValue()); + 
assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java new file mode 100644 index 0000000..fe36602 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.tajo.engine.planner.physical;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

/**
 * Tests for right outer hash joins.
 *
 * <p>This is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with
 * switched inputs order. Each test plans a right outer join over small TEXT tables, checks that
 * the planner did not choose {@link RightOuterMergeJoinExec}, and counts the produced tuples.
 */
public class TestRightOuterHashJoinExec {
  private TajoConf conf;
  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestRightOuterHashJoinExec";
  private TajoTestingCluster util;
  private CatalogService catalog;
  private SQLAnalyzer analyzer;
  private LogicalPlanner planner;
  private Path testDir;
  private QueryContext defaultContext;

  private TableDesc dep3;
  private TableDesc job3;
  private TableDesc emp3;

  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");

  @Before
  public void setUp() throws Exception {
    util = new TajoTestingCluster();
    util.initTestDir();
    catalog = util.startCatalogCluster().getCatalog();
    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
    conf = util.getConfiguration();

    //----------------- dep3 ------------------------------
    // dep_id | dep_name | loc_id
    //--------------------------------
    //  0     | dept_0   | 1000
    //  1     | dept_1   | 1001
    //  2     | dept_2   | 1002
    //  3     | dept_3   | 1003
    //  4     | dept_4   | 1004
    //  5     | dept_5   | 1005
    //  6     | dept_6   | 1006
    //  7     | dept_7   | 1007
    //  8     | dept_8   | 1008
    //  9     | dept_9   | 1009
    Schema dep3Schema = new Schema();
    dep3Schema.addColumn("dep_id", Type.INT4);
    dep3Schema.addColumn("dep_name", Type.TEXT);
    dep3Schema.addColumn("loc_id", Type.INT4);

    List<Datum[]> dep3Rows = new ArrayList<Datum[]>();
    for (int i = 0; i < 10; i++) {
      dep3Rows.add(new Datum[] {
          DatumFactory.createInt4(i),
          DatumFactory.createText("dept_" + i),
          DatumFactory.createInt4(1000 + i) });
    }
    dep3 = createTable(DEP3_NAME, dep3Schema, "dep3.csv", dep3Rows);

    //----------------- job3 ------------------------------
    // job_id | job_title
    // ----------------------
    //  101   | job_101
    //  102   | job_102
    //  103   | job_103
    Schema job3Schema = new Schema();
    job3Schema.addColumn("job_id", Type.INT4);
    job3Schema.addColumn("job_title", Type.TEXT);

    List<Datum[]> job3Rows = new ArrayList<Datum[]>();
    for (int i = 1; i < 4; i++) {
      int jobId = 100 + i;
      job3Rows.add(new Datum[] {
          DatumFactory.createInt4(jobId),
          DatumFactory.createText("job_" + jobId) });
    }
    job3 = createTable(JOB3_NAME, job3Schema, "job3.csv", job3Rows);

    //---------------------emp3 --------------------
    // Rows in insertion order:
    // emp_id | first_name   | last_name   | dep_id | salary | job_id
    // ----------------------------------------------------------------
    //  11    | firstname_11 | lastname_11 |   1    |  123   | 101
    //  21    | firstname_21 | lastname_21 |   1    |  123   | 101
    //  13    | firstname_13 | lastname_13 |   3    |  369   | 103
    //  23    | firstname_23 | lastname_23 |   3    |  369   | 103
    //  15    | firstname_15 | lastname_15 |   5    |  615   | null
    //  17    | firstname_17 | lastname_17 |   7    |  861   | null
    //  19    | firstname_19 | lastname_19 |   9    | 1107   | null
    Schema emp3Schema = new Schema();
    emp3Schema.addColumn("emp_id", Type.INT4);
    emp3Schema.addColumn("first_name", Type.TEXT);
    emp3Schema.addColumn("last_name", Type.TEXT);
    emp3Schema.addColumn("dep_id", Type.INT4);
    emp3Schema.addColumn("salary", Type.FLOAT4);
    emp3Schema.addColumn("job_id", Type.INT4);

    List<Datum[]> emp3Rows = new ArrayList<Datum[]>();
    // Two employees in each of departments 1 and 3, holding job 101 / 103 respectively.
    for (int i = 1; i < 4; i += 2) {
      emp3Rows.add(empRow(10 + i, i, DatumFactory.createInt4(100 + i)));
      emp3Rows.add(empRow(20 + i, i, DatumFactory.createInt4(100 + i)));
    }
    // One employee in each of departments 5, 7 and 9, with a NULL job_id.
    for (int i = 5; i < 10; i += 2) {
      emp3Rows.add(empRow(10 + i, i, DatumFactory.createNullDatum()));
    }
    emp3 = createTable(EMP3_NAME, emp3Schema, "emp3.csv", emp3Rows);

    analyzer = new SQLAnalyzer();
    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
  }

  /**
   * Builds one emp3 row: first/last names are derived from {@code empId}, the salary is
   * {@code 123 * depId}.
   */
  private static Datum[] empRow(int empId, int depId, Datum jobId) {
    return new Datum[] {
        DatumFactory.createInt4(empId),
        DatumFactory.createText("firstname_" + empId),
        DatumFactory.createText("lastname_" + empId),
        DatumFactory.createInt4(depId),
        DatumFactory.createFloat4(123 * depId),
        jobId };
  }

  /**
   * Writes {@code rows} to a TEXT file named {@code fileName} under the test directory, registers
   * it in the catalog as {@code tableName}, and returns the resulting table descriptor.
   */
  private TableDesc createTable(String tableName, Schema schema, String fileName, List<Datum[]> rows)
      throws Exception {
    TableMeta meta = CatalogUtil.newTableMeta("TEXT");
    Path path = new Path(testDir, fileName);
    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
        .getAppender(meta, schema, path);
    appender.init();
    // A single tuple instance is refilled for every row before it is handed to the appender.
    VTuple tuple = new VTuple(schema.size());
    for (Datum[] row : rows) {
      tuple.put(row);
      appender.addTuple(tuple);
    }
    appender.flush();
    appender.close();

    TableDesc desc = CatalogUtil.newTableDesc(tableName, schema, meta, path);
    catalog.createTable(desc);
    return desc;
  }

  @After
  public void tearDown() throws Exception {
    util.shutdownCatalogCluster();
  }

  static final String[] QUERIES = {
      "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id", //0 no nulls
      "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id", //1 nulls on the left operand
      "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id" //2 nulls on the right side
  };

  @Test
  public final void testRightOuter_HashJoinExec0() throws IOException, TajoException {
    FileFragment[] emp3Frags = split(EMP3_NAME, emp3);
    FileFragment[] dep3Frags = split(DEP3_NAME, dep3);
    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);

    // 7 employees matched to departments 1, 3, 5, 7, 9 plus the 5 departments without employees.
    assertEquals(12, executeAndCount(QUERIES[0], "TestRightOuter_HashJoinExec0", merged));
  }

  @Test
  public final void testRightOuter_HashJoinExec1() throws IOException, TajoException {
    FileFragment[] job3Frags = split(JOB3_NAME, job3);
    FileFragment[] emp3Frags = split(EMP3_NAME, emp3);
    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);

    // 2 employees each for jobs 101 and 103, plus job 102 which nobody holds.
    assertEquals(5, executeAndCount(QUERIES[1], "TestRightOuter_HashJoinExec1", merged));
  }

  @Test
  public final void testRightOuter_HashJoinExec2() throws IOException, TajoException {
    FileFragment[] emp3Frags = split(EMP3_NAME, emp3);
    FileFragment[] job3Frags = split(JOB3_NAME, job3);
    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);

    // Every one of the 7 employees is preserved, with or without a matching job.
    assertEquals(7, executeAndCount(QUERIES[2], "TestRightOuter_HashJoinExec2", merged));
  }

  /** Returns the file fragments of {@code desc}, split with the maximum fragment size. */
  private FileFragment[] split(String tableName, TableDesc desc) throws IOException, TajoException {
    return FileTablespace.splitNG(conf, tableName, desc.getMeta(), new Path(desc.getUri()),
        Integer.MAX_VALUE);
  }

  /**
   * Plans {@code query}, builds its physical plan over {@code fragments}, asserts that a merge join
   * was not selected, and returns the number of tuples the plan produces.
   *
   * @param query       SQL text to run
   * @param workDirName name of the per-test working directory under the default test directory
   * @param fragments   input fragments of both join operands
   * @return number of output tuples
   */
  private int executeAndCount(String query, String workDirName, FileFragment[] fragments)
      throws IOException, TajoException {
    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/" + workDirName);
    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
        LocalTajoTestingUtility.newTaskAttemptId(), fragments, workDir);
    ctx.setEnforcer(new Enforcer());
    Expr expr = analyzer.parse(query);
    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();

    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);

    ProjectionExec proj = (ProjectionExec) exec;
    // For this small data set the planner is not expected to choose the merge join.
    assertFalse("expected a hash join but the planner chose RightOuterMergeJoinExec",
        proj.getChild() instanceof RightOuterMergeJoinExec);

    // TODO check contents, not just the row count
    int count = 0;
    exec.init();
    try {
      while (exec.next() != null) {
        count++;
      }
    } finally {
      exec.close();
    }
    return count;
  }
}
