http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java index ae90502..afa273b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java @@ -84,7 +84,7 @@ public class TestHashSemiJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); VTuple tuple = new VTuple(employeeSchema.size()); @@ -110,7 +110,7 @@ public class TestHashSemiJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV"); Path peoplePath = new Path(testDir, "people.csv"); - appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); @@ -133,7 +133,7 @@ public class TestHashSemiJoinExec { people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); optimizer = new LogicalOptimizer(conf); }
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java index bbb441c..c93a1b4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java @@ -105,7 +105,7 @@ public class TestLeftOuterHashJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV"); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); VTuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -134,7 +134,7 @@ public class TestLeftOuterHashJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta("CSV"); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); VTuple tuple2 = new VTuple(job3Schema.size()); @@ -174,7 +174,7 @@ public class TestLeftOuterHashJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV"); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); VTuple tuple3 = new VTuple(emp3Schema.size()); @@ -227,7 +227,7 @@ public class TestLeftOuterHashJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV"); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); @@ -239,7 +239,7 @@ public class TestLeftOuterHashJoinExec { analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); defaultContext = LocalTajoTestingUtility.createDummyContext(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java index d0d0983..c4e7752 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java @@ -85,7 +85,7 @@ public class TestMergeJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); VTuple tuple = new VTuple(employeeSchema.size()); @@ -114,7 +114,7 @@ public class TestMergeJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV"); Path peoplePath = new Path(testDir, "people.csv"); - appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); @@ -139,7 +139,7 @@ public class TestMergeJoinExec { people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); } @After http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java index 4866323..1b30ef8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java @@ -83,7 +83,7 @@ public class TestNLJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, schema, employeePath); appender.init(); VTuple tuple = new VTuple(schema.size()); @@ -107,7 +107,7 @@ public class TestNLJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV"); Path peoplePath = new Path(testDir, "people.csv"); - appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); @@ -125,7 +125,7 @@ public class TestNLJoinExec { people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index b2a228a..dff0cbe 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -98,7 +98,7 @@ public class TestPhysicalPlanner { util.startCatalogCluster(); conf = util.getConfiguration(); testDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPhysicalPlanner"); - sm = TableSpaceManager.getLocalFs(); + sm = TablespaceManager.getLocalFs(); catalog = util.getMiniCatalogCluster().getCatalog(); catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); @@ -164,7 +164,7 @@ public class TestPhysicalPlanner { appender.close(); catalog.createTable(score); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); optimizer = new LogicalOptimizer(conf); defaultContext = LocalTajoTestingUtility.createDummyContext(conf); @@ -180,7 +180,7 @@ public class TestPhysicalPlanner { Schema scoreSchmea = score.getSchema(); TableMeta scoreLargeMeta = CatalogUtil.newTableMeta("RAW", new KeyValueSet()); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(scoreLargeMeta, scoreSchmea, scoreLargePath); appender.enableStats(); appender.init(); @@ -442,7 +442,7 @@ public class TestPhysicalPlanner { exec.next(); exec.close(); - Scanner scanner = ((FileTablespace) TableSpaceManager.getLocalFs()) + Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()) .getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath()); scanner.init(); Tuple tuple; @@ -502,7 +502,7 @@ public class TestPhysicalPlanner { // checking the file contents long totalNum = 0; for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) { - Scanner scanner = ((FileTablespace) TableSpaceManager.getLocalFs()).getFileScanner( + Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner( CatalogUtil.newTableMeta("CSV"), rootNode.getOutSchema(), status.getPath()); @@ -539,7 +539,7 @@ public class TestPhysicalPlanner { exec.next(); exec.close(); - Scanner scanner = ((FileTablespace) TableSpaceManager.getLocalFs()).getFileScanner( + Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner( outputMeta, rootNode.getOutSchema(), ctx.getOutputPath()); scanner.init(); Tuple tuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java index 1b54948..d1da787 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java @@ -86,7 +86,7 @@ public class TestProgressExternalSortExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, schema, employeePath); appender.enableStats(); appender.init(); @@ -110,7 +110,7 @@ public class TestProgressExternalSortExec { employeePath.toUri()); catalog.createTable(employee); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); } @After http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java index c956f29..f581db8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java @@ -100,7 +100,7 @@ public class TestRightOuterHashJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV"); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); VTuple tuple = new VTuple(dep3Schema.size()); @@ -130,7 +130,7 @@ public class TestRightOuterHashJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta("CSV"); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); VTuple tuple2 = new VTuple(job3Schema.size()); @@ -170,7 +170,7 @@ public class TestRightOuterHashJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV"); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); VTuple tuple3 = new VTuple(emp3Schema.size()); @@ -212,7 +212,7 @@ public class TestRightOuterHashJoinExec { catalog.createTable(emp3); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); defaultContext = LocalTajoTestingUtility.createDummyContext(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java index 25f0ca4..d86b229 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java @@ -107,7 +107,7 @@ public class TestRightOuterMergeJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV"); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); VTuple tuple = new VTuple(dep3Schema.size()); @@ -146,7 +146,7 @@ public class TestRightOuterMergeJoinExec { TableMeta dep4Meta = CatalogUtil.newTableMeta("CSV"); Path dep4Path = new Path(testDir, "dep4.csv"); - Appender appender4 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender4 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(dep4Meta, dep4Schema, dep4Path); appender4.init(); VTuple tuple4 = new VTuple(dep4Schema.size()); @@ -178,7 +178,7 @@ public class TestRightOuterMergeJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta("CSV"); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); VTuple tuple2 = new VTuple(job3Schema.size()); @@ -218,7 +218,7 @@ public class TestRightOuterMergeJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV"); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); VTuple tuple3 = new VTuple(emp3Schema.size()); @@ -271,7 +271,7 @@ public class TestRightOuterMergeJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV"); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); @@ -281,7 +281,7 @@ public class TestRightOuterMergeJoinExec { catalog.createTable(phone3); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); defaultContext = LocalTajoTestingUtility.createDummyContext(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java index ce12faf..4690e71 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java @@ -69,7 +69,7 @@ public class TestSortExec { util = TpchTestBase.getInstance().getTestingCluster(); catalog = util.getMaster().getCatalog(); workDir = CommonTestingUtil.getTestDir(TEST_PATH); - sm = TableSpaceManager.getLocalFs(); + sm = TablespaceManager.getLocalFs(); Schema schema = new Schema(); schema.addColumn("managerid", Type.INT4); @@ -81,7 +81,7 @@ public class TestSortExec { tablePath = StorageUtil.concatPath(workDir, "employee", "table1"); sm.getFileSystem().mkdirs(tablePath.getParent()); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, schema, tablePath); appender.init(); VTuple tuple = new VTuple(schema.size()); @@ -101,7 +101,7 @@ public class TestSortExec { catalog.createTable(desc); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); optimizer = new LogicalOptimizer(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java index 3d2d857..569111c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -38,7 +38,7 @@ import org.apache.tajo.datum.TextDatum; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.hbase.*; @@ -82,7 +82,7 @@ public class TestHBaseTable extends QueryTestCaseBase { tableSpaceUri = "hbase:zk://" + hostName + ":" + zkPort; HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri)); hBaseTablespace.init(new TajoConf(testingCluster.getHBaseUtil().getConf())); - TableSpaceManager.addTableSpaceForTest(hBaseTablespace); + TablespaceManager.addTableSpaceForTest(hBaseTablespace); } @AfterClass @@ -213,7 +213,7 @@ public class TestHBaseTable extends QueryTestCaseBase { assertTableExists("external_hbase_mapped_table"); - HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get(); + HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); HConnection hconn = space.getConnection(); HTableInterface htable = hconn.getTable("external_hbase_table"); @@ -253,7 +253,7 @@ public class TestHBaseTable extends QueryTestCaseBase { assertTableExists("external_hbase_mapped_table"); - HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get(); + HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); HConnection hconn = space.getConnection(); HTableInterface htable = hconn.getTable("external_hbase_table"); @@ -306,7 +306,7 @@ public class TestHBaseTable extends QueryTestCaseBase { assertTableExists("external_hbase_mapped_table"); - HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get(); + HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); HConnection hconn = space.getConnection(); HTableInterface htable = hconn.getTable("external_hbase_table"); @@ -343,7 +343,7 @@ public class TestHBaseTable extends QueryTestCaseBase { assertTableExists("external_hbase_mapped_table"); - HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get(); + HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); HConnection hconn = space.getConnection(); HTableInterface htable = hconn.getTable("external_hbase_table"); @@ -477,7 +477,7 @@ public class TestHBaseTable extends QueryTestCaseBase { EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), new ConstEval(new TextDatum("021"))); scanNode.setQual(evalNodeEq); - Tablespace tablespace = TableSpaceManager.getByName("cluster1").get(); + Tablespace tablespace = TablespaceManager.getByName("cluster1").get(); List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode); assertEquals(1, fragments.size()); assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow())); @@ -683,6 +683,48 @@ public class TestHBaseTable extends QueryTestCaseBase { } @Test + public void testInsertValues1() throws Exception { + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + executeString("insert into hbase_mapped_table select 'aaa', 'a12', 'a34', 1").close(); + executeString("insert into hbase_mapped_table select 'bbb', 'b12', 'b34', 2").close(); + executeString("insert into hbase_mapped_table select 'ccc', 'c12', 'c34', 3").close(); + executeString("insert into hbase_mapped_table select 'ddd', 'd12', 'd34', 4").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")}, + new boolean[]{false, false, false, true}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test public void testInsertIntoMultiRegion() throws Exception { executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + @@ -1301,10 +1343,8 @@ public class TestHBaseTable extends QueryTestCaseBase { } } - private String resultSetToString(ResultScanner scanner, - byte[][] cfNames, byte[][] qualifiers, - boolean[] binaries, - Schema schema) throws Exception { + private String resultSetToString(ResultScanner scanner, byte[][] cfNames, byte[][] qualifiers, + boolean [] binaries, Schema schema) throws Exception { StringBuilder sb = new StringBuilder(); Result result = null; while ( (result = scanner.next()) != null ) { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java index 1478690..dd67e06 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java @@ -264,7 +264,7 @@ public class TestJoinQuery extends QueryTestCaseBase { } Path dataPath = new Path(table.getUri().toString(), fileIndex + ".csv"); fileIndex++; - appender = (((FileTablespace)TableSpaceManager.getLocalFs())) + appender = (((FileTablespace) TablespaceManager.getLocalFs())) .getAppender(tableMeta, schema, dataPath); appender.init(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java index c714749..265f075 100644 --- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java @@ -20,10 +20,13 @@ package org.apache.tajo.ha; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.TpchTestBase; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.service.ServiceTracker; @@ -32,58 +35,51 @@ import org.junit.Test; import static junit.framework.Assert.assertTrue; import static junit.framework.TestCase.assertEquals; -import static org.junit.Assert.*; +import static org.junit.Assert.assertNotEquals; public class TestHAServiceHDFSImpl { private TajoTestingCluster cluster; - private TajoMaster backupMaster; - private TajoConf conf; - private TajoClient client; + private TajoMaster primaryMaster; + private TajoMaster backupMaster; private Path haPath, activePath, backupPath; - private String masterAddress; - @Test public final void testAutoFailOver() throws Exception { - cluster = new TajoTestingCluster(true); - - cluster.startMiniCluster(1); - conf = cluster.getConfiguration(); - client = cluster.newTajoClient(); + cluster = TpchTestBase.getInstance().getTestingCluster(); try { FileSystem fs = cluster.getDefaultFileSystem(); - ServiceTracker serviceTracker = ServiceTrackerFactory.get(conf); - masterAddress = serviceTracker.getUmbilicalAddress().getHostName(); - - setConfiguration(); + TajoConf primaryConf = setConfigForHAMaster(); + primaryMaster = new TajoMaster(); + primaryMaster.init(primaryConf); + primaryMaster.start(); + TajoConf backupConf = setConfigForHAMaster(); backupMaster = new TajoMaster(); - backupMaster.init(conf); + backupMaster.init(backupConf); backupMaster.start(); - assertNotEquals(cluster.getMaster().getMasterName(), backupMaster.getMasterName()); + ServiceTracker tracker = ServiceTrackerFactory.get(primaryConf); + assertNotEquals(primaryMaster.getMasterName(), backupMaster.getMasterName()); verifySystemDirectories(fs); assertEquals(2, fs.listStatus(activePath).length); assertEquals(1, fs.listStatus(backupPath).length); assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE))); - assertTrue(fs.exists(new Path(activePath, cluster.getMaster().getMasterName().replaceAll(":", "_")))); + assertTrue(fs.exists(new Path(activePath, primaryMaster.getMasterName().replaceAll(":", "_")))); assertTrue(fs.exists(new Path(backupPath, backupMaster.getMasterName().replaceAll(":", "_")))); - createDatabaseAndTable(); - verifyDataBaseAndTable(); - client.close(); + createDatabaseAndTable(tracker); + verifyDataBaseAndTable(tracker); - cluster.getMaster().stop(); + primaryMaster.stop(); - client = cluster.newTajoClient(); - verifyDataBaseAndTable(); + verifyDataBaseAndTable(tracker); assertEquals(2, fs.listStatus(activePath).length); assertEquals(0, fs.listStatus(backupPath).length); @@ -91,25 +87,23 @@ public class TestHAServiceHDFSImpl { assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE))); assertTrue(fs.exists(new Path(activePath, backupMaster.getMasterName().replaceAll(":", "_")))); } finally { - client.close(); backupMaster.stop(); - cluster.shutdownMiniCluster(); } } - private void setConfiguration() { - conf = cluster.getConfiguration(); + private TajoConf setConfigForHAMaster() { + TajoConf conf = new TajoConf(cluster.getConfiguration()); conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); + "localhost:" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); + "localhost:" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); + "localhost:" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); + "localhost:" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); + "localhost:" + NetUtils.getFreeSocketPort()); conf.setIntVar(TajoConf.ConfVars.REST_SERVICE_PORT, NetUtils.getFreeSocketPort()); @@ -126,6 +120,8 @@ public class TestHAServiceHDFSImpl { conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2); conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2); conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); + + return conf; } private void verifySystemDirectories(FileSystem fs) throws Exception { @@ -139,14 +135,26 @@ public class TestHAServiceHDFSImpl { assertTrue(fs.exists(backupPath)); } - private void createDatabaseAndTable() throws Exception { - client.executeQuery("CREATE TABLE default.table1 (age int);"); - client.executeQuery("CREATE TABLE default.table2 (age int);"); + private void createDatabaseAndTable(ServiceTracker tracker) throws Exception { + TajoClient client = null; + try { + client = new TajoClientImpl(tracker); + client.executeQuery("CREATE TABLE default.ha_test1 (age int);"); + client.executeQuery("CREATE TABLE default.ha_test2 (age int);"); + } finally { + IOUtils.cleanup(null, client); + } } - private void verifyDataBaseAndTable() throws Exception { - client.existDatabase("default"); - client.existTable("default.table1"); - client.existTable("default.table2"); + private void verifyDataBaseAndTable(ServiceTracker tracker) throws Exception { + TajoClient client = null; + try { + client = new TajoClientImpl(tracker); + client.existDatabase("default"); + client.existTable("default.ha_test1"); + client.existTable("default.ha_test2"); + } finally { + IOUtils.cleanup(null, client); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java index fc25c27..3d32c08 100644 --- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java +++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java @@ -66,7 +66,7 @@ public class TestResultSet { public static void setup() throws Exception { util = TpchTestBase.getInstance().getTestingCluster(); conf = util.getConfiguration(); - sm = TableSpaceManager.getDefault(); + sm = TablespaceManager.getDefault(); scoreSchema = new Schema(); scoreSchema.addColumn("deptname", Type.TEXT); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java index 48966bc..7c61cc7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java @@ -34,7 +34,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -79,7 +79,7 @@ public class TestExecutionBlockCursor { } analyzer = new SQLAnalyzer(); - logicalPlanner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); optimizer = new LogicalOptimizer(conf); dispatcher = new AsyncDispatcher(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java index 6322732..e8d59d0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java @@ -22,18 +22,17 @@ import static org.junit.Assert.*; import static org.hamcrest.CoreMatchers.*; import java.io.File; +import java.sql.ResultSet; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.UUID; -import org.apache.tajo.LocalTajoTestingUtility; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.client.ResultSetUtil; import org.apache.tajo.client.TajoClient; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; @@ -62,256 +61,21 @@ import org.junit.Test; import com.google.protobuf.ByteString; -public class TestNonForwardQueryResultSystemScanner { - - private class CollectionMatcher<T> extends TypeSafeDiagnosingMatcher<Iterable<? extends T>> { - - private final Matcher<? extends T> matcher; - - public CollectionMatcher(Matcher<? extends T> matcher) { - this.matcher = matcher; - } - - @Override - public void describeTo(Description description) { - description.appendText("a collection containing ").appendDescriptionOf(this.matcher); - } - - @Override - protected boolean matchesSafely(Iterable<? extends T> item, Description mismatchDescription) { - boolean isFirst = true; - Iterator<? extends T> iterator = item.iterator(); - - while (iterator.hasNext()) { - T obj = iterator.next(); - if (this.matcher.matches(obj)) { - return true; - } - - if (!isFirst) { - mismatchDescription.appendText(", "); - } - - this.matcher.describeMismatch(obj, mismatchDescription); - isFirst = false; - } - return false; - } - - } - - private <T> Matcher<Iterable<? extends T>> hasItem(Matcher<? extends T> matcher) { - return new CollectionMatcher<T>(matcher); - } - - private static LocalTajoTestingUtility testUtil; - private static TajoTestingCluster testingCluster; - private static TajoConf conf; - private static MasterContext masterContext; - - private static SQLAnalyzer analyzer; - private static LogicalPlanner logicalPlanner; - private static LogicalOptimizer logicalOptimizer; - - private static void setupTestingCluster() throws Exception { - testUtil = new LocalTajoTestingUtility(); - String[] names, paths; - Schema[] schemas; - - TPCH tpch = new TPCH(); - tpch.loadSchemas(); - tpch.loadQueries(); - - names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp", - "region", "supplier", "empty_orders"}; - schemas = new Schema[names.length]; - for (int i = 0; i < names.length; i++) { - schemas[i] = tpch.getSchema(names[i]); - } - - File file; - paths = new String[names.length]; - for (int i = 0; i < names.length; i++) { - file = new File("src/test/tpch/" + names[i] + ".tbl"); - if(!file.exists()) { - file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i] - + ".tbl"); - } - paths[i] = file.getAbsolutePath(); - } - - KeyValueSet opt = new KeyValueSet(); - opt.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); - testUtil.setup(names, paths, schemas, opt); - - testingCluster = testUtil.getTestingCluster(); - } - - @BeforeClass - public static void setUp() throws Exception { - setupTestingCluster(); - - conf = testingCluster.getConfiguration(); - masterContext = testingCluster.getMaster().getContext(); - - GlobalEngine globalEngine = masterContext.getGlobalEngine(); - analyzer = globalEngine.getAnalyzer(); - logicalPlanner = globalEngine.getLogicalPlanner(); - logicalOptimizer = globalEngine.getLogicalOptimizer(); - } - - @AfterClass - public static void tearDown() throws Exception { - try { - Thread.sleep(2000); - } catch (Exception ignored) { - } - - testUtil.shutdown(); - } - - private NonForwardQueryResultScanner getScanner(String sql) throws Exception { - QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId()); - String sessionId = UUID.randomUUID().toString(); - - return getScanner(sql, queryId, sessionId); - } - - private NonForwardQueryResultScanner getScanner(String sql, QueryId queryId, String sessionId) throws Exception { - QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); - - Expr expr = analyzer.parse(sql); - LogicalPlan logicalPlan = logicalPlanner.createPlan(queryContext, expr); - logicalOptimizer.optimize(logicalPlan); - - int maxRow = Integer.MAX_VALUE; - if (logicalPlan.getRootBlock().hasNode(NodeType.LIMIT)) { - LimitNode limitNode = logicalPlan.getRootBlock().getNode(NodeType.LIMIT); - maxRow = (int) limitNode.getFetchFirstNum(); - } - - NonForwardQueryResultScanner queryResultScanner = - new NonForwardQueryResultSystemScanner(masterContext, logicalPlan, queryId, - sessionId, maxRow); - - return queryResultScanner; - } - - @Test - public void testInit() throws Exception { - QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId()); - String sessionId = UUID.randomUUID().toString(); - NonForwardQueryResultScanner queryResultScanner = - getScanner("SELECT SPACE_ID, SPACE_URI FROM INFORMATION_SCHEMA.TABLESPACE", - queryId, sessionId); - - queryResultScanner.init(); - - assertThat(queryResultScanner.getQueryId(), is(notNullValue())); - assertThat(queryResultScanner.getLogicalSchema(), is(notNullValue())); - assertThat(queryResultScanner.getSessionId(), is(notNullValue())); - assertThat(queryResultScanner.getTableDesc(), is(notNullValue())); - - assertThat(queryResultScanner.getQueryId(), is(queryId)); - assertThat(queryResultScanner.getSessionId(), is(sessionId)); - - assertThat(queryResultScanner.getLogicalSchema().size(), is(2)); - assertThat(queryResultScanner.getLogicalSchema().getColumn("space_id"), is(notNullValue())); - } - - private List<Tuple> getTupleList(RowStoreDecoder decoder, List<ByteString> bytes) { - List<Tuple> tuples = new ArrayList<Tuple>(bytes.size()); - - for (ByteString byteString: bytes) { - Tuple aTuple = decoder.toTuple(byteString.toByteArray()); - tuples.add(aTuple); - } - - return tuples; - } - - private <T> Matcher<Tuple> getTupleMatcher(final int fieldId, final Matcher<T> matcher) { - return new TypeSafeDiagnosingMatcher<Tuple>() { - - @Override - public void describeTo(Description description) { - description.appendDescriptionOf(matcher); - } - - @Override - protected boolean matchesSafely(Tuple item, Description mismatchDescription) { - Object itemValue = null; - - Type type = item.type(fieldId); - if (type == Type.TEXT) { - itemValue = item.getText(fieldId); - } else if (type == Type.INT4) { - itemValue = item.getInt4(fieldId); - } else if (type == Type.INT8) { - itemValue = item.getInt8(fieldId); - } - - if (itemValue != null && matcher.matches(itemValue)) { - return true; - } - - matcher.describeMismatch(itemValue, mismatchDescription); - return false; - } - }; - } - +public class TestNonForwardQueryResultSystemScanner extends QueryTestCaseBase { @Test public void testGetNextRowsForAggregateFunction() throws Exception { - NonForwardQueryResultScanner queryResultScanner = - getScanner("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES"); - - queryResultScanner.init(); - - List<ByteString> rowBytes = queryResultScanner.getNextRows(100); - - assertThat(rowBytes.size(), is(1)); - - RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema()); - List<Tuple> tuples = getTupleList(decoder, rowBytes); - - assertThat(tuples.size(), is(1)); - assertThat(tuples, hasItem(getTupleMatcher(0, is(9L)))); + assertQueryStr("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES " + + "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'"); } - + @Test public void testGetNextRowsForTable() throws Exception { - NonForwardQueryResultScanner queryResultScanner = - getScanner("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES"); - - queryResultScanner.init(); - - List<ByteString> rowBytes = queryResultScanner.getNextRows(100); - - assertThat(rowBytes.size(), is(9)); - - RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema()); - List<Tuple> tuples = getTupleList(decoder, rowBytes);; - - assertThat(tuples.size(), is(9)); - assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem")))); + assertQueryStr("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES " + + "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'"); } - + @Test public void testGetClusterDetails() throws Exception { - NonForwardQueryResultScanner queryResultScanner = - getScanner("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER"); - - queryResultScanner.init(); - - List<ByteString> rowBytes = queryResultScanner.getNextRows(100); - - assertThat(rowBytes.size(), is(2)); - - RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema()); - List<Tuple> tuples = getTupleList(decoder, rowBytes); - - assertThat(tuples.size(), is(2)); - assertThat(tuples, hasItem(getTupleMatcher(0, is("QueryMaster")))); + assertQueryStr("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index edddc5a..1351716 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -45,7 +45,7 @@ import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.session.Session; import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.history.HistoryReader; import org.apache.tajo.util.history.HistoryWriter; @@ -105,7 +105,7 @@ public class TestKillQuery { Session session = LocalTajoTestingUtility.createDummySession(); CatalogService catalog = cluster.getMaster().getCatalog(); - LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); LogicalOptimizer optimizer = new LogicalOptimizer(conf); Expr expr = analyzer.parse(queryStr); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -169,7 +169,7 @@ public class TestKillQuery { Session session = LocalTajoTestingUtility.createDummySession(); CatalogService catalog = cluster.getMaster().getCatalog(); - LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); LogicalOptimizer optimizer = new LogicalOptimizer(conf); Expr expr = analyzer.parse(queryStr); LogicalPlan plan = planner.createPlan(defaultContext, expr); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java index 863c7b5..f48a71e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java +++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java @@ -67,7 +67,7 @@ public class TestRowFile { TableMeta meta = CatalogUtil.newTableMeta("ROWFILE"); - FileTablespace sm = (FileTablespace) TableSpaceManager.get(cluster.getDefaultFileSystem().getUri()).get(); + FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri()).get(); Path tablePath = new Path("/test"); Path metaPath = new Path(tablePath, ".meta"); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result new file mode 100644 index 0000000..45d730a --- /dev/null +++ b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result @@ -0,0 +1,4 @@ +aaa, a12, {"": "a34"}, 1 +bbb, b12, {"": "b34"}, 2 +ccc, c12, {"": "c34"}, 3 +ddd, d12, {"": "d34"}, 4 http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result new file mode 100644 index 0000000..9f12294 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result @@ -0,0 +1,4 @@ +type +------------------------------- +QueryMaster +Worker \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result new file mode 100644 index 0000000..07dd98b --- /dev/null +++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result @@ -0,0 +1,3 @@ +?count_2 +------------------------------- +3 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result new file mode 100644 index 0000000..fd37504 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result @@ -0,0 +1,5 @@ +table_name,table_type +------------------------------- +customer,EXTERNAL +lineitem,EXTERNAL +nation,EXTERNAL \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java index 17f79da..22f4781 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java @@ -239,6 +239,14 @@ public class LogicalPlan { return queryBlocks.get(ROOT_BLOCK); } + public LogicalRootNode getRootNode() { + return queryBlocks.get(ROOT_BLOCK).getRoot(); + } + + public Schema getOutputSchema() { + return getRootNode().getOutSchema(); + } + public QueryBlock getBlock(String blockName) { return queryBlocks.get(blockName); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 16ca368..441e047 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -910,6 +910,22 @@ public class PlannerUtil { } } + public static TableDesc getOutputTableDesc(LogicalPlan plan) { + LogicalNode [] found = findAllNodes(plan.getRootNode().getChild(), NodeType.CREATE_TABLE, NodeType.INSERT); + + if (found.length == 0) { + return new TableDesc(null, plan.getRootNode().getOutSchema(), "TEXT", new KeyValueSet(), null); + } else { + StoreTableNode storeNode = (StoreTableNode) found[0]; + return new TableDesc( + storeNode.getTableName(), + storeNode.getOutSchema(), + storeNode.getStorageType(), + storeNode.getOptions(), + storeNode.getUri()); + } + } + public static TableDesc getTableDesc(CatalogService catalog, LogicalNode node) throws IOException { if (node.getType() == NodeType.ROOT) { node = ((LogicalRootNode)node).getChild(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java index 0f0cd10..547a6f2 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java @@ -18,14 +18,46 @@ package org.apache.tajo.storage; +/** + * Format properties + */ public class FormatProperty { - private boolean sortedInsertRequired; - public FormatProperty(boolean sortedInsertRequired) { - this.sortedInsertRequired = sortedInsertRequired; + /** if this format supports insert operation */ + private boolean insertable; + /** if this format supports direct insertion (e.g., HBASE or JDBC-based storages) */ + private boolean directInsert; + /** if this format supports staging phase */ + private boolean stagingSupport; + + public FormatProperty(boolean insertable, boolean directInsert, boolean stagingSupport) { + this.insertable = insertable; + this.stagingSupport = stagingSupport; + this.directInsert = directInsert; + } + + /** + * Return if this format supports staging phase + * @return True if this format supports staging phase + */ + public boolean isInsertable() { + return insertable; + } + + /** + * Return if this format supports direct insertion (e.g., HBASE or JDBC-based storages) + * @return True if this format supports direct insertion + */ + public boolean directInsertSupported() { + return directInsert; } - public boolean sortedInsertRequired() { - return sortedInsertRequired; + /** + * Return if this format supports staging phase + * + * @return True if this format supports staging phase + */ + public boolean isStagingSupport() { + return stagingSupport; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java index ce573be..67a2f86 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -64,7 +64,7 @@ public class MergeScanner implements Scanner { long numBytes = 0; for (Fragment eachFileFragment: rawFragmentList) { - long fragmentLength = TableSpaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment); + long fragmentLength = TablespaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment); if (fragmentLength > 0) { numBytes += fragmentLength; fragments.add(eachFileFragment); @@ -131,7 +131,7 @@ public class MergeScanner implements Scanner { private Scanner getNextScanner() throws IOException { if (iterator.hasNext()) { currentFragment = iterator.next(); - currentScanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target); + currentScanner = TablespaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target); currentScanner.init(); return currentScanner; } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java index 12b236f..ef33a8e 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java @@ -31,7 +31,6 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.FileUtil; import java.io.IOException; import java.lang.reflect.Constructor; @@ -158,7 +157,7 @@ public class OldStorageManager { Constructor<? extends Tablespace> constructor = (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass); if (constructor == null) { - constructor = storageManagerClass.getDeclaredConstructor(TableSpaceManager.TABLESPACE_PARAM); + constructor = storageManagerClass.getDeclaredConstructor(TablespaceManager.TABLESPACE_PARAM); constructor.setAccessible(true); CONSTRUCTOR_CACHE.put(storageManagerClass, constructor); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java index 38d0734..c1db34e 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java @@ -18,20 +18,39 @@ package org.apache.tajo.storage; +/** + * Storage Properties + */ public class StorageProperty { - private boolean movable; - private boolean writable; - private boolean insertable; - private boolean absolutePathAllowed; + /** default file format */ + private final String defaultFormat; + /** if this storage supports move operator */ + private final boolean movable; + /** if this storage supports is writable */ + private final boolean writable; + /** if this storage allows use of artibrary paths */ + private final boolean absolutePathAllowed; + + public StorageProperty(String defaultFormat, + boolean movable, + boolean writable, + boolean absolutePathAllowed) { - public StorageProperty(boolean movable, boolean writable, boolean isInsertable, boolean absolutePathAllowed) { + this.defaultFormat = defaultFormat; this.movable = movable; this.writable = writable; - this.insertable = isInsertable; this.absolutePathAllowed = absolutePathAllowed; } /** + * Return default file format + * @return Default file format + */ + public String defaultFormat() { + return defaultFormat; + } + + /** * Move-like operation is allowed * * @return true if move operation is available @@ -50,18 +69,9 @@ public class StorageProperty { } /** - * this storage supports insert operation? - * - * @return true if insert operation is allowed. - */ - public boolean isInsertable() { - return insertable; - } - - /** * Does this storage allows the use of arbitrary absolute paths outside tablespace? * - * @return + * @return True if this storage allows accesses to artibrary paths. */ public boolean isArbitraryPathAllowed() { return this.absolutePathAllowed; http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java deleted file mode 100644 index ef04509..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import net.minidev.json.JSONObject; -import net.minidev.json.parser.JSONParser; -import net.minidev.json.parser.ParseException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.Pair; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.net.URI; -import java.util.Map; -import java.util.TreeMap; -import java.util.UUID; - -import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI; - -/** - * It handles available table spaces and cache TableSpace instances. - * - * Default tablespace must be a filesystem-based one. - * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode. - * Local file system can be a default tablespace if a Tajo cluster runs on a single machine. - */ -public class TableSpaceManager implements StorageService { - private static final Log LOG = LogFactory.getLog(TableSpaceManager.class); - - public static final String DEFAULT_CONFIG_FILE = "storage-default.json"; - public static final String SITE_CONFIG_FILE = "storage-site.json"; - - /** default tablespace name */ - public static final String DEFAULT_TABLESPACE_NAME = "default"; - - private final static TajoConf systemConf = new TajoConf(); - private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR); - - // The relation ship among name, URI, Tablespaces must be kept 1:1:1. - protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap(); - protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap(); - - protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap(); - protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap(); - - public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class}; - - static { - instance = new TableSpaceManager(); - } - /** - * Singleton instance - */ - private static final TableSpaceManager instance; - - private TableSpaceManager() { - initForDefaultConfig(); // loading storage-default.json - initSiteConfig(); // storage-site.json will override the configs of storage-default.json - addWarehouseAsSpace(); // adding a warehouse directory for a default tablespace - addLocalFsTablespace(); // adding a tablespace using local file system by default - } - - private void addWarehouseAsSpace() { - Path warehouseDir = TajoConf.getWarehouseDir(systemConf); - registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false); - } - - private void addLocalFsTablespace() { - if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) { - String tmpName = UUID.randomUUID().toString(); - registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false); - } - } - - public static TableSpaceManager getInstance() { - return instance; - } - - private void initForDefaultConfig() { - JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE); - if (json == null) { - throw new IllegalStateException("There is no " + SITE_CONFIG_FILE); - } - applyConfig(json, false); - } - - private void initSiteConfig() { - JSONObject json = loadFromConfig(SITE_CONFIG_FILE); - - // if there is no storage-site.json file, nothing happen. - if (json != null) { - applyConfig(json, true); - } - } - - private JSONObject loadFromConfig(String fileName) { - String json; - try { - json = FileUtil.readTextFileFromResource(fileName); - } catch (IOException e) { - throw new RuntimeException(e); - } - - if (json != null) { - return parseJson(json); - } else { - return null; - } - } - - private static JSONObject parseJson(String json) { - try { - return (JSONObject) parser.parse(json); - } catch (ParseException e) { - throw new RuntimeException(e); - } - } - - private void applyConfig(JSONObject json, boolean override) { - loadStorages(json); - loadTableSpaces(json, override); - } - - private void loadStorages(JSONObject json) { - JSONObject spaces = (JSONObject) json.get(KEY_STORAGES); - - if (spaces != null) { - Pair<String, Class<? extends Tablespace>> pair = null; - for (Map.Entry<String, Object> entry : spaces.entrySet()) { - - try { - pair = extractStorage(entry); - } catch (ClassNotFoundException e) { - LOG.warn(e); - continue; - } - - TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond()); - } - } - } - - private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry) - throws ClassNotFoundException { - - String storageType = entry.getKey(); - JSONObject storageDesc = (JSONObject) entry.getValue(); - String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER); - - return new Pair<String, Class<? extends Tablespace>>( - storageType,(Class<? extends Tablespace>) Class.forName(handlerClass)); - } - - private void loadTableSpaces(JSONObject json, boolean override) { - JSONObject spaces = (JSONObject) json.get(KEY_SPACES); - - if (spaces != null) { - for (Map.Entry<String, Object> entry : spaces.entrySet()) { - AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override); - } - } - } - - public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) { - boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default")); - URI spaceUri = URI.create(spaceDesc.getAsString("uri")); - - if (defaultSpace) { - registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override); - } - registerTableSpace(spaceName, spaceUri, spaceDesc, true, override); - } - - private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc, - boolean visible, boolean override) { - Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible); - tableSpace.setVisible(visible); - - try { - tableSpace.init(systemConf); - } catch (IOException e) { - throw new RuntimeException(e); - } - - putTablespace(tableSpace, override); - - // If the arbitrary path is allowed, root uri is also added as a tablespace - if (tableSpace.getProperty().isArbitraryPathAllowed()) { - URI rootUri = tableSpace.getRootUri(); - // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace. - if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) { - String tmpName = UUID.randomUUID().toString(); - registerTableSpace(tmpName, rootUri, spaceDesc, false, override); - } - } - } - - private static void putTablespace(Tablespace space, boolean override) { - // It is a device to keep the relationship among name, URI, and tablespace 1:1:1. - - boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName()); - boolean uriExist = TABLE_SPACES.containsKey(space.uri); - - boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri()); - mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space); - - if (!override && mismatch) { - throw new RuntimeException("Name or URI of Tablespace must be unique."); - } - - SPACES_URIS_MAP.put(space.getName(), space.getUri()); - // We must guarantee that the same uri results in the same tablespace instance. - TABLE_SPACES.put(space.getUri(), space); - } - - /** - * Return length of the fragment. - * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration. - * - * @param conf Tajo system property - * @param fragment Fragment - * @return - */ - public static long guessFragmentVolume(TajoConf conf, Fragment fragment) { - if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) { - return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH); - } else { - return fragment.getLength(); - } - } - - public static final String KEY_STORAGES = "storages"; // storages - public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler - public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format - - public static final String KEY_SPACES = "spaces"; - - private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) { - Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri); - Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme()); - - if (clazz == null) { - throw new RuntimeException("There is no tablespace for " + uri.toString()); - } - - try { - Constructor<? extends Tablespace> constructor = - (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz); - - if (constructor == null) { - constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM); - constructor.setAccessible(true); - CONSTRUCTORS.put(clazz, constructor); - } - - return constructor.newInstance(new Object[]{spaceName, uri}); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @VisibleForTesting - public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) { - Tablespace existing; - synchronized (SPACES_URIS_MAP) { - // Remove existing one - SPACES_URIS_MAP.remove(space.getName()); - existing = TABLE_SPACES.remove(space.getUri()); - - // Add anotherone for test - registerTableSpace(space.name, space.uri, null, true, true); - } - // if there is an existing one, return it. - return Optional.fromNullable(existing); - } - - public Iterable<String> getSupportSchemes() { - return TABLE_SPACE_HANDLERS.keySet(); - } - - /** - * Get tablespace for the given URI. If uri is null, the default tablespace will be returned - * - * @param uri Table or Table Fragment URI. - * @param <T> Tablespace class type - * @return Tablespace. If uri is null, the default tablespace will be returned. - */ - public static <T extends Tablespace> Optional<T> get(@Nullable String uri) { - - if (uri == null || uri.isEmpty()) { - return (Optional<T>) Optional.of(getDefault()); - } - - Tablespace lastOne = null; - - // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and - // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific. - for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) { - if (uri.startsWith(entry.getKey().toString())) { - lastOne = entry.getValue(); - } - } - return (Optional<T>) Optional.fromNullable(lastOne); - } - - /** - * Get tablespace for the given URI. If uri is null, the default tablespace will be returned - * - * @param uri Table or Table Fragment URI. - * @param <T> Tablespace class type - * @return Tablespace. If uri is null, the default tablespace will be returned. - */ - public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) { - if (uri == null) { - return (Optional<T>) Optional.of(getDefault()); - } else { - return (Optional<T>) get(uri.toString()); - } - } - - /** - * It returns the default tablespace. This method ensures that it always return the tablespace. - * - * @return - */ - public static <T extends Tablespace> T getDefault() { - return (T) getByName(DEFAULT_TABLESPACE_NAME).get(); - } - - public static <T extends Tablespace> T getLocalFs() { - return (T) get(LOCAL_FS_URI).get(); - } - - public static Optional<? extends Tablespace> getByName(String name) { - URI uri = SPACES_URIS_MAP.get(name); - if (uri != null) { - return Optional.of(TABLE_SPACES.get(uri)); - } else { - return Optional.absent(); - } - } - - public static Optional<? extends Tablespace> getAnyByScheme(String scheme) { - for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) { - String uriScheme = entry.getKey().getScheme(); - if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) { - return Optional.of(entry.getValue()); - } - } - - return Optional.absent(); - } - - @Override - public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) { - Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get(); - return space.getTableUri(databaseName, tableName); - } - - public static Iterable<Tablespace> getAllTablespaces() { - return TABLE_SPACES.values(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 77c5d05..52e223d 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -18,9 +18,11 @@ package org.apache.tajo.storage; +import com.google.common.base.Optional; import org.apache.hadoop.fs.Path; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.OverridableConf; +import org.apache.tajo.QueryVars; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; @@ -149,7 +151,7 @@ public abstract class Tablespace { */ public abstract StorageProperty getProperty(); - public abstract FormatProperty getFormatProperty(String dataFormat); + public abstract FormatProperty getFormatProperty(TableMeta meta); /** * Release storage manager resource @@ -259,6 +261,14 @@ public abstract class Tablespace { return scanner; } + public Appender getAppenderForInsertRow(OverridableConf queryContext, + TaskAttemptId taskAttemptId, + TableMeta meta, + Schema schema, + Path workDir) throws IOException { + return getAppender(queryContext, taskAttemptId, meta, schema, workDir); + } + /** * Returns Appender instance. * @param queryContext Query property. @@ -395,4 +405,11 @@ public abstract class Tablespace { return false; } } + + public abstract URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException; + + public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, + TableMeta meta) throws IOException { + throw new IOException("Staging the output result is not supported in this storage"); + } }
