Repository: tajo Updated Branches: refs/heads/master ae384685f -> 0f3412a74
TAJO-989: Cleanup of child blocks after parent execution block is complete. (jinho) Closes #103 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0f3412a7 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0f3412a7 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0f3412a7 Branch: refs/heads/master Commit: 0f3412a74bb3c565df1259b19630bc17e1bc69e0 Parents: ae38468 Author: jinossy <[email protected]> Authored: Tue Aug 5 11:58:16 2014 +0900 Committer: jinossy <[email protected]> Committed: Tue Aug 5 11:58:16 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../tajo/master/querymaster/QueryMaster.java | 30 +++++++++- .../tajo/master/querymaster/SubQuery.java | 20 ++++++- .../tajo/worker/TajoWorkerManagerService.java | 13 +++++ .../main/java/org/apache/tajo/worker/Task.java | 6 +- .../java/org/apache/tajo/worker/TaskRunner.java | 21 ++++++- .../src/main/proto/TajoWorkerProtocol.proto | 5 ++ .../apache/tajo/worker/TestDeletionService.java | 61 ++++++++++++++++++++ 8 files changed, 151 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 61593a2..be71bf4 100644 --- a/CHANGES +++ b/CHANGES @@ -29,6 +29,9 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-989: Cleanup of child blocks after parent execution block is complete + (jinho) + TAJO-966: Range partition should support split of multiple characters. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index f173c24..25af82f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -18,6 +18,7 @@ package org.apache.tajo.master.querymaster; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tajo.QueryId; +import org.apache.tajo.TajoIdProtos; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; @@ -162,6 +164,30 @@ public class QueryMaster extends CompositeService implements EventHandler { } } + protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) { + LOG.info("cleanup executionBlocks : " + executionBlockIds); + NettyClientBase rpc = null; + List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker(); + TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder(); + builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds)); + TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build(); + for (TajoMasterProtocol.WorkerResourceProto worker : workers) { + try { + if (worker.getPeerRpcPort() == 0) continue; + + rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()), + TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); + + tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get()); + } catch (Exception e) { + LOG.error(e.getMessage()); + } finally { + connPool.releaseConnection(rpc); + } + } + } + private void cleanup(QueryId queryId) { LOG.info("cleanup query resources : " + queryId); NettyClientBase rpc = null; @@ -338,7 +364,9 @@ public class QueryMaster extends CompositeService implements EventHandler { queryMasterTask.stop(); //if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE") // && !workerContext.isYarnContainerMode()) { - cleanup(queryId); // TODO We will support yarn mode + if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) { + cleanup(queryId); + } //} } catch (Exception e) { LOG.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index f2e9dd5..17efa21 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TajoIdProtos; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; @@ -1004,7 +1005,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!"); subQuery.eventHandler.handle( new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH, - subQuery.getId(), allocationEvent.getAllocatedContainer())); + subQuery.getId(), allocationEvent.getAllocatedContainer()) + ); subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START)); } catch (Throwable t) { @@ -1107,6 +1109,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private void cleanup() { stopScheduler(); releaseContainers(); + + if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) { + List<ExecutionBlock> childs = getMasterPlan().getChilds(getId()); + List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList(); + for (ExecutionBlock executionBlock : childs){ + ebIds.add(executionBlock.getId().getProto()); + } + + try { + getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds); + } catch (Throwable e) { + LOG.error(e); + } + } } private static class SubQueryCompleteTransition @@ -1114,7 +1130,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { @Override public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { - // TODO - Commit subQuery & do cleanup + // TODO - Commit subQuery // TODO - records succeeded, failed, killed completed task // TODO - records metrics try { http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index 13ef15d..e77da70 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.TajoIdProtos; @@ -151,4 +152,16 @@ public class TajoWorkerManagerService extends CompositeService workerContext.cleanup(new QueryId(request).toString()); done.run(TajoWorker.TRUE_PROTO); } + + @Override + public void cleanupExecutionBlocks(RpcController controller, TajoWorkerProtocol.ExecutionBlockListProto request, + RpcCallback<PrimitiveProtos.BoolProto> done) { + for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : request.getExecutionBlockIdList()) { + String inputDir = TaskRunner.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); + workerContext.cleanup(inputDir); + String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); + workerContext.cleanup(outputDir); + } + done.run(TajoWorker.TRUE_PROTO); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 230c63a..3a4536a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -794,12 +794,10 @@ public class Task { } } } + public static Path getTaskAttemptDir(QueryUnitAttemptId quid) { Path workDir = - StorageUtil.concatPath( - quid.getQueryUnitId().getExecutionBlockId().getQueryId().toString(), - "in", - quid.getQueryUnitId().getExecutionBlockId().toString(), + StorageUtil.concatPath(TaskRunner.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()), String.valueOf(quid.getQueryUnitId().getId()), String.valueOf(quid.getId())); return workDir; http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 3fcee06..9676192 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -45,6 +45,7 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.TajoIdUtils; import java.net.InetSocketAddress; @@ -172,6 +173,24 @@ public class TaskRunner extends AbstractService { return executionBlockId + "," + containerId; } + public static Path getBaseOutputDir(ExecutionBlockId executionBlockId){ + Path workDir = + StorageUtil.concatPath( + executionBlockId.getQueryId().toString(), + "output", + String.valueOf(executionBlockId.getId())); + return workDir; + } + + public static Path getBaseInputDir(ExecutionBlockId executionBlockId) { + Path workDir = + StorageUtil.concatPath( + executionBlockId.getQueryId().toString(), + "in", + executionBlockId.toString()); + return workDir; + } + @Override public void init(Configuration conf) { this.systemConf = (TajoConf)conf; @@ -182,7 +201,7 @@ public class TaskRunner extends AbstractService { localFS = FileSystem.getLocal(conf); // the base dir for an output dir - baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId(); + baseDir = getBaseOutputDir(executionBlockId).toString(); // initialize LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index ce8ce86..dc2b1d7 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -179,6 +179,10 @@ message RunExecutionBlockRequestProto { optional string queryOutputPath = 6; } +message ExecutionBlockListProto { + repeated ExecutionBlockIdProto executionBlockId = 1; +} + service TajoWorkerProtocolService { rpc ping (QueryUnitAttemptIdProto) returns (BoolProto); @@ -186,6 +190,7 @@ service TajoWorkerProtocolService { rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto); rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto); rpc cleanup(QueryIdProto) returns (BoolProto); + rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto); } message EnforceProperty { http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java new file mode 100644 index 0000000..98251c1 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestDeletionService { + DeletionService deletionService; + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() { + if(deletionService != null){ + deletionService.stop(); + } + } + + @Test + public final void testTemporalDirectory() throws IOException, InterruptedException { + int delay = 1; + deletionService = new DeletionService(1, delay); + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path tempPath = CommonTestingUtil.getTestDir(); + assertTrue(fs.exists(tempPath)); + deletionService.delete(tempPath); + assertTrue(fs.exists(tempPath)); + + Thread.sleep(delay * 2 * 1000); + assertFalse(fs.exists(tempPath)); + } +}
