Repository: tajo Updated Branches: refs/heads/master bf68b770e -> b4adc18cd
TAJO-1211: Staging directory for CTAS and INSERT should be in the output dir. Closes #274 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b4adc18c Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b4adc18c Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b4adc18c Branch: refs/heads/master Commit: b4adc18cd25de550fe04a43ef69d715c146976db Parents: bf68b77 Author: Hyunsik Choi <[email protected]> Authored: Mon Dec 1 17:23:35 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Dec 1 17:23:35 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/conf/TajoConf.java | 12 +++- .../org/apache/tajo/master/GlobalEngine.java | 7 +- .../java/org/apache/tajo/master/TajoMaster.java | 2 +- .../apache/tajo/master/querymaster/Query.java | 74 +++++++++++++++----- .../master/querymaster/QueryMasterTask.java | 23 ++++-- .../src/main/resources/webapps/admin/index.jsp | 2 +- .../apache/tajo/engine/query/TestCTASQuery.java | 1 + .../tajo/engine/query/TestInsertQuery.java | 14 +++- 9 files changed, 105 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 33983bc..8d51d44 100644 --- a/CHANGES +++ b/CHANGES @@ -79,6 +79,9 @@ Release 0.9.1 - unreleased BUG FIXES + TAJO-1211: Staging directory for CTAS and INSERT should be in + the output dir. (hyunsik) + TAJO-1210: ByteBufLineReader does not handle the end of file, if newline is not appeared. 
(jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index f3ae453..312abfb 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -671,7 +671,15 @@ public class TajoConf extends Configuration { return path.indexOf("file:/") == 0 || path.indexOf("hdfs:/") == 0; } - public static Path getStagingDir(TajoConf conf) throws IOException { + /** + * It returns the default root staging directory used by queries without a target table or + * a specified output directory. An example query is <pre>SELECT a,b,c FROM XXX;</pre>. + * + * @param conf TajoConf + * @return Path which points to the default root staging directory + * @throws IOException + */ + public static Path getDefaultRootStagingDir(TajoConf conf) throws IOException { String stagingDirString = conf.getVar(ConfVars.STAGING_ROOT_DIR); if (!hasScheme(stagingDirString)) { Path warehousePath = getWarehouseDir(conf); @@ -686,7 +694,7 @@ public class TajoConf extends Configuration { public static Path getQueryHistoryDir(TajoConf conf) throws IOException { String historyDirString = conf.getVar(ConfVars.HISTORY_QUERY_DIR); if (!hasScheme(historyDirString)) { - Path stagingPath = getStagingDir(conf); + Path stagingPath = getDefaultRootStagingDir(conf); FileSystem fs = stagingPath.getFileSystem(conf); Path path = new Path(fs.getUri().toString(), historyDirString); conf.setVar(ConfVars.HISTORY_QUERY_DIR, path.toString()); http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 47a8750..9bf9a75 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -335,10 +335,8 @@ public class GlobalEngine extends AbstractService { String queryId = nodeUniqName + "_" + System.currentTimeMillis(); FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf()); - Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), fs, queryId.toString()); - + Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext); Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - fs.mkdirs(stagingResultDir); TableDesc tableDesc = null; Path finalOutputDir = null; @@ -349,8 +347,7 @@ public class GlobalEngine extends AbstractService { finalOutputDir = insertNode.getPath(); } - TaskAttemptContext taskAttemptContext = - new TaskAttemptContext(queryContext, null, null, (CatalogProtos.FragmentProto[]) null, stagingDir); + TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir); taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000")); EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 6e585af..795983d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -271,7 +271,7 @@ public class 
TajoMaster extends CompositeService { LOG.info("Warehouse dir '" + wareHousePath + "' is created"); } - Path stagingPath = TajoConf.getStagingDir(systemConf); + Path stagingPath = TajoConf.getDefaultRootStagingDir(systemConf); LOG.info("Staging dir: " + wareHousePath); if (!defaultFS.exists(stagingPath)) { defaultFS.mkdirs(stagingPath, new FsPermission(STAGING_ROOTDIR_PERMISSION)); http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 7db6d8b..07b47c1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -411,9 +411,9 @@ public class Query implements EventHandler<QueryEvent> { public QueryState transition(Query query, QueryEvent queryEvent) { QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent; QueryState finalState; + if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) { - finalizeQuery(query, subQueryEvent); - finalState = QueryState.QUERY_SUCCEEDED; + finalState = finalizeQuery(query, subQueryEvent); } else if (subQueryEvent.getState() == SubQueryState.FAILED) { finalState = QueryState.QUERY_FAILED; } else if (subQueryEvent.getState() == SubQueryState.KILLED) { @@ -427,26 +427,28 @@ public class Query implements EventHandler<QueryEvent> { return finalState; } - private void finalizeQuery(Query query, QueryCompletedEvent event) { + private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { MasterPlan masterPlan = query.getPlan(); ExecutionBlock terminal = query.getPlan().getTerminalBlock(); DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId()); - Path finalOutputDir = 
commitOutputData(query); QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); try { - hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), - finalOutputDir); - } catch (Exception e) { - query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); + Path finalOutputDir = commitOutputData(query); + hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); + } catch (Throwable t) { + query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(t))); + return QueryState.QUERY_ERROR; } + + return QueryState.QUERY_SUCCEEDED; } /** * It moves a result data stored in a staging output dir into a final output dir. */ - public Path commitOutputData(Query query) { + public Path commitOutputData(Query query) throws IOException { QueryContext queryContext = query.context.getQueryContext(); Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME); Path finalOutputDir; @@ -508,24 +510,49 @@ public class Query implements EventHandler<QueryEvent> { fs.delete(entry.getValue(), true); fs.rename(entry.getValue(), entry.getKey()); } + throw new IOException(ioe.getMessage()); } - } else { + } else { // no partition try { + + // if the final output dir exists, move all contents to the temporary table dir. + // Otherwise, just make the final output dir. As a result, the final output dir will be empty. if (fs.exists(finalOutputDir)) { - fs.rename(finalOutputDir, oldTableDir); + fs.mkdirs(oldTableDir); + + for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { + fs.rename(status.getPath(), oldTableDir); + } + movedToOldTable = fs.exists(oldTableDir); } else { // if the parent does not exist, make its parent directory. 
- fs.mkdirs(finalOutputDir.getParent()); + fs.mkdirs(finalOutputDir); } - fs.rename(stagingResultDir, finalOutputDir); + // Move the results to the final output dir. + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + + // Check the final output dir committed = fs.exists(finalOutputDir); + } catch (IOException ioe) { // recover the old table if (movedToOldTable && !committed) { - fs.rename(oldTableDir, finalOutputDir); + + // if commit is failed, recover the old data + for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { + fs.delete(status.getPath(), true); + } + + for (FileStatus status : fs.listStatus(oldTableDir)) { + fs.rename(status.getPath(), finalOutputDir); + } } + + throw new IOException(ioe.getMessage()); } } } else { @@ -560,13 +587,24 @@ public class Query implements EventHandler<QueryEvent> { } } } else { // CREATE TABLE AS SELECT (CTAS) - fs.rename(stagingResultDir, finalOutputDir); + if (fs.exists(finalOutputDir)) { + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } else { + fs.rename(stagingResultDir, finalOutputDir); + } LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); } } - } catch (IOException e) { - // TODO report to client - e.printStackTrace(); + + // remove the staging directory if the final output dir is given. 
+ Path stagingDirRoot = queryContext.getStagingDir().getParent(); + fs.delete(stagingDirRoot, true); + + } catch (Throwable t) { + LOG.error(t); + throw new IOException(t); } } else { finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME); http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 5cf3df5..75d8ab6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -58,6 +58,7 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; @@ -79,6 +80,8 @@ public class QueryMasterTask extends CompositeService { final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx-------- + public static final String TMP_STAGING_DIR_PREFIX = ".staging"; + private QueryId queryId; private Session session; @@ -399,8 +402,7 @@ public class QueryMasterTask extends CompositeService { try { - stagingDir = initStagingDir(systemConf, defaultFS, queryId.toString()); - defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME)); + stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext); // Create a subdirectories LOG.info("The staging dir '" + stagingDir + "' is created."); @@ -423,7 +425,7 @@ public class 
QueryMasterTask extends CompositeService { * It initializes the final output and staging directory and sets * them to variables. */ - public static Path initStagingDir(TajoConf conf, FileSystem fs, String queryId) throws IOException { + public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException { String realUser; String currentUser; @@ -432,13 +434,21 @@ public class QueryMasterTask extends CompositeService { realUser = ugi.getShortUserName(); currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); - Path stagingDir = null; + FileSystem fs; + Path stagingDir; //////////////////////////////////////////// // Create Output Directory //////////////////////////////////////////// - stagingDir = new Path(TajoConf.getStagingDir(conf), queryId); + if (context.isCreateTable() || context.isInsert()) { + stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId); + } else { + stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); + } + + // obtain the file system that owns the staging directory + fs = stagingDir.getFileSystem(conf); if (fs.exists(stagingDir)) { throw new IOException("The staging directory '" + stagingDir + "' already exists"); @@ -462,6 +472,9 @@ public class QueryMasterTask extends CompositeService { fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); } + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + fs.mkdirs(stagingResultDir); + return stagingDir; } http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index 30cbf88..6778725 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -131,7 +131,7 @@ <tr><td width='150'>Root 
dir:</td><td><%=TajoConf.getTajoRootDir(master.getContext().getConf())%></td></tr> <tr><td width='150'>System dir:</td><td><%=TajoConf.getSystemDir(master.getContext().getConf())%></td></tr> <tr><td width='150'>Warehouse dir:</td><td><%=TajoConf.getWarehouseDir(master.getContext().getConf())%></td></tr> - <tr><td width='150'>Staging dir:</td><td><%=TajoConf.getStagingDir(master.getContext().getConf())%></td></tr> + <tr><td width='150'>Staging dir:</td><td><%=TajoConf.getDefaultRootStagingDir(master.getContext().getConf())%></td></tr> <tr><td width='150'>Client Service:</td><td><%=NetUtils.normalizeInetSocketAddress(master.getTajoMasterClientService().getBindAddress())%></td></tr> <tr><td width='150'>Catalog Service:</td><td><%=master.getCatalogServer().getCatalogServerName()%></td></tr> <tr><td width='150'>Heap(Free/Total/Max): </td><td><%=Runtime.getRuntime().freeMemory()/1024/1024%> MB / <%=Runtime.getRuntime().totalMemory()/1024/1024%> MB / <%=Runtime.getRuntime().maxMemory()/1024/1024%> MB</td> http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java index 0e02079..0e89803 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java @@ -28,6 +28,7 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.partition.PartitionMethodDesc; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java index 9c97a55..117f186 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java @@ -29,6 +29,7 @@ import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.util.CommonTestingUtil; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -114,8 +115,19 @@ public class TestInsertQuery extends QueryTestCaseBase { @Test public final void testInsertIntoLocation() throws Exception { + Path dfsPath = new Path("/tajo-data/testInsertIntoLocation"); + assertTestInsertIntoLocation(dfsPath); + } + + @Test + public final void testInsertIntoLocationDifferentFSs() throws Exception { + Path localPath = CommonTestingUtil.getTestDir(); + assertTestInsertIntoLocation(localPath); + } + + public final void assertTestInsertIntoLocation(Path path) throws Exception { FileSystem fs = null; - Path path = new Path("/tajo-data/testInsertIntoLocation"); + try { executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close();
