TAJO-1616: Implement TablespaceManager to load Tablespaces. (missed commits)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/90afaa46 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/90afaa46 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/90afaa46 Branch: refs/heads/master Commit: 90afaa468080d4f743ed2eee8326a38995900807 Parents: d440727 Author: Hyunsik Choi <[email protected]> Authored: Wed Jun 24 17:55:10 2015 -0700 Committer: Hyunsik Choi <[email protected]> Committed: Wed Jun 24 17:55:10 2015 -0700 ---------------------------------------------------------------------- .travis.yml | 4 +- .../apache/tajo/storage/StorageConstants.java | 3 + .../java/org/apache/tajo/util/KeyValueSet.java | 8 +- .../engine/planner/PhysicalPlannerImpl.java | 6 +- .../planner/physical/ColPartitionStoreExec.java | 2 +- .../engine/planner/physical/InsertRowsExec.java | 107 +++++ .../physical/RangeShuffleFileWriteExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 2 +- .../engine/planner/physical/StoreTableExec.java | 10 +- .../apache/tajo/engine/query/QueryContext.java | 4 +- .../org/apache/tajo/master/GlobalEngine.java | 10 +- .../apache/tajo/master/exec/DDLExecutor.java | 10 +- .../exec/NonForwardQueryResultFileScanner.java | 2 +- .../apache/tajo/master/exec/QueryExecutor.java | 179 +++++---- .../master/exec/prehook/CreateTableHook.java | 2 - .../java/org/apache/tajo/querymaster/Query.java | 6 +- .../tajo/querymaster/QueryMasterTask.java | 118 +----- .../apache/tajo/querymaster/Repartitioner.java | 10 +- .../java/org/apache/tajo/querymaster/Stage.java | 4 +- .../org/apache/tajo/worker/LegacyTaskImpl.java | 2 +- .../java/org/apache/tajo/worker/TaskImpl.java | 2 +- .../src/main/resources/webapps/admin/index.jsp | 4 +- .../org/apache/tajo/BackendTestingUtil.java | 2 +- .../java/org/apache/tajo/QueryTestCaseBase.java | 35 ++ .../org/apache/tajo/TajoTestingCluster.java | 18 +- .../org/apache/tajo/cli/tsql/TestTajoCli.java | 4 +- 
.../apache/tajo/engine/eval/ExprTestBase.java | 5 +- .../tajo/engine/eval/TestEvalTreeUtil.java | 5 +- .../engine/planner/TestLogicalOptimizer.java | 4 +- .../tajo/engine/planner/TestLogicalPlan.java | 4 +- .../tajo/engine/planner/TestLogicalPlanner.java | 4 +- .../tajo/engine/planner/TestPlannerUtil.java | 4 +- .../planner/physical/TestBNLJoinExec.java | 6 +- .../planner/physical/TestBSTIndexExec.java | 4 +- .../planner/physical/TestExternalSortExec.java | 4 +- .../physical/TestFullOuterHashJoinExec.java | 10 +- .../physical/TestFullOuterMergeJoinExec.java | 12 +- .../planner/physical/TestHashAntiJoinExec.java | 6 +- .../planner/physical/TestHashJoinExec.java | 6 +- .../planner/physical/TestHashSemiJoinExec.java | 6 +- .../physical/TestLeftOuterHashJoinExec.java | 10 +- .../planner/physical/TestMergeJoinExec.java | 6 +- .../engine/planner/physical/TestNLJoinExec.java | 6 +- .../planner/physical/TestPhysicalPlanner.java | 12 +- .../physical/TestProgressExternalSortExec.java | 4 +- .../physical/TestRightOuterHashJoinExec.java | 8 +- .../physical/TestRightOuterMergeJoinExec.java | 12 +- .../engine/planner/physical/TestSortExec.java | 6 +- .../tajo/engine/query/TestHBaseTable.java | 62 ++- .../apache/tajo/engine/query/TestJoinQuery.java | 2 +- .../apache/tajo/ha/TestHAServiceHDFSImpl.java | 88 +++-- .../org/apache/tajo/jdbc/TestResultSet.java | 2 +- .../tajo/master/TestExecutionBlockCursor.java | 4 +- .../TestNonForwardQueryResultSystemScanner.java | 258 +----------- .../apache/tajo/querymaster/TestKillQuery.java | 6 +- .../org/apache/tajo/storage/TestRowFile.java | 2 +- .../TestHBaseTable/testInsertValues1.result | 4 + .../testGetClusterDetails.result | 4 + .../testGetNextRowsForAggregateFunction.result | 3 + .../testGetNextRowsForTable.result | 5 + .../java/org/apache/tajo/plan/LogicalPlan.java | 8 + .../org/apache/tajo/plan/util/PlannerUtil.java | 16 + .../org/apache/tajo/storage/FormatProperty.java | 42 +- .../org/apache/tajo/storage/MergeScanner.java | 4 +- 
.../apache/tajo/storage/OldStorageManager.java | 3 +- .../apache/tajo/storage/StorageProperty.java | 42 +- .../apache/tajo/storage/TableSpaceManager.java | 390 ------------------- .../org/apache/tajo/storage/Tablespace.java | 19 +- .../apache/tajo/storage/TablespaceManager.java | 390 +++++++++++++++++++ .../tajo/storage/hbase/HBasePutAppender.java | 4 +- .../apache/tajo/storage/hbase/HBaseScanner.java | 4 +- .../tajo/storage/hbase/HBaseTablespace.java | 48 ++- .../tajo/storage/hbase/TestHBaseTableSpace.java | 10 +- .../org/apache/tajo/storage/FileAppender.java | 2 +- .../org/apache/tajo/storage/FileTablespace.java | 138 +++++-- .../storage/HashShuffleAppenderManager.java | 2 +- .../tajo/storage/TestCompressionStorages.java | 4 +- .../tajo/storage/TestDelimitedTextFile.java | 8 +- .../apache/tajo/storage/TestFileSystems.java | 2 +- .../apache/tajo/storage/TestFileTablespace.java | 12 +- .../org/apache/tajo/storage/TestLineReader.java | 8 +- .../apache/tajo/storage/TestMergeScanner.java | 6 +- .../org/apache/tajo/storage/TestStorages.java | 44 +-- .../apache/tajo/storage/index/TestBSTIndex.java | 20 +- .../index/TestSingleCSVFileBSTIndex.java | 4 +- .../apache/tajo/storage/json/TestJsonSerDe.java | 2 +- 86 files changed, 1236 insertions(+), 1136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 8fc9b94..61d56fb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,7 @@ git: depth: 150 jdk: - - openjdk7 + - oraclejdk7 env: PATH=$PATH:$HOME/local/bin @@ -33,7 +33,7 @@ notifications: - [email protected] irc: "chat.freenode.net#tajo" - +before_install: ulimit -t 514029 install: ./dev-support/travis-install-dependencies.sh script: 
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index a9923a5..16cf51d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -30,6 +30,9 @@ public class StorageConstants { // Common table properties ------------------------------------------------- + // Insert + public static final String INSERT_DIRECTLY = "insert.direct"; + // time zone public static final String TIMEZONE = "timezone"; http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java index 0e27769..404606d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java @@ -115,6 +115,10 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs return get(key, null); } + public boolean isTrue(String key) { + return getBool(key, false); + } + public void setBool(String key, boolean val) { set(key, val ? 
TRUE_STR : FALSE_STR); } @@ -137,9 +141,9 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs public boolean getBool(ConfigKey key) { String keyName = key.keyname(); if (key instanceof SessionVars) { - return getBool(keyName, ((SessionVars)key).getConfVars().defaultBoolVal); + return getBool(keyName, ((SessionVars) key).getConfVars().defaultBoolVal); } else if (key instanceof TajoConf.ConfVars) { - return getBool(keyName, ((TajoConf.ConfVars)key).defaultBoolVal); + return getBool(keyName, ((TajoConf.ConfVars) key).defaultBoolVal); } return getBool(keyName); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index f0b2f5e..c6b9b41 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -252,7 +252,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { FragmentProto[] fragmentProtos = ctx.getTables(tableId); List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos); for (Fragment frag : fragments) { - size += TableSpaceManager.guessFragmentVolume(ctx.getConf(), frag); + size += TablespaceManager.guessFragmentVolume(ctx.getConf(), frag); } } return size; @@ -926,7 +926,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode; List<Fragment> fileFragments = TUtil.newList(); - FileTablespace space = (FileTablespace) TableSpaceManager.get(scanNode.getTableDesc().getUri()).get(); + FileTablespace space = (FileTablespace) 
TablespaceManager.get(scanNode.getTableDesc().getUri()).get(); for (Path path : partitionedTableScanNode.getInputPaths()) { fileFragments.addAll(TUtil.newList(space.split(scanNode.getCanonicalName(), path))); } @@ -1190,7 +1190,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { FragmentConvertor.convert(ctx.getConf(), fragmentProtos); String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys()); - FileTablespace sm = (FileTablespace) TableSpaceManager.get(fragments.get(0).getPath().toUri()).get(); + FileTablespace sm = (FileTablespace) TablespaceManager.get(fragments.get(0).getPath().toUri()).get(); String dbName = CatalogUtil.extractQualifier(annotation.getTableName()); String simpleName = CatalogUtil.extractSimpleName(annotation.getTableName()); Path indexPath = new Path(new Path(sm.getTableUri(dbName, simpleName)), "index"); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 969998c..76abc6d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -165,7 +165,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { actualFilePath = new Path(lastFileName + "_" + suffixId); } - appender = ((FileTablespace) TableSpaceManager.get(lastFileName.toUri()).get()) + appender = ((FileTablespace) TablespaceManager.get(lastFileName.toUri()).get()) .getAppender(meta, outSchema, actualFilePath); appender.enableStats(); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java new file mode 100644 index 0000000..f3a24a7 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.StatisticsUtil; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.plan.logical.PersistentStoreNode; +import org.apache.tajo.plan.logical.StoreTableNode; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; + +/** + * This is a physical executor to store rows immediately. + */ +public class InsertRowsExec extends UnaryPhysicalExec { + private static final Log LOG = LogFactory.getLog(InsertRowsExec.class); + + private PersistentStoreNode plan; + private TableMeta meta; + private Appender appender; + private Tuple tuple; + + // for file punctuation + private TableStats sumStats; // for aggregating all stats of written files + + public InsertRowsExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException { + super(context, plan.getInSchema(), plan.getOutSchema(), child); + this.plan = plan; + } + + public void init() throws IOException { + super.init(); + + if (plan.hasOptions()) { + meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions()); + } else { + meta = CatalogUtil.newTableMeta(plan.getStorageType()); + } + + PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta); + sumStats = new TableStats(); + + StoreTableNode storeTableNode = (StoreTableNode) plan; + appender = TablespaceManager.get(storeTableNode.getUri()).get().getAppenderForInsertRow( + context.getQueryContext(), + context.getTaskId(), meta, storeTableNode.getTableSchema(), context.getOutputPath()); + appender.enableStats(); + appender.init(); + } + + /* (non-Javadoc) + 
* @see PhysicalExec#next() + */ + @Override + public Tuple next() throws IOException { + while((tuple = child.next()) != null) { + appender.addTuple(tuple); + } + + return null; + } + + @Override + public void rescan() throws IOException { + // nothing to do + } + + public void close() throws IOException { + super.close(); + + if(appender != null){ + appender.flush(); + appender.close(); + + // Collect statistics data + StatisticsUtil.aggregateTableStat(sumStats, appender.getStats()); + context.setResultStats(sumStats); + } + + appender = null; + plan = null; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index fb29e4f..bbb21fe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -77,7 +77,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { context.getDataChannel().getStoreType() : "RAW"); FileSystem fs = new RawLocalFileSystem(); fs.mkdirs(storeTablePath); - this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getDefault()) + this.appender = (FileAppender) ((FileTablespace) TablespaceManager.getDefault()) .getAppender(meta, outSchema, new Path(storeTablePath, "output")); this.appender.enableStats(); this.appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index d2ae3bd..599f160 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -202,7 +202,7 @@ public class SeqScanExec extends ScanExec { FragmentConvertor.convert(context.getConf(), fragments), projected ); } else { - Tablespace tablespace = TableSpaceManager.get(plan.getTableDesc().getUri()).get(); + Tablespace tablespace = TablespaceManager.get(plan.getTableDesc().getUri()).get(); this.scanner = tablespace.getScanner(meta, plan.getPhysicalSchema(), fragments[0], projected); } scanner.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index dd8768e..6031fdb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -31,12 +31,14 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.plan.logical.InsertNode; import org.apache.tajo.plan.logical.PersistentStoreNode; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.*; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.FileTablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.Tuple; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.net.URI; /** * This is a 
physical executor to store a table part into a specified storage. @@ -92,7 +94,7 @@ public class StoreTableExec extends UnaryPhysicalExec { lastFileName = new Path(lastFileName + "_" + suffixId); } - Optional<FileTablespace> spaceRes = TableSpaceManager.get(lastFileName.toUri()); + Optional<FileTablespace> spaceRes = TablespaceManager.get(lastFileName.toUri()); if (!spaceRes.isPresent()) { throw new IllegalStateException("No Tablespace for " + lastFileName.toUri()); } @@ -106,7 +108,7 @@ public class StoreTableExec extends UnaryPhysicalExec { } } else { Path stagingDir = context.getQueryContext().getStagingDir(); - appender = TableSpaceManager.get(stagingDir.toUri()).get().getAppender( + appender = TablespaceManager.get(stagingDir.toUri()).get().getAppender( context.getQueryContext(), context.getTaskId(), meta, http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index 7696c6c..da2f2ad 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -99,8 +99,8 @@ public class QueryContext extends OverridableConf { return strVal != null && !strVal.isEmpty() ? 
URI.create(strVal) : null; } - public void setStagingDir(Path path) { - put(QueryVars.STAGING_DIR, path.toUri().toString()); + public void setStagingDir(URI uri) { + put(QueryVars.STAGING_DIR, uri.toString()); } public Path getStagingDir() { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index e833884..37b497c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -53,7 +53,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; import org.apache.tajo.plan.verifier.VerificationState; import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.CommonTestingUtil; import java.io.IOException; @@ -91,7 +91,7 @@ public class GlobalEngine extends AbstractService { try { analyzer = new SQLAnalyzer(); preVerifier = new PreLogicalPlanVerifier(context.getCatalog()); - planner = new LogicalPlanner(context.getCatalog(), TableSpaceManager.getInstance()); + planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance()); optimizer = new LogicalOptimizer(context.getConf()); annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog()); } catch (Throwable t) { @@ -141,8 +141,8 @@ public class GlobalEngine extends AbstractService { QueryContext newQueryContext = new QueryContext(context.getConf(), session); // Set default space uri and its root uri - newQueryContext.setDefaultSpaceUri(TableSpaceManager.getDefault().getUri()); - 
newQueryContext.setDefaultSpaceRootUri(TableSpaceManager.getDefault().getRootUri()); + newQueryContext.setDefaultSpaceUri(TablespaceManager.getDefault().getUri()); + newQueryContext.setDefaultSpaceRootUri(TablespaceManager.getDefault().getRootUri()); String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY); if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) { @@ -303,7 +303,7 @@ public class GlobalEngine extends AbstractService { InsertNode iNode = rootNode.getChild(); Schema outSchema = iNode.getChild().getOutSchema(); - TableSpaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema); + TablespaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema); } catch (Throwable t) { state.addVerification(t.getMessage()); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index 5e0e639..7104412 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -36,7 +36,7 @@ import org.apache.tajo.master.TajoMaster; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.StorageUtil; @@ -251,11 +251,11 @@ public class DDLExecutor { Tablespace tableSpace; if (tableSpaceName != null) { - tableSpace = TableSpaceManager.getByName(tableSpaceName).get(); + tableSpace = TablespaceManager.getByName(tableSpaceName).get(); } else if (uri != null) { - 
tableSpace = TableSpaceManager.get(uri).get(); + tableSpace = TablespaceManager.get(uri).get(); } else { - tableSpace = TableSpaceManager.getDefault(); + tableSpace = TablespaceManager.getDefault(); } TableDesc desc; @@ -313,7 +313,7 @@ public class DDLExecutor { if (purge) { try { - TableSpaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc); + TablespaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc); } catch (IOException e) { throw new InternalError(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index ae57453..ec8760f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -101,7 +101,7 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc } private void initSeqScanExec() throws IOException { - Tablespace tablespace = TableSpaceManager.get(tableDesc.getUri()).get(); + Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()).get(); List<Fragment> fragments = null; setPartition(tablespace); fragments = tablespace.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 480f45c..5d42157 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -31,6 +31,7 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; @@ -39,7 +40,7 @@ import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.physical.EvalExprExec; -import org.apache.tajo.engine.planner.physical.StoreTableExec; +import org.apache.tajo.engine.planner.physical.InsertRowsExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; @@ -60,14 +61,13 @@ import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.querymaster.Query; -import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.session.Session; import org.apache.tajo.storage.*; import org.apache.tajo.util.ProtoUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -288,7 +288,7 @@ public class QueryExecutor { boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; if (isInsert) { InsertNode insertNode = rootNode.getChild(); - insertNonFromQuery(queryContext, insertNode, responseBuilder); + insertRowValues(queryContext, insertNode, responseBuilder); } else { Schema schema = 
PlannerUtil.targetToSchema(targets); RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); @@ -330,89 +330,123 @@ public class QueryExecutor { } } - private void insertNonFromQuery(QueryContext queryContext, - InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) { + /** + * Insert rows through staging phase + */ + private void insertRowsThroughStaging(TaskAttemptContext taskAttemptContext, + InsertNode insertNode, + Path finalOutputPath, + Path stagingDir, + Path stagingResultDir) + throws IOException { + + EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); + InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec); + + try { + exec.init(); + exec.next(); + } finally { + exec.close(); + } + + FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf()); + + if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO + // it moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + try { + if (fs.exists(finalOutputPath)) { + fs.rename(finalOutputPath, oldTableDir); + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. 
+ fs.mkdirs(finalOutputPath.getParent()); + } + fs.rename(stagingResultDir, finalOutputPath); + committed = fs.exists(finalOutputPath); + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + fs.rename(oldTableDir, finalOutputPath); + } + } + } else { + FileStatus[] files = fs.listStatus(stagingResultDir); + for (FileStatus eachFile : files) { + Path targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName()); + if (fs.exists(targetFilePath)) { + targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName() + "_" + System.currentTimeMillis()); + } + fs.rename(eachFile.getPath(), targetFilePath); + } + } + } + + /** + * Insert row values + */ + private void insertRowValues(QueryContext queryContext, + InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) { try { String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() : insertNode.getTableName(); String queryId = nodeUniqName + "_" + System.currentTimeMillis(); - FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf()); - Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext); - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + URI finalOutputUri = insertNode.getUri(); + Tablespace space = TablespaceManager.get(finalOutputUri).get(); + TableMeta tableMeta = new TableMeta(insertNode.getStorageType(), insertNode.getOptions()); + tableMeta.putOption(StorageConstants.INSERT_DIRECTLY, Boolean.TRUE.toString()); - TableDesc tableDesc = null; - Path finalOutputDir; - if (insertNode.getTableName() != null) { - tableDesc = this.catalog.getTableDesc(insertNode.getTableName()); - finalOutputDir = new Path(tableDesc.getUri()); - } else { - finalOutputDir = new Path(insertNode.getUri()); - } + FormatProperty formatProperty = space.getFormatProperty(tableMeta); - TaskAttemptContext taskAttemptContext = 
new TaskAttemptContext(queryContext, null, null, null, stagingDir); - taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000")); + TaskAttemptContext taskAttemptContext; + if (formatProperty.directInsertSupported()) { // if this format and storage supports direct insertion + taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, null); + taskAttemptContext.setOutputPath(new Path(finalOutputUri)); - EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); - StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec); - try { - exec.init(); - exec.next(); - } finally { - exec.close(); - } + EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); + InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec); - if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO - // it moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); try { - if (fs.exists(finalOutputDir)) { - fs.rename(finalOutputDir, oldTableDir); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. 
- fs.mkdirs(finalOutputDir.getParent()); - } - fs.rename(stagingResultDir, finalOutputDir); - committed = fs.exists(finalOutputDir); - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - fs.rename(oldTableDir, finalOutputDir); - } + exec.init(); + exec.next(); + } finally { + exec.close(); } } else { - FileStatus[] files = fs.listStatus(stagingResultDir); - for (FileStatus eachFile : files) { - Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName()); - if (fs.exists(targetFilePath)) { - targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis()); - } - fs.rename(eachFile.getPath(), targetFilePath); - } + URI stagingSpaceUri = space.prepareStagingSpace(context.getConf(), queryId, queryContext, tableMeta); + Path stagingDir = new Path(stagingSpaceUri); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + + taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir); + taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000")); + insertRowsThroughStaging(taskAttemptContext, insertNode, new Path(finalOutputUri), stagingDir, stagingResultDir); } - if (insertNode.hasTargetTable()) { - TableStats stats = tableDesc.getStats(); - long volume = Query.getTableVolume(context.getConf(), finalOutputDir); - stats.setNumBytes(volume); - stats.setNumRows(1); + // set insert stats (how many rows and bytes) + TableStats stats = new TableStats(); + stats.setNumBytes(taskAttemptContext.getResultStats().getNumBytes()); + stats.setNumRows(taskAttemptContext.getResultStats().getNumRows()); + if (insertNode.hasTargetTable()) { CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder(); - builder.setTableName(tableDesc.getName()); + builder.setTableName(insertNode.getTableName()); builder.setStats(stats.getProto()); catalog.updateTableStats(builder.build()); - 
responseBuilder.setTableDesc(tableDesc.getProto()); - } else { - TableStats stats = new TableStats(); - long volume = Query.getTableVolume(context.getConf(), finalOutputDir); - stats.setNumBytes(volume); - stats.setNumRows(1); + TableDesc desc = new TableDesc( + insertNode.getTableName(), + insertNode.getTargetSchema(), + tableMeta, + finalOutputUri); + responseBuilder.setTableDesc(desc.getProto()); + + } else { // If INSERT INTO LOCATION // Empty TableDesc List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>(); @@ -445,11 +479,12 @@ public class QueryExecutor { TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot()); if (tableDesc != null) { - Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get(); - StorageProperty storageProperty = space.getProperty(); + Tablespace space = TablespaceManager.get(tableDesc.getUri()).get(); + FormatProperty formatProperty = space.getFormatProperty(tableDesc.getMeta()); - if (!storageProperty.isInsertable()) { - throw new VerifyException("Inserting into non-file storage is not supported."); + if (!formatProperty.isInsertable()) { + throw new VerifyException( + String.format("%s tablespace does not allow INSERT operation.", tableDesc.getUri().toString())); } space.prepareTable(rootNode.getChild()); @@ -487,7 +522,7 @@ public class QueryExecutor { TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild()); if (tableDesc != null) { - Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get(); + Tablespace space = TablespaceManager.get(tableDesc.getUri()).get(); space.rewritePlan(context, plan); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java index d490001..0c02b6e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java @@ -24,8 +24,6 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.CreateTableNode; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.plan.logical.NodeType; -import org.apache.tajo.storage.TableSpaceManager; -import org.apache.tajo.storage.Tablespace; public class CreateTableHook implements DistributedQueryHook { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 4fef02c..9d5838d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -48,7 +48,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.event.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tablespace; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.QueryHistory; @@ -447,7 +447,7 @@ public class Query implements EventHandler<QueryEvent> { QueryContext context = query.context.getQueryContext(); if (lastStage != null && context.hasOutputTableUri()) { - Tablespace space = TableSpaceManager.get(context.getOutputTableUri()).get(); + Tablespace space = TablespaceManager.get(context.getOutputTableUri()).get(); try { LogicalRootNode rootNode = 
lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); space.rollbackTable(rootNode.getChild()); @@ -470,7 +470,7 @@ public class Query implements EventHandler<QueryEvent> { // If there is not tabledesc, it is a select query without insert or ctas. // In this case, we should use default tablespace. - Tablespace space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get(); + Tablespace space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get(); Path finalOutputDir = space.commitTable( query.context.getQueryContext(), http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 84f2eac..1f5e7a3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -18,15 +18,10 @@ package org.apache.tajo.querymaster; -import com.google.common.base.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -61,6 +56,7 @@ import org.apache.tajo.worker.AbstractResourceAllocator; import org.apache.tajo.worker.TajoResourceAllocator; import java.io.IOException; +import java.net.URI; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicLong; @@ -70,12 +66,6 @@ import static org.apache.tajo.TajoProtos.QueryState; public class QueryMasterTask extends CompositeService { private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName()); - // query submission directory is private! - final public static FsPermission STAGING_DIR_PERMISSION = - FsPermission.createImmutable((short) 0700); // rwx-------- - - public static final String TMP_STAGING_DIR_PREFIX = ".staging"; - private QueryId queryId; private Session session; @@ -157,8 +147,6 @@ public class QueryMasterTask extends CompositeService { dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher()); dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler()); - initStagingDir(); - queryMetrics = new TajoMetrics(queryId.toString()); super.init(systemConf); @@ -303,8 +291,9 @@ public class QueryMasterTask extends CompositeService { state == QueryState.QUERY_ERROR; } + private LogicalPlan plan; + public synchronized void startQuery() { - LogicalPlan plan = null; Tablespace space = null; try { if (query != null) { @@ -314,7 +303,7 @@ public class QueryMasterTask extends CompositeService { CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog(); - LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); jsonExpr = null; // remove the possible OOM @@ -322,10 +311,12 @@ public class QueryMasterTask extends CompositeService { plan = planner.createPlan(queryContext, expr); optimizer.optimize(queryContext, plan); - // when a given uri is null, TableSpaceManager.get will return the default tablespace. 
- space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get(); + // when a given uri is null, TablespaceManager.get will return the default tablespace. + space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get(); space.rewritePlan(queryContext, plan); + initStagingDir(); + for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) { LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN); if (scanNodes != null) { @@ -367,94 +358,25 @@ public class QueryMasterTask extends CompositeService { } private void initStagingDir() throws IOException { - Path stagingDir; + URI stagingDir; try { + Tablespace tablespace = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get(); + TableDesc desc = PlannerUtil.getOutputTableDesc(plan); - stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext); + FormatProperty formatProperty = tablespace.getFormatProperty(desc.getMeta()); + if (formatProperty.isStagingSupport()) { + stagingDir = tablespace.prepareStagingSpace(systemConf, queryId.toString(), queryContext, desc.getMeta()); - // Create a subdirectories - LOG.info("The staging dir '" + stagingDir + "' is created."); - queryContext.setStagingDir(stagingDir); - } catch (IOException ioe) { - LOG.warn("Creating staging dir has been failed.", ioe); - - throw ioe; - } - } - - /** - * It initializes the final output and staging directory and sets - * them to variables. 
- */ - public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException { - - String realUser; - String currentUser; - UserGroupInformation ugi; - ugi = UserGroupInformation.getLoginUser(); - realUser = ugi.getShortUserName(); - currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); - - FileSystem fs; - Path stagingDir; - - //////////////////////////////////////////// - // Create Output Directory - //////////////////////////////////////////// - - String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, ""); - - // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO - // So, this query results won't be materialized as a part of a table. - // The result will be temporarily written in the staging directory. - if (outputPath.isEmpty()) { - // for temporarily written in the storage directory - stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); - } else { - Optional<Tablespace> spaceResult = TableSpaceManager.get(outputPath); - if (!spaceResult.isPresent()) { - throw new IOException("No registered Tablespace for " + outputPath); + // Create a staging space + LOG.info("The staging dir '" + stagingDir + "' is created."); + queryContext.setStagingDir(stagingDir); } - Tablespace space = spaceResult.get(); - if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation - // If this space allows move operation, the staging directory will be underneath the final output table uri. 
- stagingDir = StorageUtil.concatPath(context.getOutputTableUri().toString(), TMP_STAGING_DIR_PREFIX, queryId); - } else { - stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); - } - } - - // initializ - fs = stagingDir.getFileSystem(conf); - - if (fs.exists(stagingDir)) { - throw new IOException("The staging directory '" + stagingDir + "' already exists"); - } - fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); - FileStatus fsStatus = fs.getFileStatus(stagingDir); - String owner = fsStatus.getOwner(); - - if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { - throw new IOException("The ownership on the user's query " + - "directory " + stagingDir + " is not as expected. " + - "It is owned by " + owner + ". The directory must " + - "be owned by the submitter " + currentUser + " or " + - "by " + realUser); - } - - if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { - LOG.info("Permissions on staging directory " + stagingDir + " are " + - "incorrect: " + fsStatus.getPermission() + ". 
Fixing permissions " + - "to correct value " + STAGING_DIR_PERMISSION); - fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + } catch (IOException ioe) { + LOG.warn("Creating staging space has been failed.", ioe); + throw ioe; } - - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - fs.mkdirs(stagingResultDir); - - return stagingDir; } public Query getQuery() { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 5b8f24a..f30fb64 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -100,7 +100,7 @@ public class Repartitioner { stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes(); } - // TODO - We should remove dummy flagment usages + // TODO - We should remove dummy fragment usages fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path("/dummy"), 0, 0, new String[]{UNKNOWN_HOST}); @@ -115,7 +115,7 @@ public class Repartitioner { // if table has no data, tablespace will return empty FileFragment. // So, we need to handle FileFragment by its size. // If we don't check its size, it can cause IndexOutOfBoundsException. 
- Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get(); + Tablespace space = TablespaceManager.get(tableDesc.getUri()).get(); List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc); if (fileFragments.size() > 0) { fragments[i] = fileFragments.get(0); @@ -380,7 +380,7 @@ public class Repartitioner { Path[] partitionScanPaths = null; TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName()); - Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get(); + Tablespace space = TablespaceManager.get(tableDesc.getUri()).get(); if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { @@ -507,7 +507,7 @@ public class Repartitioner { Collection<Fragment> scanFragments; Path[] partitionScanPaths = null; - FileTablespace space = (FileTablespace) TableSpaceManager.get(desc.getUri()).get(); + FileTablespace space = (FileTablespace) TablespaceManager.get(desc.getUri()).get(); if (scan.getType() == NodeType.PARTITIONS_SCAN) { PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; @@ -645,7 +645,7 @@ public class Repartitioner { PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan())); } - Tablespace space = TableSpaceManager.getAnyByScheme(storeType).get(); + Tablespace space = TablespaceManager.getAnyByScheme(storeType).get(); ranges = space.getInsertSortRanges( stage.getContext().getQueryContext(), tableDesc, http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index a7d605c..1163a6e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -60,7 +60,7 @@ import org.apache.tajo.plan.logical.*; import 
org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.storage.FileTablespace; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; @@ -1084,7 +1084,7 @@ public class Stage implements EventHandler<StageEvent> { Collection<Fragment> fragments; TableMeta meta = table.getMeta(); - Tablespace tablespace = TableSpaceManager.get(scan.getTableDesc().getUri()).get(); + Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri()).get(); // Depending on scanner node's type, it creates fragments. If scan is for // a partitioned table, It will creates lots fragments for all partitions. http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java index 0df5d4d..f97ce29 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java @@ -160,7 +160,7 @@ public class LegacyTaskImpl implements Task { this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { - Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get()) + Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()).get()) .getAppenderFilePath(getId(), queryContext.getStagingDir()); LOG.info("Output File Path: " + outFilePath); context.setOutputPath(outFilePath); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 5974693..7697458 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -154,7 +154,7 @@ public class TaskImpl implements Task { this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { - Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get()) + Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()).get()) .getAppenderFilePath(getId(), queryContext.getStagingDir()); LOG.info("Output File Path: " + outFilePath); context.setOutputPath(outFilePath); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index 43bb6c1..bd84283 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -27,7 +27,7 @@ <%@ page import="org.apache.tajo.master.rm.WorkerState" %> <%@ page import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="org.apache.tajo.service.TajoMasterInfo" %> -<%@ page import="org.apache.tajo.storage.TableSpaceManager" %> +<%@ page import="org.apache.tajo.storage.TablespaceManager" %> <%@ page import="org.apache.tajo.storage.Tablespace" %> <%@ page import="org.apache.tajo.util.NetUtils" %> <%@ page import="org.apache.tajo.util.TUtil" %> @@ -141,7 +141,7 @@ <h3>Tablespaces</h3> <table width="100%" class="border_table" border="1"> <tr><th>Tablespace Name</th><th>URI</th><th>Handler</th></tr> - <% for (Tablespace space : 
TableSpaceManager.getAllTablespaces()) { + <% for (Tablespace space : TablespaceManager.getAllTablespaces()) { if (space.isVisible()) { %> <tr><td><%=space.getName()%></td><td><%=space.getUri()%></td><td><%=space.getClass().getName()%></td></tr> <% }}%> http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java index ca2378b..5df1122 100644 --- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java @@ -46,7 +46,7 @@ public class BackendTestingUtil { public static void writeTmpTable(TajoConf conf, Path tablePath) throws IOException { - FileTablespace sm = TableSpaceManager.getDefault(); + FileTablespace sm = TablespaceManager.getDefault(); Appender appender; Path filePath = new Path(tablePath, "table.csv"); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 57b1e18..a323f25 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -348,6 +348,41 @@ public class QueryTestCaseBase { } /** + * It executes the query file and compares the result against the result file. 
+ * + * @throws Exception + */ + public void assertQuery() throws Exception { + ResultSet res = null; + try { + res = executeQuery(); + assertResultSet(res); + } finally { + if (res != null) { + res.close(); + } + } + } + + /** + * It executes a given query statement and verifies the result against the result file. + * + * @param query A query statement + * @throws Exception + */ + public void assertQueryStr(String query) throws Exception { + ResultSet res = null; + try { + res = executeString(query); + assertResultSet(res); + } finally { + if (res != null) { + res.close(); + } + } + } + + /** * Execute a query contained in the file located in src/test/resources/results/<i>ClassName</i>/<i>MethodName</i>. * <i>ClassName</i> and <i>MethodName</i> will be replaced by actual executed class and methods. * http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index acdae85..973f1e8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -48,7 +48,7 @@ import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.storage.FileTablespace; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; @@ -59,6 +59,7 @@ import java.io.File; import java.io.IOException; import java.io.Writer; import java.net.InetSocketAddress; +import java.net.URI; import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; @@ -346,18 
+347,17 @@ public class TajoTestingCluster { LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI)); if (!local) { - c.setVar(ConfVars.ROOT_DIR, getMiniDFSCluster().getFileSystem().getUri() + "/tajo"); - } else { - c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo"); - } + String tajoRootDir = getMiniDFSCluster().getFileSystem().getUri().toString() + "/tajo"; + c.setVar(ConfVars.ROOT_DIR, tajoRootDir); - // Do not need for local file system - if (!local) { + URI defaultTsUri = TajoConf.getWarehouseDir(c).toUri(); FileTablespace defaultTableSpace = - new FileTablespace(TableSpaceManager.DEFAULT_TABLESPACE_NAME, TajoConf.getWarehouseDir(c).toUri()); + new FileTablespace(TablespaceManager.DEFAULT_TABLESPACE_NAME, defaultTsUri); defaultTableSpace.init(conf); + TablespaceManager.addTableSpaceForTest(defaultTableSpace); - TableSpaceManager.addTableSpaceForTest(defaultTableSpace); + } else { + c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo"); } setupCatalogForTesting(c, testBuildDir); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index ce951c6..26e25a4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -32,7 +32,7 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryStatus; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.FileUtil; import org.junit.After; import org.junit.Before; @@ -217,7 +217,7 @@ public 
class TestTajoCli { if (!cluster.isHiveCatalogStoreRunning()) { assertOutputResult(resultFileName, consoleResult, new String[]{"${table.path}"}, - new String[]{TableSpaceManager.getDefault().getTableUri("default", tableName).toString()}); + new String[]{TablespaceManager.getDefault().getTableUri("default", tableName).toString()}); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index 328f883..07a09ad 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -24,7 +24,6 @@ import org.apache.tajo.SessionVars; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.cli.tsql.InvalidStatementException; import org.apache.tajo.cli.tsql.ParsedResult; import org.apache.tajo.cli.tsql.SimpleParser; @@ -50,7 +49,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; import org.apache.tajo.plan.verifier.VerificationState; import org.apache.tajo.storage.LazyTuple; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.BytesUtils; @@ -104,7 +103,7 @@ public class ExprTestBase { analyzer = new SQLAnalyzer(); preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat); - planner = new LogicalPlanner(cat, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(cat, TablespaceManager.getInstance()); optimizer = new 
LogicalOptimizer(util.getConfiguration()); annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java index 80f3459..5a8238c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java @@ -27,7 +27,6 @@ import org.apache.tajo.algebra.OpType; import org.apache.tajo.algebra.Selection; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; @@ -45,7 +44,7 @@ import org.apache.tajo.plan.function.GeneralFunction; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.nameresolver.NameResolvingMode; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.CommonTestingUtil; import org.junit.AfterClass; @@ -117,7 +116,7 @@ public class TestEvalTreeUtil { catalog.createFunction(funcMeta); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); String[] QUERIES = { "select name, score, age from people where score > 30", // 0 http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java index 9aa7ddf..afa3472 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java @@ -34,7 +34,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.*; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.junit.AfterClass; @@ -103,7 +103,7 @@ public class TestLogicalOptimizer { catalog.createFunction(funcDesc); sqlAnalyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); optimizer = new LogicalOptimizer(util.getConfiguration()); defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java index 3cee816..dc9e2b0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java @@ -19,7 +19,7 @@ package org.apache.tajo.engine.planner; import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.storage.TableSpaceManager; 
+import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.graph.SimpleDirectedGraph; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -40,7 +40,7 @@ public class TestLogicalPlan { public static void setup() throws Exception { util = new TajoTestingCluster(); util.startCatalogCluster(); - planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TableSpaceManager.getInstance()); + planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TablespaceManager.getInstance()); } public static void tearDown() { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 351a6af..0f37763 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -42,7 +42,7 @@ import org.apache.tajo.plan.*; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.KeyValueSet; @@ -131,7 +131,7 @@ public class TestLogicalPlanner { catalog.createFunction(funcDesc); sqlAnalyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); } @AfterClass http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java index d62eed2..fb35220 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java @@ -39,7 +39,7 @@ import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; @@ -109,7 +109,7 @@ public class TestPlannerUtil { catalog.createFunction(funcDesc); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); } @AfterClass http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java index 2464fb1..ace3d0d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java @@ -86,7 +86,7 @@ public class TestBNLJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = 
((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, schema, employeePath); appender.init(); VTuple tuple = new VTuple(schema.size()); @@ -108,7 +108,7 @@ public class TestBNLJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV"); Path peoplePath = new Path(testDir, "people.csv"); - appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath); + appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); for (int i = 1; i < INNER_TUPLE_NUM; i += 2) { @@ -124,7 +124,7 @@ public class TestBNLJoinExec { people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); } @After http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java index 96a1f36..b4a6063 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java @@ -90,7 +90,7 @@ public class TestBSTIndexExec { Path workDir = CommonTestingUtil.getTestDir(); catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString()); catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); - sm = TableSpaceManager.getLocalFs(); + sm = 
TablespaceManager.getLocalFs(); idxPath = new Path(workDir, "test.idx"); @@ -148,7 +148,7 @@ public class TestBSTIndexExec { catalog.createTable(desc); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); optimizer = new LogicalOptimizer(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index d94d3f6..cf5220e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -82,7 +82,7 @@ public class TestExternalSortExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, schema, employeePath); appender.enableStats(); appender.init(); @@ -104,7 +104,7 @@ public class TestExternalSortExec { employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri()); catalog.createTable(employee); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); } @After http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java ---------------------------------------------------------------------- 
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java index 21a101a..dc4dd04 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java @@ -104,7 +104,7 @@ public class TestFullOuterHashJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV"); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); VTuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -133,7 +133,7 @@ public class TestFullOuterHashJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta("CSV"); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path); appender2.init(); VTuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -172,7 +172,7 @@ public class TestFullOuterHashJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV"); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); VTuple tuple3 = new VTuple(emp3Schema.size()); @@ -224,7 +224,7 @@ public class TestFullOuterHashJoinExec { TableMeta phone3Meta = 
CatalogUtil.newTableMeta("CSV"); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); @@ -234,7 +234,7 @@ public class TestFullOuterHashJoinExec { catalog.createTable(phone3); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); defaultContext = LocalTajoTestingUtility.createDummyContext(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java index 0e2ce42..8fd61d0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java @@ -109,7 +109,7 @@ public class TestFullOuterMergeJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV"); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); VTuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -147,7 +147,7 @@ public class TestFullOuterMergeJoinExec { TableMeta dep4Meta = CatalogUtil.newTableMeta("CSV"); Path dep4Path = new Path(testDir, "dep4.csv"); - Appender appender4 = 
((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep4Meta, dep4Schema, dep4Path); + Appender appender4 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep4Meta, dep4Schema, dep4Path); appender4.init(); VTuple tuple4 = new VTuple(dep4Schema.size()); for (int i = 0; i < 11; i++) { @@ -178,7 +178,7 @@ public class TestFullOuterMergeJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta("CSV"); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path); appender2.init(); VTuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -217,7 +217,7 @@ public class TestFullOuterMergeJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV"); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); VTuple tuple3 = new VTuple(emp3Schema.size()); @@ -269,7 +269,7 @@ public class TestFullOuterMergeJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV"); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); appender5.flush(); @@ -278,7 +278,7 @@ public class TestFullOuterMergeJoinExec { catalog.createTable(phone3); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); defaultContext = 
LocalTajoTestingUtility.createDummyContext(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java index d54df1c..1b64a8f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java @@ -83,7 +83,7 @@ public class TestHashAntiJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); VTuple tuple = new VTuple(employeeSchema.size()); @@ -109,7 +109,7 @@ public class TestHashAntiJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV"); Path peoplePath = new Path(testDir, "people.csv"); - appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); @@ -128,7 +128,7 @@ public class TestHashAntiJoinExec { people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); optimizer = new LogicalOptimizer(conf); } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java index a8826ee..b9ee06a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java @@ -85,7 +85,7 @@ public class TestHashJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); VTuple tuple = new VTuple(employeeSchema.size()); @@ -108,7 +108,7 @@ public class TestHashJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV"); Path peoplePath = new Path(testDir, "people.csv"); - appender = ((FileTablespace) TableSpaceManager.getLocalFs()) + appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); @@ -126,7 +126,7 @@ public class TestHashJoinExec { people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); defaultContext = LocalTajoTestingUtility.createDummyContext(conf); }
