TAJO-1616: Implement TablespaceManager to load Tablespaces. Closes #602
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d0f37012 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d0f37012 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d0f37012 Branch: refs/heads/index_support Commit: d0f37012faf1a8a3259210287e9bc7762ce62001 Parents: 7c8477d Author: Hyunsik Choi <[email protected]> Authored: Wed Jun 24 14:33:34 2015 -0700 Committer: Hyunsik Choi <[email protected]> Committed: Wed Jun 24 14:33:34 2015 -0700 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/algebra/CreateTable.java | 14 + .../org/apache/tajo/catalog/CatalogUtil.java | 7 + .../org/apache/tajo/catalog/DDLBuilder.java | 5 +- .../java/org/apache/tajo/catalog/TableDesc.java | 8 +- .../org/apache/tajo/catalog/TestTableDesc.java | 2 +- .../tajo/catalog/store/HiveCatalogStore.java | 2 +- .../catalog/store/TestHiveCatalogStore.java | 25 +- .../tajo/catalog/store/AbstractDBStore.java | 44 +- .../src/main/resources/schemas/derby/derby.xml | 2 +- .../main/resources/schemas/mariadb/mariadb.xml | 3 +- .../src/main/resources/schemas/mysql/mysql.xml | 3 +- .../main/resources/schemas/oracle/oracle.xml | 3 +- .../resources/schemas/postgresql/postgresql.xml | 4 +- .../cli/tsql/commands/DescTableCommand.java | 2 +- .../main/java/org/apache/tajo/QueryVars.java | 4 +- .../apache/tajo/storage/StorageConstants.java | 6 + .../org/apache/tajo/storage/StorageService.java | 37 ++ .../java/org/apache/tajo/util/FileUtil.java | 14 +- .../org/apache/tajo/util/ReflectionUtil.java | 6 +- .../org/apache/tajo/engine/parser/SQLParser.g4 | 8 +- .../apache/tajo/engine/parser/SQLAnalyzer.java | 11 +- .../engine/planner/PhysicalPlannerImpl.java | 14 +- .../engine/planner/global/GlobalPlanner.java | 4 +- .../planner/physical/BSTIndexScanExec.java | 2 +- .../planner/physical/ColPartitionStoreExec.java | 2 +- .../planner/physical/PhysicalPlanUtil.java | 2 +- 
.../physical/RangeShuffleFileWriteExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 6 +- .../engine/planner/physical/StoreTableExec.java | 19 +- .../apache/tajo/engine/query/QueryContext.java | 67 ++- .../org/apache/tajo/master/GlobalEngine.java | 13 +- .../java/org/apache/tajo/master/TajoMaster.java | 22 +- .../tajo/master/TajoMasterClientService.java | 3 +- .../apache/tajo/master/exec/DDLExecutor.java | 58 ++- .../exec/NonForwardQueryResultFileScanner.java | 2 +- .../apache/tajo/master/exec/QueryExecutor.java | 238 +++++---- .../master/exec/prehook/CreateTableHook.java | 10 +- .../prehook/DistributedQueryHookManager.java | 2 +- .../master/exec/prehook/InsertIntoHook.java | 8 +- .../java/org/apache/tajo/querymaster/Query.java | 51 +- .../tajo/querymaster/QueryMasterTask.java | 72 ++- .../apache/tajo/querymaster/Repartitioner.java | 77 +-- .../java/org/apache/tajo/querymaster/Stage.java | 12 +- .../tajo/webapp/QueryExecutorServlet.java | 2 +- .../org/apache/tajo/worker/LegacyTaskImpl.java | 2 +- .../java/org/apache/tajo/worker/TajoWorker.java | 5 +- .../java/org/apache/tajo/worker/TaskImpl.java | 2 +- .../resources/webapps/admin/catalogview.jsp | 3 +- .../src/main/resources/webapps/admin/index.jsp | 23 +- .../org/apache/tajo/BackendTestingUtil.java | 2 +- .../java/org/apache/tajo/QueryTestCaseBase.java | 4 +- .../org/apache/tajo/TajoTestingCluster.java | 35 +- .../org/apache/tajo/cli/tsql/TestTajoCli.java | 4 +- .../org/apache/tajo/client/TestTajoClient.java | 18 +- .../apache/tajo/engine/eval/ExprTestBase.java | 3 +- .../tajo/engine/eval/TestEvalTreeUtil.java | 3 +- .../engine/planner/TestLogicalOptimizer.java | 4 +- .../tajo/engine/planner/TestLogicalPlan.java | 3 +- .../tajo/engine/planner/TestLogicalPlanner.java | 92 ++-- .../tajo/engine/planner/TestPlannerUtil.java | 5 +- .../planner/physical/TestBNLJoinExec.java | 15 +- .../planner/physical/TestBSTIndexExec.java | 6 +- .../planner/physical/TestExternalSortExec.java | 6 +- 
.../physical/TestFullOuterHashJoinExec.java | 29 +- .../physical/TestFullOuterMergeJoinExec.java | 40 +- .../planner/physical/TestHashAntiJoinExec.java | 10 +- .../planner/physical/TestHashJoinExec.java | 14 +- .../planner/physical/TestHashSemiJoinExec.java | 14 +- .../physical/TestLeftOuterHashJoinExec.java | 31 +- .../planner/physical/TestMergeJoinExec.java | 10 +- .../engine/planner/physical/TestNLJoinExec.java | 14 +- .../planner/physical/TestPhysicalPlanner.java | 56 +-- .../physical/TestProgressExternalSortExec.java | 6 +- .../physical/TestRightOuterHashJoinExec.java | 20 +- .../physical/TestRightOuterMergeJoinExec.java | 36 +- .../engine/planner/physical/TestSortExec.java | 6 +- .../apache/tajo/engine/query/TestCTASQuery.java | 8 +- .../tajo/engine/query/TestCreateTable.java | 8 +- .../tajo/engine/query/TestHBaseTable.java | 226 +++++---- .../tajo/engine/query/TestInsertQuery.java | 6 +- .../apache/tajo/engine/query/TestJoinQuery.java | 6 +- .../tajo/engine/query/TestTablePartitions.java | 28 +- .../org/apache/tajo/jdbc/TestResultSet.java | 4 +- .../tajo/master/TestExecutionBlockCursor.java | 10 +- .../apache/tajo/querymaster/TestKillQuery.java | 5 +- .../org/apache/tajo/storage/TestRowFile.java | 5 +- .../results/TestHBaseTable/testCATS.result | 100 ---- .../results/TestHBaseTable/testCTAS.result | 100 ++++ .../testInsertIntoUsingPut.result | 4 +- .../results/TestTajoCli/testDescTable.result | 8 +- .../testDescTableForNestedSchema.result | 4 +- .../src/main/conf/storage-site.json.template | 35 ++ tajo-docs/src/main/sphinx/index.rst | 1 + tajo-docs/src/main/sphinx/storage_plugin.rst | 47 ++ tajo-docs/src/main/sphinx/table_management.rst | 1 + .../sphinx/table_management/table_overview.rst | 7 + .../sphinx/table_management/tablespaces.rst | 45 ++ .../org/apache/tajo/plan/LogicalPlanner.java | 64 ++- .../tajo/plan/logical/CreateTableNode.java | 44 +- .../apache/tajo/plan/logical/InsertNode.java | 51 +- .../plan/logical/PartitionedTableScanNode.java | 2 +- 
.../org/apache/tajo/plan/logical/ScanNode.java | 2 +- .../tajo/plan/logical/StoreTableNode.java | 27 ++ .../rewrite/rules/PartitionedTableRewriter.java | 4 +- .../plan/serder/LogicalNodeDeserializer.java | 17 +- .../tajo/plan/serder/LogicalNodeSerializer.java | 25 +- .../org/apache/tajo/plan/util/PlannerUtil.java | 14 +- tajo-plan/src/main/proto/Plan.proto | 11 +- tajo-storage/tajo-storage-common/pom.xml | 12 +- .../org/apache/tajo/storage/FormatProperty.java | 31 ++ .../org/apache/tajo/storage/MergeScanner.java | 5 +- .../apache/tajo/storage/OldStorageManager.java | 251 ++++++++++ .../apache/tajo/storage/StorageProperty.java | 49 +- .../org/apache/tajo/storage/StorageUtil.java | 4 +- .../apache/tajo/storage/TableSpaceManager.java | 480 ++++++++++++------- .../org/apache/tajo/storage/Tablespace.java | 194 +++++--- .../src/main/resources/storage-default.json | 20 + tajo-storage/tajo-storage-hbase/pom.xml | 11 + .../storage/hbase/AbstractHBaseAppender.java | 2 +- .../storage/hbase/AddSortForInsertRewriter.java | 91 ---- .../tajo/storage/hbase/ColumnMapping.java | 17 +- .../tajo/storage/hbase/HBaseFragment.java | 28 +- .../tajo/storage/hbase/HBasePutAppender.java | 11 +- .../apache/tajo/storage/hbase/HBaseScanner.java | 9 +- .../tajo/storage/hbase/HBaseTablespace.java | 209 ++++---- .../storage/hbase/SortedInsertRewriter.java | 116 +++++ .../src/main/proto/StorageFragmentProtos.proto | 15 +- .../tajo/storage/hbase/TestColumnMapping.java | 2 +- .../storage/hbase/TestHBaseStorageManager.java | 108 ----- .../tajo/storage/hbase/TestHBaseTableSpace.java | 134 ++++++ tajo-storage/tajo-storage-hdfs/pom.xml | 6 +- .../org/apache/tajo/storage/FileAppender.java | 28 +- .../org/apache/tajo/storage/FileTablespace.java | 137 ++++-- .../storage/HashShuffleAppenderManager.java | 5 +- .../tajo/storage/TestCompressionStorages.java | 4 +- .../tajo/storage/TestDelimitedTextFile.java | 8 +- .../tajo/storage/TestFileStorageManager.java | 233 --------- 
.../apache/tajo/storage/TestFileSystems.java | 2 +- .../apache/tajo/storage/TestFileTablespace.java | 250 ++++++++++ .../org/apache/tajo/storage/TestLineReader.java | 8 +- .../apache/tajo/storage/TestMergeScanner.java | 6 +- .../org/apache/tajo/storage/TestStorages.java | 48 +- .../apache/tajo/storage/index/TestBSTIndex.java | 58 ++- .../index/TestSingleCSVFileBSTIndex.java | 4 +- .../apache/tajo/storage/json/TestJsonSerDe.java | 2 +- 146 files changed, 2966 insertions(+), 1972 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 1f69ac0..75ffd39 100644 --- a/CHANGES +++ b/CHANGES @@ -350,6 +350,8 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1616: Implement TablespaceManager to load Tablespaces. (hyunsik) + TAJO-1615: Implement TaskManager. (jinho) TAJO-1599: Implement NodeResourceManager and Status updater. 
(jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java ---------------------------------------------------------------------- diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java index 2d4a241..5d1599d 100644 --- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java +++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java @@ -38,6 +38,8 @@ public class CreateTable extends Expr { private String tableName; @Expose @SerializedName("Attributes") private ColumnDefinition [] tableElements; + @Expose @SerializedName("SpaceName") + private String spaceName; @Expose @SerializedName("StorageType") private String storageType; @Expose @SerializedName("Location") @@ -100,6 +102,18 @@ public class CreateTable extends Expr { this.tableElements = tableElements; } + public boolean hasTableSpaceName() { + return spaceName != null; + } + + public void setTableSpaceName(String spaceName) { + this.spaceName = spaceName; + } + + public String getTableSpaceName() { + return spaceName; + } + public boolean hasStorageType() { return storageType != null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index a2e4a9d..6c6915b 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -18,6 +18,7 @@ package org.apache.tajo.catalog; +import com.google.common.base.Preconditions; import 
com.google.common.collect.Maps; import org.apache.hadoop.fs.Path; import org.apache.tajo.DataTypeUtil; @@ -31,6 +32,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.Pair; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.TUtil; @@ -227,6 +229,11 @@ public class CatalogUtil { return sb.toString(); } + public static Pair<String, String> separateQualifierAndName(String name) { + Preconditions.checkArgument(isFQTableName(name), "Must be a qualified name."); + return new Pair<String, String>(extractQualifier(name), extractSimpleName(name)); + } + /** * Extract a qualification name from an identifier. * http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java index 65640b9..62dd894 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java @@ -19,7 +19,6 @@ package org.apache.tajo.catalog; import org.apache.tajo.catalog.partition.PartitionMethodDesc; -import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.util.KeyValueSet; import java.util.Map; @@ -32,7 +31,7 @@ public class DDLBuilder { sb.append("--\n") .append("-- Name: ").append(CatalogUtil.denormalizeIdentifier(desc.getName())).append("; Type: TABLE;") .append(" Storage: ").append(desc.getMeta().getStoreType()); - sb.append("\n-- Path: ").append(desc.getPath()); + sb.append("\n-- Path: ").append(desc.getUri()); 
sb.append("\n--\n"); sb.append("CREATE EXTERNAL TABLE ").append(CatalogUtil.denormalizeIdentifier(desc.getName())); buildSchema(sb, desc.getSchema()); @@ -109,7 +108,7 @@ public class DDLBuilder { } private static void buildLocationClause(StringBuilder sb, TableDesc desc) { - sb.append(" LOCATION '").append(desc.getPath()).append("'"); + sb.append(" LOCATION '").append(desc.getUri()).append("'"); } private static void buildPartitionClause(StringBuilder sb, TableDesc desc) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java index 17f9146..4700322 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java @@ -89,12 +89,16 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone public String getName() { return this.tableName; } + + public boolean hasUri() { + return this.uri != null; + } - public void setPath(URI uri) { + public void setUri(URI uri) { this.uri = uri; } - public URI getPath() { + public URI getUri() { return this.uri; } http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java index 9d84de6..41a0832 100644 --- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java 
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java @@ -78,7 +78,7 @@ public class TestTableDesc { TableDesc desc = new TableDesc("table1", schema, info, path.toUri()); assertEquals("table1", desc.getName()); - assertEquals(path.toUri(), desc.getPath()); + assertEquals(path.toUri(), desc.getUri()); assertEquals(info, desc.getMeta()); testClone(desc); } http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index 3ea263a..8f23db4 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -443,7 +443,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { table.setTableType(TableType.EXTERNAL_TABLE.name()); table.putToParameters("EXTERNAL", "TRUE"); - Path tablePath = new Path(tableDesc.getPath()); + Path tablePath = new Path(tableDesc.getUri()); FileSystem fs = tablePath.getFileSystem(conf); if (fs.isFile(tablePath)) { LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path."); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java 
b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java index 6e91b89..946d271 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java @@ -43,7 +43,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; -import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.*; /** @@ -105,7 +104,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < table.getSchema().size(); i++) { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); @@ -134,7 +133,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < table.getSchema().size(); i++) { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); @@ -163,7 +162,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < table.getSchema().size(); i++) { 
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); @@ -197,7 +196,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, SUPPLIER)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < table.getSchema().size(); i++) { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); @@ -247,7 +246,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, NATION)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < table.getSchema().size(); i++) { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); @@ -261,8 +260,8 @@ public class TestHiveCatalogStore { assertEquals(partitionSchema.getColumn(i).getSimpleName(), partitionSchema1.getColumn(i).getSimpleName()); } - testAddPartition(table1.getPath(), NATION, "n_nationkey=10/n_date=20150101"); - testAddPartition(table1.getPath(), NATION, "n_nationkey=20/n_date=20150102"); + testAddPartition(table1.getUri(), NATION, "n_nationkey=10/n_date=20150101"); + testAddPartition(table1.getUri(), NATION, "n_nationkey=20/n_date=20150102"); testDropPartition(NATION, "n_nationkey=10/n_date=20150101"); testDropPartition(NATION, "n_nationkey=20/n_date=20150102"); @@ -370,7 +369,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, tableName)); FileSystem fs = FileSystem.getLocal(new Configuration()); - assertTrue(fs.exists(new Path(table1.getPath()))); + assertTrue(fs.exists(new 
Path(table1.getUri()))); store.dropTable(DB_NAME, tableName); assertFalse(store.existTable(DB_NAME, tableName)); @@ -395,7 +394,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < table.getSchema().size(); i++) { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); @@ -424,7 +423,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < table.getSchema().size(); i++) { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); @@ -456,7 +455,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < table.getSchema().size(); i++) { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); @@ -491,7 +490,7 @@ public class TestHiveCatalogStore { TableDesc table1 = new TableDesc(store.getTable(DB_NAME, tableName)); assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); + assertEquals(table.getUri(), table1.getUri()); assertEquals(table.getSchema().size(), table1.getSchema().size()); for (int i = 0; i < 
table.getSchema().size(); i++) { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index 34740c0..043c8bc 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -761,36 +761,25 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo int dbid = getDatabaseId(databaseName); - if (table.getIsExternal()) { - String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME + ", TABLE_TYPE, PATH, STORE_TYPE) VALUES(?, ?, ?, ?, ?) "; + String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME + + ", TABLE_TYPE, PATH, STORE_TYPE) VALUES(?, ?, ?, ?, ?) "; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } + if (LOG.isDebugEnabled()) { + LOG.debug(sql); + } - pstmt = conn.prepareStatement(sql); - pstmt.setInt(1, dbid); - pstmt.setString(2, tableName); + pstmt = conn.prepareStatement(sql); + pstmt.setInt(1, dbid); + pstmt.setString(2, tableName); + if (table.getIsExternal()) { pstmt.setString(3, TableType.EXTERNAL_TABLE.name()); - pstmt.setString(4, table.getPath()); - pstmt.setString(5, table.getMeta().getStoreType()); - pstmt.executeUpdate(); - pstmt.close(); } else { - String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME + ", TABLE_TYPE, STORE_TYPE) VALUES(?, ?, ?, ?) 
"; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - pstmt = conn.prepareStatement(sql); - pstmt.setInt(1, dbid); - pstmt.setString(2, tableName); pstmt.setString(3, TableType.BASE_TABLE.name()); - pstmt.setString(4, table.getMeta().getStoreType()); - pstmt.executeUpdate(); - pstmt.close(); } + pstmt.setString(4, table.getPath()); + pstmt.setString(5, table.getMeta().getStoreType()); + pstmt.executeUpdate(); + pstmt.close(); String tidSql = "SELECT TID from " + TB_TABLES + " WHERE " + COL_DATABASES_PK + "=? AND " + COL_TABLES_NAME + "=?"; @@ -1612,12 +1601,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo tableBuilder.setIsExternal(true); } - if (tableType == TableType.BASE_TABLE) { - tableBuilder.setPath(databaseIdAndUri.getSecond() + "/" + tableName); - } else { - tableBuilder.setPath(res.getString(4).trim()); - } - + tableBuilder.setPath(res.getString(4).trim()); storeType = res.getString(5).trim(); res.close(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml index a4ff00f..e0bd469 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml @@ -24,7 +24,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="4"> + <tns:base version="5"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml 
---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml index 79ccd0a..7485da1 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml @@ -19,12 +19,13 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) * 4 - 2015-03-27: Partition Schema (TAJO-1284) * 3 - 2015-03-12: Nested Schema (TAJO-1329) * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="4"> + <tns:base version="5"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml index 34337fb..2bde04f 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml @@ -19,12 +19,13 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 5 - 2015-06-15: Implement TablespaceManager to load 
Tablespaces (TAJO-1616) * 4 - 2015-03-27: Partition Schema (TAJO-1284) * 3 - 2015-03-12: Nested Schema (TAJO-1329) * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="4"> + <tns:base version="5"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml index 323e22c..2778e0c 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml @@ -19,12 +19,13 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) * 4 - 2015-03-27: Partition Schema (TAJO-1284) * 3 - 2015-03-12: Nested Schema (TAJO-1329) * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="4"> + <tns:base version="5"> <tns:objects> <tns:Object order="0" type="table" name="meta"> <tns:sql><![CDATA[ http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml index 554acd5..0051242 100644 --- 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml @@ -21,12 +21,14 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) + * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) * 4 - 2015-03-27: Partition Schema (TAJO-1284) * 3 - 2015-03-12: Nested Schema (TAJO-1329) * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="4"> + <tns:base version="5"> <tns:objects> <tns:Object name="META" type="table" order="0"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java index 6df26b7..6b2905a 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java @@ -78,7 +78,7 @@ public class DescTableCommand extends TajoShellCommand { protected String toFormattedString(TableDesc desc) { StringBuilder sb = new StringBuilder(); sb.append("\ntable name: ").append(desc.getName()).append("\n"); - sb.append("table path: ").append(desc.getPath()).append("\n"); + sb.append("table uri: ").append(desc.getUri()).append("\n"); sb.append("store type: ").append(desc.getMeta().getStoreType()).append("\n"); if (desc.getStats() != null) { 
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/QueryVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java index 55ca700..a6d5d1d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java @@ -22,9 +22,11 @@ import org.apache.tajo.validation.Validator; public enum QueryVars implements ConfigKey { COMMAND_TYPE, + DEFAULT_SPACE_URI, + DEFAULT_SPACE_ROOT_URI, STAGING_DIR, OUTPUT_TABLE_NAME, - OUTPUT_TABLE_PATH, + OUTPUT_TABLE_URI, OUTPUT_PARTITIONS, OUTPUT_OVERWRITE, OUTPUT_AS_DIRECTORY, http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index d2c6c1c..a9923a5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -20,8 +20,14 @@ package org.apache.tajo.storage; import org.apache.tajo.TajoConstants; +import java.net.URI; + public class StorageConstants { + // Tablespace ------------------------------------------------- + + public static final URI LOCAL_FS_URI = URI.create("file:/"); + // Common table properties ------------------------------------------------- // time zone http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java 
b/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java new file mode 100644 index 0000000..1057097 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import javax.annotation.Nullable; +import java.net.URI; + +/** + * TablespaceManager interface for loosely coupled usages + */ +public interface StorageService { + /** + * Get Table URI + * + * @param spaceName Tablespace name. 
If it is null, the default space will be used + * @param databaseName Database name + * @param tableName Table name + * @return Table URI + */ + URI getTableURI(@Nullable String spaceName, String databaseName, String tableName); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java index 3e3d3a2..39f4c29 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java @@ -87,8 +87,20 @@ public class FileUtil { return ClassLoader.getSystemResource(resource); } + /** + * It returns a string from a text file found in classpath. + * + * @param resource Resource file name + * @return String contents if exists. Otherwise, it will return null. + * @throws IOException + */ public static String readTextFileFromResource(String resource) throws IOException { - return readTextFromStream(ClassLoader.getSystemResourceAsStream(resource)); + InputStream stream = ClassLoader.getSystemResourceAsStream(resource); + if (stream != null) { + return readTextFromStream(stream); + } else { + return null; + } } public static String readTextFromStream(InputStream inputStream) http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java index e2def69..5a712c0 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java @@ -25,9 +25,9 @@ import java.util.Map; import 
java.util.concurrent.ConcurrentHashMap; public class ReflectionUtil { - private static final Class<?>[] EMPTY_PARAM = new Class[]{}; - private static final Object [] EMPTY_OBJECT = new Object[] {}; - private static final Class<?>[] CONF_PARAM = new Class[]{TajoConf.class}; + public static final Class<?>[] EMPTY_PARAM = new Class[]{}; + public static final Object [] EMPTY_OBJECT = new Object[] {}; + public static final Class<?>[] CONF_PARAM = new Class[]{TajoConf.class}; /** * Caches of constructors for each class. Pins the classes so they http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 index 3ab11bd..469b2a2 100644 --- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 +++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 @@ -98,11 +98,11 @@ if_exists ; create_table_statement - : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING storage_type=identifier - (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal)? - | CREATE TABLE (if_not_exists)? table_name table_elements (USING storage_type=identifier)? + : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements (TABLESPACE spacename=identifier)? USING storage_type=identifier + (param_clause)? (table_partitioning_clauses)? (LOCATION uri=Character_String_Literal)? + | CREATE TABLE (if_not_exists)? table_name table_elements (TABLESPACE spacename=identifier)? (USING storage_type=identifier)? (param_clause)? (table_partitioning_clauses)? (AS query_expression)? - | CREATE TABLE (if_not_exists)? table_name (USING storage_type=identifier)? + | CREATE TABLE (if_not_exists)? table_name (TABLESPACE spacename=identifier)? 
(USING storage_type=identifier)? (param_clause)? (table_partitioning_clauses)? AS query_expression | CREATE TABLE (if_not_exists)? table_name LIKE like_table_name=table_name ; http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java index 7c99868..62bb0f9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java @@ -1252,9 +1252,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { createTable.setTableElements(elements); createTable.setStorageType(storageType); - if (PlannerUtil.isFileStorageType(storageType)) { - String path = stripQuote(ctx.path.getText()); - createTable.setLocation(path); + if (checkIfExist(ctx.LOCATION())) { + String uri = stripQuote(ctx.uri.getText()); + createTable.setLocation(uri); } } else { if (checkIfExist(ctx.table_elements())) { @@ -1262,6 +1262,11 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { createTable.setTableElements(elements); } + if (checkIfExist(ctx.TABLESPACE())) { + String spaceName = ctx.spacename.getText(); + createTable.setTableSpaceName(spaceName); + } + if (checkIfExist(ctx.USING())) { String fileType = ctx.storage_type.getText(); createTable.setStorageType(fileType); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 30cb24f..f0b2f5e 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -251,7 +252,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { FragmentProto[] fragmentProtos = ctx.getTables(tableId); List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos); for (Fragment frag : fragments) { - size += Tablespace.getFragmentLength(ctx.getConf(), frag); + size += TableSpaceManager.guessFragmentVolume(ctx.getConf(), frag); } } return size; @@ -924,9 +925,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { if (broadcastFlag) { PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode; List<Fragment> fileFragments = TUtil.newList(); - FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getFileStorageManager(ctx.getConf()); + + FileTablespace space = (FileTablespace) TableSpaceManager.get(scanNode.getTableDesc().getUri()).get(); for (Path path : partitionedTableScanNode.getInputPaths()) { - fileFragments.addAll(TUtil.newList(fileStorageManager.split(scanNode.getCanonicalName(), path))); + fileFragments.addAll(TUtil.newList(space.split(scanNode.getCanonicalName(), path))); } FragmentProto[] fragments = @@ -1188,8 +1190,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { FragmentConvertor.convert(ctx.getConf(), fragmentProtos); String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys()); - FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(ctx.getConf()); - Path 
indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index"); + FileTablespace sm = (FileTablespace) TableSpaceManager.get(fragments.get(0).getPath().toUri()).get(); + String dbName = CatalogUtil.extractQualifier(annotation.getTableName()); + String simpleName = CatalogUtil.extractSimpleName(annotation.getTableName()); + Path indexPath = new Path(new Path(sm.getTableUri(dbName, simpleName)), "index"); TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(), annotation.getSortKeys()); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 89e887a..ba4833b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -48,6 +48,7 @@ import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; +import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.ReflectionUtil; import org.apache.tajo.util.TUtil; @@ -170,7 +171,8 @@ public class GlobalPlanner { "Channel schema (" + channel.getSrcId().getId() + " -> " + channel.getTargetId().getId() + ") is not initialized"); TableMeta meta = new TableMeta(channel.getStoreType(), new KeyValueSet()); - TableDesc desc = new TableDesc(channel.getSrcId().toString(), channel.getSchema(), meta, new Path("/").toUri()); + TableDesc desc = new TableDesc( + channel.getSrcId().toString(), channel.getSchema(), meta, StorageConstants.LOCAL_FS_URI); ScanNode 
scanNode = plan.createNode(ScanNode.class); scanNode.init(desc); return scanNode; http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index bc6975a..54abca8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -56,7 +56,7 @@ public class BSTIndexScanExec extends PhysicalExec { this.qual = scanNode.getQual(); this.datum = datum; - this.fileScanner = TableSpaceManager.getSeekableScanner(context.getConf(), + this.fileScanner = OldStorageManager.getSeekableScanner(context.getConf(), scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema); this.fileScanner.init(); this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets()); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 3121671..969998c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -165,7 +165,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { actualFilePath = new Path(lastFileName + "_" + suffixId); } - appender = ((FileTablespace) 
TableSpaceManager.getFileStorageManager(context.getConf())) + appender = ((FileTablespace) TableSpaceManager.get(lastFileName.toUri()).get()) .getAppender(meta, outSchema, actualFilePath); appender.enableStats(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index d240edb..deda498 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -74,7 +74,7 @@ public class PhysicalPlanUtil { */ public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf tajoConf,TableDesc tableDesc, int fileIndex, int numResultFiles) throws IOException { - Path path = new Path(tableDesc.getPath()); + Path path = new Path(tableDesc.getUri()); FileSystem fs = path.getFileSystem(tajoConf); //In the case of partitioned table, we should return same partition key data files. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index 3dd1cd9..fb29e4f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -77,7 +77,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { context.getDataChannel().getStoreType() : "RAW"); FileSystem fs = new RawLocalFileSystem(); fs.mkdirs(storeTablePath); - this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getFileStorageManager(context.getConf())) + this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getDefault()) .getAppender(meta, outSchema, new Path(storeTablePath, "output")); this.appender.enableStats(); this.appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index b01af6c..d2ae3bd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -202,10 +202,8 @@ public class SeqScanExec extends ScanExec { FragmentConvertor.convert(context.getConf(), fragments), projected ); } else { - Tablespace tablespace = TableSpaceManager.getStorageManager( - context.getConf(), 
plan.getTableDesc().getMeta().getStoreType()); - this.scanner = tablespace.getScanner(meta, - plan.getPhysicalSchema(), fragments[0], projected); + Tablespace tablespace = TableSpaceManager.get(plan.getTableDesc().getUri()).get(); + this.scanner = tablespace.getScanner(meta, plan.getPhysicalSchema(), fragments[0], projected); } scanner.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 5b17eee..dd8768e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.physical; +import com.google.common.base.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -35,6 +36,7 @@ import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.net.URI; /** * This is a physical executor to store a table part into a specified storage. 
@@ -90,17 +92,26 @@ public class StoreTableExec extends UnaryPhysicalExec { lastFileName = new Path(lastFileName + "_" + suffixId); } - appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(context.getConf())) - .getAppender(meta, appenderSchema, lastFileName); + Optional<FileTablespace> spaceRes = TableSpaceManager.get(lastFileName.toUri()); + if (!spaceRes.isPresent()) { + throw new IllegalStateException("No Tablespace for " + lastFileName.toUri()); + } + + FileTablespace space = spaceRes.get(); + appender = space.getAppender(meta, appenderSchema, lastFileName); if (suffixId > 0) { LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " + "The remain output will be written into " + lastFileName.toString()); } } else { - appender = TableSpaceManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender( + Path stagingDir = context.getQueryContext().getStagingDir(); + appender = TableSpaceManager.get(stagingDir.toUri()).get().getAppender( context.getQueryContext(), - context.getTaskId(), meta, appenderSchema, context.getQueryContext().getStagingDir()); + context.getTaskId(), + meta, + appenderSchema, + stagingDir); } appender.enableStats(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index ee50221..7696c6c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -28,6 +28,8 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.session.Session; +import java.net.URI; + import static 
org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto; /** @@ -63,6 +65,40 @@ public class QueryContext extends OverridableConf { return get(SessionVars.USERNAME); } + /** + * Set the default tablespace uri + * + * @param uri The default tablespace uri + */ + public void setDefaultSpaceUri(URI uri) { + put(QueryVars.DEFAULT_SPACE_URI, uri.toString()); + } + + /** + * Return the default tablespace uri + */ + public URI getDefaultSpaceUri() { + String strVal = get(QueryVars.DEFAULT_SPACE_URI, ""); + return strVal != null && !strVal.isEmpty() ? URI.create(strVal) : null; + } + + /** + * Set the root uri of the default tablespace + * + * @param uri The root uri of the default tablespace + */ + public void setDefaultSpaceRootUri(URI uri) { + put(QueryVars.DEFAULT_SPACE_ROOT_URI, uri.toString()); + } + + /** + * Return the root of the default tablespace + */ + public URI getDefaultSpaceRootUri() { + String strVal = get(QueryVars.DEFAULT_SPACE_ROOT_URI, ""); + return strVal != null && !strVal.isEmpty() ? URI.create(strVal) : null; + } + public void setStagingDir(Path path) { put(QueryVars.STAGING_DIR, path.toUri().toString()); } @@ -82,24 +118,33 @@ public class QueryContext extends OverridableConf { } /** - * The fact that QueryContext has an output path means this query will write the output to a specific directory. - * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO (<table name>|LOCATION)' statement. + * The final output table's uri. It will be set if a query is CTAS or INSERT (OVERWRITE) INTO statement * - * @return + * @return True if a output table uri is set. 
Otherwise, it will return false */ - public boolean hasOutputPath() { - return containsKey(QueryVars.OUTPUT_TABLE_PATH); + public boolean hasOutputTableUri() { + return containsKey(QueryVars.OUTPUT_TABLE_URI); } - public void setOutputPath(Path path) { - if (path != null) { - put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString()); + /** + * Set the final output table uri + * + * @param uri + */ + public void setOutputPath(URI uri) { + if (uri != null) { + put(QueryVars.OUTPUT_TABLE_URI, uri.toString()); } } - public Path getOutputPath() { - String strVal = get(QueryVars.OUTPUT_TABLE_PATH); - return strVal != null ? new Path(strVal) : null; + /** + * Get the final output table uri + * + * @return The final output table uri + */ + public URI getOutputTableUri() { + String strVal = get(QueryVars.OUTPUT_TABLE_URI); + return strVal != null ? URI.create(strVal) : null; } public boolean hasPartition() { http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 2cd585f..e833884 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -53,7 +53,6 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; import org.apache.tajo.plan.verifier.VerificationState; import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.util.CommonTestingUtil; @@ -68,7 +67,6 @@ public class GlobalEngine extends AbstractService { private final static Log LOG = LogFactory.getLog(GlobalEngine.class); private final MasterContext context; - private 
final Tablespace sm; private SQLAnalyzer analyzer; private CatalogService catalog; @@ -84,7 +82,6 @@ public class GlobalEngine extends AbstractService { super(GlobalEngine.class.getName()); this.context = context; this.catalog = context.getCatalog(); - this.sm = context.getStorageManager(); this.ddlExecutor = new DDLExecutor(context); this.queryExecutor = new QueryExecutor(context, ddlExecutor); @@ -94,7 +91,7 @@ public class GlobalEngine extends AbstractService { try { analyzer = new SQLAnalyzer(); preVerifier = new PreLogicalPlanVerifier(context.getCatalog()); - planner = new LogicalPlanner(context.getCatalog()); + planner = new LogicalPlanner(context.getCatalog(), TableSpaceManager.getInstance()); optimizer = new LogicalOptimizer(context.getConf()); annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog()); } catch (Throwable t) { @@ -143,6 +140,10 @@ public class GlobalEngine extends AbstractService { private QueryContext createQueryContext(Session session) { QueryContext newQueryContext = new QueryContext(context.getConf(), session); + // Set default space uri and its root uri + newQueryContext.setDefaultSpaceUri(TableSpaceManager.getDefault().getUri()); + newQueryContext.setDefaultSpaceRootUri(TableSpaceManager.getDefault().getRootUri()); + String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY); if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) { newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest()); @@ -302,8 +303,8 @@ public class GlobalEngine extends AbstractService { InsertNode iNode = rootNode.getChild(); Schema outSchema = iNode.getChild().getOutSchema(); - TableSpaceManager.getStorageManager(queryContext.getConf(), storeType) - .verifyInsertTableSchema(tableDesc, outSchema); + TableSpaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema); + } catch (Throwable t) { state.addVerification(t.getMessage()); } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index c41fdde..e1e85dd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -21,7 +21,10 @@ package org.apache.tajo.master; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.service.CompositeService; @@ -53,8 +56,6 @@ import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.session.SessionManager; -import org.apache.tajo.storage.Tablespace; -import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.util.*; import org.apache.tajo.util.history.HistoryReader; import org.apache.tajo.util.history.HistoryWriter; @@ -114,7 +115,6 @@ public class TajoMaster extends CompositeService { private CatalogServer catalogServer; private CatalogService catalog; - private Tablespace storeManager; private GlobalEngine globalEngine; private AsyncDispatcher dispatcher; private TajoMasterClientService tajoMasterClientService; @@ -183,7 +183,6 @@ public class TajoMaster extends CompositeService { // check the system directory and create if they are not created. 
checkAndInitializeSystemDirectories(); diagnoseTajoMaster(); - this.storeManager = TableSpaceManager.getFileStorageManager(systemConf); catalogServer = new CatalogServer(loadFunctions()); addIfService(catalogServer); @@ -211,7 +210,14 @@ public class TajoMaster extends CompositeService { throw e; } - super.serviceInit(systemConf); + // Try to start up all services in TajoMaster. + // If any of them fails, the master prints out the errors and shuts down immediately + try { + super.serviceInit(systemConf); + } catch (Throwable t) { + t.printStackTrace(); + System.exit(1); + } LOG.info("Tajo Master is initialized."); } @@ -477,10 +483,6 @@ public class TajoMaster extends CompositeService { return globalEngine; } - public Tablespace getStorageManager() { - return storeManager; - } - public QueryCoordinatorService getTajoMasterService() { return tajoMasterService; } http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 31eecdc..7dbe815 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -844,8 +844,7 @@ public class TajoMasterClientService extends AbstractService { TableDesc desc; try { desc = context.getGlobalEngine().getDDLExecutor().createTable(queryContext, request.getName(), - meta.getStoreType(), schema, - meta, path, true, partitionDesc, false); + null, meta.getStoreType(), schema, meta, path.toUri(), true, partitionDesc, false); } catch (Exception e) { return TableResponse.newBuilder() .setResultCode(ResultCode.ERROR) 
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index 93c950e..5e0e639 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -36,11 +36,12 @@ import org.apache.tajo.master.TajoMaster; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.storage.TableSpaceManager; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -54,12 +55,10 @@ public class DDLExecutor { private final TajoMaster.MasterContext context; private final CatalogService catalog; - private final Tablespace tablespace; public DDLExecutor(TajoMaster.MasterContext context) { this.context = context; this.catalog = context.getCatalog(); - this.tablespace = context.getStorageManager(); } public boolean execute(QueryContext queryContext, LogicalPlan plan) throws IOException { @@ -202,17 +201,31 @@ public class DDLExecutor { } if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){ - Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given."); + Preconditions.checkState(createTable.hasUri(), "ERROR: LOCATION must be given."); } - return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(), - createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(), - createTable.getPartitionMethod(), ifNotExists); + return 
createTable( + queryContext, + createTable.getTableName(), + createTable.getTableSpaceName(), + createTable.getStorageType(),createTable.getTableSchema(), + meta, + createTable.getUri(), + createTable.isExternal(), + createTable.getPartitionMethod(), + ifNotExists); } - public TableDesc createTable(QueryContext queryContext, String tableName, String storeType, - Schema schema, TableMeta meta, Path path, boolean isExternal, - PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException { + public TableDesc createTable(QueryContext queryContext, + String tableName, + @Nullable String tableSpaceName, + @Nullable String storeType, + Schema schema, + TableMeta meta, + @Nullable URI uri, + boolean isExternal, + @Nullable PartitionMethodDesc partitionDesc, + boolean ifNotExists) throws IOException { String databaseName; String simpleTableName; if (CatalogUtil.isFQTableName(tableName)) { @@ -232,18 +245,28 @@ public class DDLExecutor { LOG.info("relation \"" + qualifiedName + "\" is already exists." ); return catalog.getTableDesc(databaseName, simpleTableName); } else { - throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName)); + throw new AlreadyExistsTableException(qualifiedName); } } - TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName), - schema, meta, (path != null ? path.toUri(): null), isExternal); + Tablespace tableSpace; + if (tableSpaceName != null) { + tableSpace = TableSpaceManager.getByName(tableSpaceName).get(); + } else if (uri != null) { + tableSpace = TableSpaceManager.get(uri).get(); + } else { + tableSpace = TableSpaceManager.getDefault(); + } + + TableDesc desc; + URI tableUri = isExternal ? 
uri : tableSpace.getTableUri(databaseName, simpleTableName); + desc = new TableDesc(qualifiedName, schema, meta, tableUri, isExternal); if (partitionDesc != null) { desc.setPartitionMethod(partitionDesc); } - TableSpaceManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists); + tableSpace.createTable(desc, ifNotExists); if (catalog.createTable(desc)) { LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")"); @@ -290,8 +313,7 @@ public class DDLExecutor { if (purge) { try { - TableSpaceManager.getStorageManager(queryContext.getConf(), - tableDesc.getMeta().getStoreType()).purgeTable(tableDesc); + TableSpaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc); } catch (IOException e) { throw new InternalError(e.getMessage()); } @@ -330,7 +352,7 @@ public class DDLExecutor { Path warehousePath = new Path(TajoConf.getWarehouseDir(context.getConf()), databaseName); TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName); - Path tablePath = new Path(tableDesc.getPath()); + Path tablePath = new Path(tableDesc.getUri()); if (tablePath.getParent() == null || !tablePath.getParent().toUri().getPath().equals(warehousePath.toUri().getPath())) { throw new IOException("Can't truncate external table:" + eachTableName + ", data dir=" + tablePath + @@ -340,7 +362,7 @@ public class DDLExecutor { } for (TableDesc eachTable: tableDescList) { - Path path = new Path(eachTable.getPath()); + Path path = new Path(eachTable.getUri()); LOG.info("Truncate table: " + eachTable.getName() + ", delete all data files in " + path); FileSystem fs = path.getFileSystem(context.getConf()); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index 8f6c6f9..ae57453 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -101,7 +101,7 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc } private void initSeqScanExec() throws IOException { - Tablespace tablespace = TableSpaceManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()); + Tablespace tablespace = TableSpaceManager.get(tableDesc.getUri()).get(); List<Fragment> fragments = null; setPartition(tablespace); fragments = tablespace.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 281edad..480f45c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -43,24 +43,26 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; -import org.apache.tajo.master.*; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.QueryManager; +import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.exec.prehook.CreateTableHook; import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.master.exec.prehook.InsertIntoHook; -import org.apache.tajo.plan.expr.EvalContext; 
-import org.apache.tajo.plan.expr.GeneralFunctionEval; -import org.apache.tajo.plan.function.python.PythonScriptEngine; -import org.apache.tajo.plan.function.python.TajoScriptEngine; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; -import org.apache.tajo.querymaster.*; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.Target; +import org.apache.tajo.plan.expr.EvalContext; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.expr.GeneralFunctionEval; +import org.apache.tajo.plan.function.python.PythonScriptEngine; +import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; +import org.apache.tajo.querymaster.Query; +import org.apache.tajo.querymaster.QueryMasterTask; +import org.apache.tajo.session.Session; import org.apache.tajo.storage.*; import org.apache.tajo.util.ProtoUtil; import org.apache.tajo.worker.TaskAttemptContext; @@ -329,104 +331,108 @@ public class QueryExecutor { } private void insertNonFromQuery(QueryContext queryContext, - InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) - throws Exception { - String nodeUniqName = insertNode.getTableName() == null ? 
insertNode.getPath().getName() : insertNode.getTableName(); - String queryId = nodeUniqName + "_" + System.currentTimeMillis(); - - FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf()); - Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext); - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - - TableDesc tableDesc = null; - Path finalOutputDir = null; - if (insertNode.getTableName() != null) { - tableDesc = this.catalog.getTableDesc(insertNode.getTableName()); - finalOutputDir = new Path(tableDesc.getPath()); - } else { - finalOutputDir = insertNode.getPath(); - } - - TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir); - taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000")); - - EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); - StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec); + InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) { try { - exec.init(); - exec.next(); - } finally { - exec.close(); - } + String nodeUniqName = insertNode.getTableName() == null ? 
new Path(insertNode.getUri()).getName() : + insertNode.getTableName(); + String queryId = nodeUniqName + "_" + System.currentTimeMillis(); + + FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf()); + Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + + TableDesc tableDesc = null; + Path finalOutputDir; + if (insertNode.getTableName() != null) { + tableDesc = this.catalog.getTableDesc(insertNode.getTableName()); + finalOutputDir = new Path(tableDesc.getUri()); + } else { + finalOutputDir = new Path(insertNode.getUri()); + } + + TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir); + taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000")); - if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO - // it moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); + StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec); try { - if (fs.exists(finalOutputDir)) { - fs.rename(finalOutputDir, oldTableDir); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. 
- fs.mkdirs(finalOutputDir.getParent()); - } - fs.rename(stagingResultDir, finalOutputDir); - committed = fs.exists(finalOutputDir); - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - fs.rename(oldTableDir, finalOutputDir); - } + exec.init(); + exec.next(); + } finally { + exec.close(); } - } else { - FileStatus[] files = fs.listStatus(stagingResultDir); - for (FileStatus eachFile: files) { - Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName()); - if (fs.exists(targetFilePath)) { - targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis()); + + if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO + // it moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + try { + if (fs.exists(finalOutputDir)) { + fs.rename(finalOutputDir, oldTableDir); + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. 
+ fs.mkdirs(finalOutputDir.getParent()); + } + fs.rename(stagingResultDir, finalOutputDir); + committed = fs.exists(finalOutputDir); + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + fs.rename(oldTableDir, finalOutputDir); + } + } + } else { + FileStatus[] files = fs.listStatus(stagingResultDir); + for (FileStatus eachFile : files) { + Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName()); + if (fs.exists(targetFilePath)) { + targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis()); + } + fs.rename(eachFile.getPath(), targetFilePath); } - fs.rename(eachFile.getPath(), targetFilePath); } - } - if (insertNode.hasTargetTable()) { - TableStats stats = tableDesc.getStats(); - long volume = Query.getTableVolume(context.getConf(), finalOutputDir); - stats.setNumBytes(volume); - stats.setNumRows(1); + if (insertNode.hasTargetTable()) { + TableStats stats = tableDesc.getStats(); + long volume = Query.getTableVolume(context.getConf(), finalOutputDir); + stats.setNumBytes(volume); + stats.setNumRows(1); - CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder(); - builder.setTableName(tableDesc.getName()); - builder.setStats(stats.getProto()); + CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder(); + builder.setTableName(tableDesc.getName()); + builder.setStats(stats.getProto()); - catalog.updateTableStats(builder.build()); + catalog.updateTableStats(builder.build()); - responseBuilder.setTableDesc(tableDesc.getProto()); - } else { - TableStats stats = new TableStats(); - long volume = Query.getTableVolume(context.getConf(), finalOutputDir); - stats.setNumBytes(volume); - stats.setNumRows(1); - - // Empty TableDesc - List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>(); - CatalogProtos.TableDescProto tableDescProto = 
CatalogProtos.TableDescProto.newBuilder() - .setTableName(nodeUniqName) - .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build()) - .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build()) - .setStats(stats.getProto()) - .build(); - - responseBuilder.setTableDesc(tableDescProto); - } + responseBuilder.setTableDesc(tableDesc.getProto()); + } else { + TableStats stats = new TableStats(); + long volume = Query.getTableVolume(context.getConf(), finalOutputDir); + stats.setNumBytes(volume); + stats.setNumRows(1); + + // Empty TableDesc + List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>(); + CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder() + .setTableName(nodeUniqName) + .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build()) + .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build()) + .setStats(stats.getProto()) + .build(); + + responseBuilder.setTableDesc(tableDescProto); + } - // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows. - responseBuilder.setMaxRowNum(-1); - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows. 
+ responseBuilder.setMaxRowNum(-1); + responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + } catch (Throwable t) { + throw new RuntimeException(t); + } } public void executeDistributedQuery(QueryContext queryContext, Session session, @@ -436,14 +442,17 @@ public class QueryExecutor { SubmitQueryResponse.Builder responseBuilder) throws Exception { LogicalRootNode rootNode = plan.getRootBlock().getRoot(); - String storeType = PlannerUtil.getStoreType(plan); - if (storeType != null) { - Tablespace sm = TableSpaceManager.getStorageManager(context.getConf(), storeType); - StorageProperty storageProperty = sm.getStorageProperty(); - if (!storageProperty.isSupportsInsertInto()) { + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot()); + if (tableDesc != null) { + + Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get(); + StorageProperty storageProperty = space.getProperty(); + + if (!storageProperty.isInsertable()) { throw new VerifyException("Inserting into non-file storage is not supported."); } - sm.beforeInsertOrCATS(rootNode.getChild()); + + space.prepareTable(rootNode.getChild()); } context.getSystemMetrics().counter("Query", "numDMLQuery").inc(); hookManager.doHooks(queryContext, plan); @@ -471,28 +480,15 @@ public class QueryExecutor { } } - public static MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner) + public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner) throws Exception { - String storeType = PlannerUtil.getStoreType(plan); - if (storeType != null) { - Tablespace sm = TableSpaceManager.getStorageManager(planner.getConf(), storeType); - StorageProperty storageProperty = sm.getStorageProperty(); - if (storageProperty.isSortedInsert()) { - String tableName = PlannerUtil.getStoreTableName(plan); - LogicalRootNode rootNode = 
plan.getRootBlock().getRoot(); - TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild()); - if (tableDesc == null) { - throw new VerifyException("Can't get table meta data from catalog: " + tableName); - } - List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( - context, tableDesc); - if (storageSpecifiedRewriteRules != null) { - for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) { - eachRule.rewrite(context, plan); - } - } - } + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild()); + + if (tableDesc != null) { + Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get(); + space.rewritePlan(context, plan); } MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);
