http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java index a5d1dc5..e976456 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java @@ -76,7 +76,7 @@ public class TestNLJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); Schema schema = new Schema(); schema.addColumn("managerid", Type.INT4); @@ -86,7 +86,8 @@ public class TestNLJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, schema, employeePath); appender.init(); Tuple tuple = new VTuple(schema.size()); for (int i = 0; i < 50; i++) { @@ -109,7 +110,8 @@ public class TestNLJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path peoplePath = new Path(testDir, "people.csv"); - appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath); + appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); for (int i = 1; i < 50; i += 2) { @@ -144,9 +146,9 @@ public class TestNLJoinExec { @Test public final void testNLCrossJoin() throws IOException, PlanningException { - FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); - FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -159,7 +161,7 @@ public class TestNLJoinExec { LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); int i = 0; @@ -173,9 +175,9 @@ public class TestNLJoinExec { @Test public final void testNLInnerJoin() throws IOException, PlanningException { - FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); - FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -188,7 +190,7 @@ public class TestNLJoinExec { LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); Tuple tuple;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index d507b97..cce4ba7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -51,10 +51,12 @@ import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.PlanProto.ShuffleType; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; @@ -78,7 +80,6 @@ import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm; -import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; import static org.junit.Assert.*; public class TestPhysicalPlanner { @@ -88,7 +89,7 @@ public class TestPhysicalPlanner { private static SQLAnalyzer analyzer; private static LogicalPlanner planner; private static LogicalOptimizer optimizer; - private static StorageManager sm; + private static FileStorageManager sm; private static Path testDir; private static Session session = LocalTajoTestingUtility.createDummySession(); private static QueryContext defaultContext; @@ -106,7 +107,7 @@ public class TestPhysicalPlanner { util.startCatalogCluster(); conf = util.getConfiguration(); testDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner"); - sm = StorageManager.getStorageManager(conf, testDir); + sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, testDir); catalog = util.getMiniCatalogCluster().getCatalog(); catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); @@ -129,8 +130,7 @@ public class TestPhysicalPlanner { Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema, - employeePath); + Appender appender = sm.getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); Tuple tuple = new VTuple(employeeSchema.size()); for (int i = 0; i < 100; i++) { @@ -148,7 +148,7 @@ public class TestPhysicalPlanner { Path scorePath = new Path(testDir, "score"); TableMeta scoreMeta = CatalogUtil.newTableMeta(StoreType.CSV, new KeyValueSet()); - appender = StorageManager.getStorageManager(conf).getAppender(scoreMeta, scoreSchema, scorePath); + appender = sm.getAppender(scoreMeta, scoreSchema, scorePath); appender.init(); score = new TableDesc( CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score"), scoreSchema, scoreMeta, @@ -189,8 +189,8 @@ public class TestPhysicalPlanner { Schema scoreSchmea = score.getSchema(); TableMeta scoreLargeMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet()); - Appender appender = StorageManager.getStorageManager(conf).getAppender(scoreLargeMeta, scoreSchmea, - scoreLargePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(scoreLargeMeta, scoreSchmea, scoreLargePath); appender.enableStats(); appender.init(); largeScore = new TableDesc( @@ -246,7 +246,7 @@ public class TestPhysicalPlanner { @Test public final void testCreateScanPlan() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanPlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -259,7 +259,7 @@ public class TestPhysicalPlanner { optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); Tuple tuple; @@ -277,7 +277,7 @@ public class TestPhysicalPlanner { @Test public final void testCreateScanWithFilterPlan() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -290,7 +290,7 @@ public class TestPhysicalPlanner { optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); Tuple tuple; @@ -306,7 +306,7 @@ public class TestPhysicalPlanner { @Test public final void testGroupByPlan() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -318,7 +318,7 @@ public class TestPhysicalPlanner { optimizer.optimize(plan); LogicalNode rootNode = plan.getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); int i = 0; @@ -337,7 +337,7 @@ public class TestPhysicalPlanner { @Test public final void testHashGroupByPlanWithALLField() throws IOException, PlanningException { // TODO - currently, this query does not use hash-based group operator. - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir( "target/test-data/testHashGroupByPlanWithALLField"); @@ -349,7 +349,7 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(defaultContext, expr); LogicalNode rootNode = optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); int i = 0; @@ -367,7 +367,7 @@ public class TestPhysicalPlanner { @Test public final void testSortGroupByPlan() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -378,7 +378,7 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(defaultContext, context); optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan.getRootBlock().getRoot()); /*HashAggregateExec hashAgg = (HashAggregateExec) exec; @@ -430,7 +430,7 @@ public class TestPhysicalPlanner { @Test public final void testStorePlan() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -443,17 +443,16 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(defaultContext, context); LogicalNode rootNode = optimizer.optimize(plan); - TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); exec.close(); - Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(), - ctx.getOutputPath()); + Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath()); scanner.init(); Tuple tuple; int i = 0; @@ -477,7 +476,7 @@ public class TestPhysicalPlanner { TableStats stats = largeScore.getStats(); assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB); - FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(), new Path(largeScore.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithMaxOutputFileSize"); @@ -497,7 +496,7 @@ public class TestPhysicalPlanner { LogicalNode rootNode = optimizer.optimize(plan); // executing StoreTableExec - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); @@ -512,7 +511,7 @@ public class TestPhysicalPlanner { // checking the file contents long totalNum = 0; for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) { - Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner( + Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner( CatalogUtil.newTableMeta(StoreType.CSV), rootNode.getOutSchema(), status.getPath()); @@ -528,7 +527,7 @@ public class TestPhysicalPlanner { @Test public final void testStorePlanWithRCFile() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -543,14 +542,14 @@ public class TestPhysicalPlanner { TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RCFILE); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); exec.close(); - Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(), - ctx.getOutputPath()); + Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner( + outputMeta, rootNode.getOutSchema(), ctx.getOutputPath()); scanner.init(); Tuple tuple; int i = 0; @@ -569,7 +568,7 @@ public class TestPhysicalPlanner { @Test public final void testEnforceForDefaultColumnPartitionStorePlan() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -581,7 +580,7 @@ public class TestPhysicalPlanner { Expr context = analyzer.parse(CreateTableAsStmts[2]); LogicalPlan plan = planner.createPlan(defaultContext, context); LogicalNode rootNode = optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); assertTrue(exec instanceof SortBasedColPartitionStoreExec); } @@ -596,7 +595,7 @@ public class TestPhysicalPlanner { Enforcer enforcer = new Enforcer(); enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.HASH_PARTITION); - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -605,7 +604,7 @@ public class TestPhysicalPlanner { ctx.setEnforcer(enforcer); ctx.setOutputPath(new Path(workDir, "grouped4")); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); assertTrue(exec instanceof HashBasedColPartitionStoreExec); } @@ -620,7 +619,7 @@ public class TestPhysicalPlanner { Enforcer enforcer = new Enforcer(); enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.SORT_PARTITION); - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -629,14 +628,14 @@ public class TestPhysicalPlanner { ctx.setEnforcer(enforcer); ctx.setOutputPath(new Path(workDir, "grouped5")); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); assertTrue(exec instanceof SortBasedColPartitionStoreExec); } @Test public final void testPartitionedStorePlan() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), id, new FileFragment[] { frags[0] }, @@ -660,7 +659,7 @@ public class TestPhysicalPlanner { QueryId queryId = id.getQueryUnitId().getExecutionBlockId().getQueryId(); ExecutionBlockId ebId = id.getQueryUnitId().getExecutionBlockId(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); @@ -671,7 +670,7 @@ public class TestPhysicalPlanner { Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir); FileStatus [] list = fs.listStatus(queryLocalTmpDir); - List<FileFragment> fragments = new ArrayList<FileFragment>(); + List<Fragment> fragments = new ArrayList<Fragment>(); for (FileStatus status : list) { assertTrue(status.isDirectory()); FileStatus [] files = fs.listStatus(status.getPath()); @@ -705,7 +704,7 @@ public class TestPhysicalPlanner { public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, PlanningException { // Preparing working dir and input fragments - FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(), new Path(largeScore.getPath()), Integer.MAX_VALUE); QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlanWithMaxFileSize"); @@ -724,7 +723,7 @@ public class TestPhysicalPlanner { LogicalNode rootNode = optimizer.optimize(plan); // Executing CREATE TABLE PARTITION BY - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); @@ -735,7 +734,7 @@ public class TestPhysicalPlanner { // checking the number of partitions assertEquals(2, list.length); - List<FileFragment> fragments = Lists.newArrayList(); + List<Fragment> fragments = Lists.newArrayList(); int i = 0; for (FileStatus status : list) { assertTrue(status.isDirectory()); @@ -769,7 +768,7 @@ public class TestPhysicalPlanner { @Test public final void testPartitionedStorePlanWithEmptyGroupingSet() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan); @@ -794,7 +793,7 @@ public class TestPhysicalPlanner { QueryId queryId = id.getQueryUnitId().getExecutionBlockId().getQueryId(); ExecutionBlockId ebId = id.getQueryUnitId().getExecutionBlockId(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); @@ -805,7 +804,7 @@ public class TestPhysicalPlanner { Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir); FileStatus [] list = fs.listStatus(queryLocalTmpDir); - List<FileFragment> fragments = new ArrayList<FileFragment>(); + List<Fragment> fragments = new ArrayList<Fragment>(); for (FileStatus status : list) { assertTrue(status.isDirectory()); FileStatus [] files = fs.listStatus(status.getPath()); @@ -836,7 +835,7 @@ public class TestPhysicalPlanner { @Test public final void testAggregationFunction() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -853,7 +852,7 @@ public class TestPhysicalPlanner { function.setFirstPhase(); } - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); @@ -867,7 +866,7 @@ public class TestPhysicalPlanner { @Test public final void testCountFunction() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -884,7 +883,7 @@ public class TestPhysicalPlanner { function.setFirstPhase(); } - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); Tuple tuple = exec.next(); @@ -895,7 +894,7 @@ public class TestPhysicalPlanner { @Test public final void testGroupByWithNullValue() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -906,7 +905,7 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(defaultContext, context); LogicalNode rootNode = optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); int count = 0; @@ -920,7 +919,7 @@ public class TestPhysicalPlanner { @Test public final void testUnionPlan() throws IOException, PlanningException, CloneNotSupportedException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testUnionPlan"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -936,7 +935,7 @@ public class TestPhysicalPlanner { union.setRightChild((LogicalNode) root.getChild().clone()); root.setChild(union); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, root); int count = 0; @@ -958,7 +957,7 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(defaultContext, expr); LogicalNode rootNode = optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); Tuple tuple; exec.init(); @@ -971,7 +970,7 @@ public class TestPhysicalPlanner { plan = planner.createPlan(defaultContext, expr); rootNode = optimizer.optimize(plan); - phyPlanner = new PhysicalPlannerImpl(conf, sm); + phyPlanner = new PhysicalPlannerImpl(conf); exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); tuple = exec.next(); @@ -985,7 +984,7 @@ public class TestPhysicalPlanner { //@Test public final void testCreateIndex() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateIndex"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -995,7 +994,7 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(defaultContext, context); LogicalNode rootNode = optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); while (exec.next() != null) { @@ -1012,7 +1011,7 @@ public class TestPhysicalPlanner { @Test public final void testDuplicateEliminate() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate"); @@ -1024,7 +1023,7 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(defaultContext, expr); LogicalNode rootNode = optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); Tuple tuple; @@ -1046,7 +1045,7 @@ public class TestPhysicalPlanner { @Test public final void testIndexedStoreExec() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec"); @@ -1064,7 +1063,7 @@ public class TestPhysicalPlanner { channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray()); ctx.setDataChannel(channel); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); Tuple tuple; @@ -1084,7 +1083,7 @@ public class TestPhysicalPlanner { Path outputPath = StorageUtil.concatPath(workDir, "output", "output"); TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new KeyValueSet()); SeekableScanner scanner = - StorageManager.getSeekableScanner(conf, meta, exec.getSchema(), outputPath); + FileStorageManager.getSeekableScanner(conf, meta, exec.getSchema(), outputPath); scanner.init(); int cnt = 0; @@ -1139,7 +1138,7 @@ public class TestPhysicalPlanner { @Test public final void testSortEnforcer() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortEnforcer"); @@ -1157,7 +1156,7 @@ public class TestPhysicalPlanner { new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); @@ -1179,7 +1178,7 @@ public class TestPhysicalPlanner { new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(enforcer); - phyPlanner = new PhysicalPlannerImpl(conf,sm); + phyPlanner = new PhysicalPlannerImpl(conf); exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); @@ -1190,7 +1189,7 @@ public class TestPhysicalPlanner { @Test public final void testGroupByEnforcer() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByEnforcer"); Expr context = analyzer.parse(QUERIES[7]); @@ -1207,7 +1206,7 @@ public class TestPhysicalPlanner { new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); @@ -1229,7 +1228,7 @@ public class TestPhysicalPlanner { new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(enforcer); - phyPlanner = new PhysicalPlannerImpl(conf,sm); + phyPlanner = new PhysicalPlannerImpl(conf); exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java index a23a2d1..3c78b12 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java @@ -80,7 +80,7 @@ public class TestProgressExternalSortExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString()); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); Schema schema = new Schema(); schema.addColumn("managerid", TajoDataTypes.Type.INT4); @@ -89,7 +89,8 @@ public class TestProgressExternalSortExec { TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.RAW); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, schema, employeePath); appender.enableStats(); appender.init(); Tuple tuple = new VTuple(schema.size()); @@ -136,7 +137,7 @@ public class TestProgressExternalSortExec { } private void testProgress(int sortBufferBytesNum) throws Exception { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = new Path(testDir, TestExternalSortExec.class.getName()); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -146,7 +147,7 @@ public class TestProgressExternalSortExec { LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); LogicalNode rootNode = plan.getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); ProjectionExec proj = (ProjectionExec) exec; @@ -156,8 +157,7 @@ public class TestProgressExternalSortExec { UnaryPhysicalExec sortExec = proj.getChild(); SeqScanExec scan = sortExec.getChild(); - ExternalSortExec extSort = new ExternalSortExec(ctx, sm, - ((MemSortExec)sortExec).getPlan(), scan); + ExternalSortExec extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan); extSort.setSortBufferBytesNum(sortBufferBytesNum); proj.setChild(extSort); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java index 9ebe871..879ca21 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java @@ -36,10 +36,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; @@ -83,7 +80,7 @@ public class TestRightOuterHashJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); //----------------- dep3 ------------------------------ // dep_id | dep_name | loc_id @@ -106,7 +103,8 @@ public class TestRightOuterHashJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); Tuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -135,7 +133,8 @@ public class TestRightOuterHashJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); Tuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -174,7 +173,8 @@ public class TestRightOuterHashJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); Tuple tuple3 = new VTuple(emp3Schema.size()); @@ -232,9 +232,9 @@ public class TestRightOuterHashJoinExec { @Test public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException { - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), + FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags); @@ -246,7 +246,7 @@ public class TestRightOuterHashJoinExec { Expr expr = analyzer.parse(QUERIES[0]); LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -273,9 +273,9 @@ public class TestRightOuterHashJoinExec { @Test public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException { - FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), + FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); @@ -287,7 +287,7 @@ public class TestRightOuterHashJoinExec { Expr expr = analyzer.parse(QUERIES[1]); LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -314,9 +314,9 @@ public class TestRightOuterHashJoinExec { @Test public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException { - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), + FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); @@ -328,7 +328,7 @@ public class TestRightOuterHashJoinExec { Expr expr = analyzer.parse(QUERIES[2]); LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java index 4956b7f..8bc00cc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java @@ -87,7 +87,7 @@ public class TestRightOuterMergeJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); //----------------- dep3 ------------------------------ // dep_id | dep_name | loc_id @@ -110,7 +110,8 @@ public class TestRightOuterMergeJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); Tuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -148,7 +149,8 @@ public class TestRightOuterMergeJoinExec { TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep4Path = new Path(testDir, "dep4.csv"); - Appender appender4 = StorageManager.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path); + Appender appender4 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(dep4Meta, dep4Schema, dep4Path); appender4.init(); Tuple tuple4 = new VTuple(dep4Schema.size()); for (int i = 0; i < 11; i++) { @@ -179,7 +181,8 @@ public class TestRightOuterMergeJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); Tuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -218,7 +221,8 @@ public class TestRightOuterMergeJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); Tuple tuple3 = new VTuple(emp3Schema.size()); @@ -270,8 +274,8 @@ public class TestRightOuterMergeJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema, - phone3Path); + Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); appender5.flush(); @@ -313,9 +317,9 @@ public class TestRightOuterMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), + FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags); @@ -324,7 +328,7 @@ public class TestRightOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -351,9 +355,9 @@ public class TestRightOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] job3Frags = - StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin1"); @@ -361,7 +365,7 @@ public class TestRightOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -387,9 +391,9 @@ public class TestRightOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] job3Frags = - StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin2"); @@ -397,7 +401,7 @@ public class TestRightOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); @@ -423,9 +427,9 @@ public class TestRightOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] dep4Frags = - StorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin3"); @@ -433,7 +437,7 @@ public class TestRightOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -460,9 +464,9 @@ public class TestRightOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] phone3Frags = - StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), + FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); @@ -471,7 +475,7 @@ public class TestRightOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); @@ -497,8 +501,8 @@ public class TestRightOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags); @@ -508,7 +512,7 @@ public class TestRightOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java index afa7430..8a61cab 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java @@ -57,7 +57,7 @@ public class TestSortExec { private static SQLAnalyzer analyzer; private static LogicalPlanner planner; private static LogicalOptimizer optimizer; - private static StorageManager sm; + private static FileStorageManager sm; private static Path workDir; private static Path tablePath; private static TableMeta employeeMeta; @@ -70,7 +70,7 @@ public class TestSortExec { util = TpchTestBase.getInstance().getTestingCluster(); catalog = util.getMaster().getCatalog(); workDir = CommonTestingUtil.getTestDir(TEST_PATH); - sm = StorageManager.getStorageManager(conf, workDir); + sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir); Schema schema = new Schema(); schema.addColumn("managerid", Type.INT4); @@ -82,7 +82,8 @@ public class TestSortExec { tablePath = StorageUtil.concatPath(workDir, "employee", "table1"); sm.getFileSystem().mkdirs(tablePath.getParent()); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, tablePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, schema, tablePath); appender.init(); Tuple tuple = new VTuple(schema.size()); for (int i = 0; i < 100; i++) { @@ -110,7 +111,7 @@ public class TestSortExec { @Test public final void testNext() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE); + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), LocalTajoTestingUtility @@ -120,7 +121,7 @@ public class TestSortExec { LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), context); LogicalNode rootNode = optimizer.optimize(plan); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); Tuple tuple;
