Repository: tajo Updated Branches: refs/heads/master 64416dea6 -> 15450e868
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index 8009ce3..5eb66b8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -18,38 +18,45 @@ package org.apache.tajo.worker; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.QueryMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; -import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.util.Pair; +import org.apache.tajo.engine.utils.TupleCache; +import org.apache.tajo.worker.event.TaskRunnerEvent; +import org.apache.tajo.worker.event.TaskRunnerStartEvent; +import org.apache.tajo.worker.event.TaskRunnerStopEvent; +import java.io.IOException; +import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -public class TaskRunnerManager extends CompositeService { +public class TaskRunnerManager extends CompositeService implements EventHandler<TaskRunnerEvent> { private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class); - private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>(); - private final Map<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap(); + private final ConcurrentMap<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap = Maps.newConcurrentMap(); + private final ConcurrentMap<String, TaskRunner> taskRunnerMap = Maps.newConcurrentMap(); + private final ConcurrentMap<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap(); private TajoWorker.WorkerContext workerContext; private TajoConf tajoConf; private AtomicBoolean stop = new AtomicBoolean(false); private FinishedTaskCleanThread finishedTaskCleanThread; + private Dispatcher dispatcher; - public TaskRunnerManager(TajoWorker.WorkerContext workerContext) { + public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) { super(TaskRunnerManager.class.getName()); this.workerContext = workerContext; + this.dispatcher = dispatcher; } public TajoWorker.WorkerContext getWorkerContext() { @@ -58,7 +65,9 @@ public class TaskRunnerManager extends CompositeService { @Override public void init(Configuration conf) { + Preconditions.checkArgument(conf instanceof TajoConf); tajoConf = (TajoConf)conf; + dispatcher.register(TaskRunnerEvent.EventType.class, this); super.init(tajoConf); } @@ -82,6 +91,9 @@ public class TaskRunnerManager extends CompositeService { } } } + for(ExecutionBlockContext context: executionBlockContextMap.values()) { + context.stop(); + } if(finishedTaskCleanThread != null) { finishedTaskCleanThread.interrupted(); @@ -92,148 +104,35 @@ public class TaskRunnerManager extends CompositeService { } } - public void stopTask(String id) { + public void stopTaskRunner(String id) { LOG.info("Stop Task:" + id); - synchronized(taskRunnerMap) { - TaskRunner taskRunner = taskRunnerMap.remove(id); - if (taskRunner != null) { - synchronized(taskRunnerCompleteCounter) { - ExecutionBlockId ebId = taskRunner.getContext().getExecutionBlockId(); - AtomicInteger ebSuccessedTaskNums = successedTaskNums.get(ebId); - if (ebSuccessedTaskNums == null) { - ebSuccessedTaskNums = new AtomicInteger(taskRunner.getContext().succeededTasksNum.get()); - successedTaskNums.put(ebId, ebSuccessedTaskNums); - } else { - ebSuccessedTaskNums.addAndGet(taskRunner.getContext().succeededTasksNum.get()); - } - - Pair<AtomicInteger, AtomicInteger> counter = taskRunnerCompleteCounter.get(ebId); - - if (counter != null) { - if (counter.getSecond().decrementAndGet() <= 0) { - LOG.info(ebId + "'s all tasks are completed."); - try { - closeExecutionBlock(ebId, ebSuccessedTaskNums.get(), taskRunner); - } catch (Exception e) { - LOG.error(ebId + ", closing error:" + e.getMessage(), e); - } - successedTaskNums.remove(ebId); - taskRunnerCompleteCounter.remove(ebId); - } - } - } - } - } + TaskRunner taskRunner = taskRunnerMap.remove(id); + taskRunner.stop(); if(workerContext.isYarnContainerMode()) { stop(); } } - private void closeExecutionBlock(ExecutionBlockId ebId, int succeededTasks, TaskRunner lastTaskRunner) throws Exception { - TajoWorkerProtocol.ExecutionBlockReport.Builder reporterBuilder = - TajoWorkerProtocol.ExecutionBlockReport.newBuilder(); - reporterBuilder.setEbId(ebId.getProto()); - reporterBuilder.setReportSuccess(true); - reporterBuilder.setSucceededTasks(succeededTasks); - try { - List<TajoWorkerProtocol.IntermediateEntryProto> intermediateEntries = - new ArrayList<TajoWorkerProtocol.IntermediateEntryProto>(); - List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles = - workerContext.getHashShuffleAppenderManager().close(ebId); - if (shuffles == null) { - reporterBuilder.addAllIntermediateEntries(intermediateEntries); - lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build()); - return; - } - - TajoWorkerProtocol.IntermediateEntryProto.Builder intermediateBuilder = - TajoWorkerProtocol.IntermediateEntryProto.newBuilder(); - TajoWorkerProtocol.IntermediateEntryProto.PageProto.Builder pageBuilder = - TajoWorkerProtocol.IntermediateEntryProto.PageProto.newBuilder(); - TajoWorkerProtocol.FailureIntermediateProto.Builder failureBuilder = - TajoWorkerProtocol.FailureIntermediateProto.newBuilder(); - - for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) { - List<TajoWorkerProtocol.IntermediateEntryProto.PageProto> pages = - new ArrayList<TajoWorkerProtocol.IntermediateEntryProto.PageProto>(); - List<TajoWorkerProtocol.FailureIntermediateProto> failureIntermediateItems = - new ArrayList<TajoWorkerProtocol.FailureIntermediateProto>(); - - for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) { - pageBuilder.clear(); - pageBuilder.setPos(eachPage.getFirst()); - pageBuilder.setLength(eachPage.getSecond()); - pages.add(pageBuilder.build()); - } - - for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) { - failureBuilder.clear(); - failureBuilder.setPagePos(eachFailure.getFirst()); - failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst()); - failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond()); - failureIntermediateItems.add(failureBuilder.build()); - } - - intermediateBuilder.clear(); - - intermediateBuilder.setEbId(ebId.getProto()) - .setHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName() + ":" + - workerContext.getPullServerPort()) - .setTaskId(-1) - .setAttemptId(-1) - .setPartId(eachShuffle.getPartId()) - .setVolume(eachShuffle.getVolume()) - .addAllPages(pages) - .addAllFailures(failureIntermediateItems); - - intermediateEntries.add(intermediateBuilder.build()); - } - - // send intermediateEntries to QueryMaster - reporterBuilder.addAllIntermediateEntries(intermediateEntries); - - } catch (Exception e) { - LOG.error(e.getMessage(), e); - reporterBuilder.setReportSuccess(false); - if (e.getMessage() == null) { - reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName()); - } else { - reporterBuilder.setReportErrorMessage(e.getMessage()); - } - } - lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build()); - } - public Collection<TaskRunner> getTaskRunners() { - synchronized(taskRunnerMap) { - return Collections.unmodifiableCollection(taskRunnerMap.values()); - } + return Collections.unmodifiableCollection(taskRunnerMap.values()); } public Collection<TaskRunnerHistory> getExecutionBlockHistories() { - synchronized(taskRunnerHistoryMap) { - return Collections.unmodifiableCollection(taskRunnerHistoryMap.values()); - } + return Collections.unmodifiableCollection(taskRunnerHistoryMap.values()); } public TaskRunnerHistory getExcutionBlockHistoryByTaskRunnerId(String taskRunnerId) { - synchronized(taskRunnerHistoryMap) { - return taskRunnerHistoryMap.get(taskRunnerId); - } + return taskRunnerHistoryMap.get(taskRunnerId); } public TaskRunner getTaskRunner(String taskRunnerId) { - synchronized(taskRunnerMap) { - return taskRunnerMap.get(taskRunnerId); - } + return taskRunnerMap.get(taskRunnerId); } - public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) { - synchronized(taskRunnerMap) { - for (TaskRunner eachTaskRunner: taskRunnerMap.values()) { - Task task = eachTaskRunner.getContext().getTask(quAttemptId); - if (task != null) return task; - } + public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId queryUnitAttemptId) { + ExecutionBlockContext context = executionBlockContextMap.get(queryUnitAttemptId.getQueryUnitId().getExecutionBlockId()); + if (context != null) { + return context.getTask(queryUnitAttemptId); } return null; } @@ -250,53 +149,61 @@ public class TaskRunnerManager extends CompositeService { } public int getNumTasks() { - synchronized(taskRunnerMap) { - return taskRunnerMap.size(); - } + return taskRunnerMap.size(); } - //<# tasks, # running tasks> - Map<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>> taskRunnerCompleteCounter = - new HashMap<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>>(); - - Map<ExecutionBlockId, AtomicInteger> successedTaskNums = new HashMap<ExecutionBlockId, AtomicInteger>(); - - public void startTask(final String[] params) { - //TODO change to use event dispatcher - Thread t = new Thread() { - public void run() { + @Override + public void handle(TaskRunnerEvent event) { + LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType()); + if (event instanceof TaskRunnerStartEvent) { + TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event; + ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId()); + String[] params = startEvent.getParams(); + if(context == null){ try { - TajoConf systemConf = new TajoConf(tajoConf); - TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, systemConf, params); - LOG.info("Start TaskRunner:" + taskRunner.getId()); - synchronized(taskRunnerMap) { - taskRunnerMap.put(taskRunner.getId(), taskRunner); - } + // QueryMaster's address + String host = params[4]; + int port = Integer.parseInt(params[5]); + + context = new ExecutionBlockContext(this, startEvent, new InetSocketAddress(host, port)); + } catch (Throwable e) { + LOG.fatal(e.getMessage(), e); + throw new RuntimeException(e); + } + executionBlockContextMap.put(event.getExecutionBlockId(), context); + } - synchronized (taskRunnerHistoryMap){ - taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getContext().getExcutionBlockHistory()); - } + TaskRunner taskRunner = new TaskRunner(context, params); + LOG.info("Start TaskRunner:" + taskRunner.getId()); + taskRunnerMap.put(taskRunner.getId(), taskRunner); + taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory()); - synchronized(taskRunnerCompleteCounter) { - ExecutionBlockId ebId = taskRunner.getContext().getExecutionBlockId(); - Pair<AtomicInteger, AtomicInteger> counter = taskRunnerCompleteCounter.get(ebId); - if (counter == null) { - counter = new Pair(new AtomicInteger(0), new AtomicInteger(0)); - taskRunnerCompleteCounter.put(ebId, counter); - } - counter.getFirst().incrementAndGet(); - counter.getSecond().incrementAndGet(); - } - taskRunner.init(systemConf); - taskRunner.start(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(), e); + taskRunner.init(context.getConf()); + taskRunner.start(); + + } else if (event instanceof TaskRunnerStopEvent) { + ExecutionBlockContext executionBlockContext = executionBlockContextMap.remove(event.getExecutionBlockId()); + if(executionBlockContext != null){ + TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId()); + executionBlockContext.reportExecutionBlock(event.getExecutionBlockId()); + executionBlockContext.stop(); + try { + workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId()); + } catch (IOException e) { + LOG.fatal(e.getMessage(), e); + throw new RuntimeException(e); } } - }; + LOG.info("Stopped execution block:" + event.getExecutionBlockId()); + } + } + + public EventHandler getEventHandler(){ + return dispatcher.getEventHandler(); + } - t.start(); + public TajoConf getTajoConf() { + return tajoConf; } class FinishedTaskCleanThread extends Thread { http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java new file mode 100644 index 0000000..aac8973 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.ExecutionBlockId; + +public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> { + public enum EventType { + START, + STOP + } + + protected final ExecutionBlockId executionBlockId; + + public TaskRunnerEvent(EventType eventType, + ExecutionBlockId executionBlockId) { + super(eventType); + this.executionBlockId = executionBlockId; + } + + public ExecutionBlockId getExecutionBlockId() { + return executionBlockId; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java new file mode 100644 index 0000000..8c9fa51 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.engine.query.QueryContext; + +public class TaskRunnerStartEvent extends TaskRunnerEvent { + + private final QueryContext queryContext; + private final String[] params; + private final String plan; + + public TaskRunnerStartEvent(String[] params, + ExecutionBlockId executionBlockId, + QueryContext context, + String plan) { + super(EventType.START, executionBlockId); + this.params = params; + this.queryContext = context; + this.plan = plan; + } + + public String[] getParams(){ + return this.params; + } + + public QueryContext getQueryContext() { + return queryContext; + } + + public String getPlan() { + return plan; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java new file mode 100644 index 0000000..c8ec20d --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.ExecutionBlockId; + +public class TaskRunnerStopEvent extends TaskRunnerEvent { + + public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) { + super(EventType.STOP, executionBlockId); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/proto/QueryMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto index 0119a88..06d2a42 100644 --- a/tajo-core/src/main/proto/QueryMasterProtocol.proto +++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto @@ -32,7 +32,7 @@ service QueryMasterProtocolService { //from Worker rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto); rpc statusUpdate (TaskStatusProto) returns (BoolProto); - rpc ping (QueryUnitAttemptIdProto) returns (BoolProto); + rpc ping (ExecutionBlockIdProto) returns (BoolProto); rpc fatalError(TaskFatalErrorReport) returns (BoolProto); rpc done (TaskCompletionReport) returns (BoolProto); rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto); http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index e100c48..dff2733 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -201,7 +201,7 @@ message DataChannelProto { } message RunExecutionBlockRequestProto { - required string executionBlockId = 1; + required ExecutionBlockIdProto executionBlockId = 1; required string queryMasterHost = 2; required int32 queryMasterPort = 3; required string nodeId = 4; @@ -220,7 +220,8 @@ service TajoWorkerProtocolService { rpc ping (QueryUnitAttemptIdProto) returns (BoolProto); // from QueryMaster(Worker) - rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto); + rpc startExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto); + rpc stopExecutionBlock(ExecutionBlockIdProto) returns (BoolProto); rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto); rpc cleanup(QueryIdProto) returns (BoolProto); rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto); http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java index 95c06bb..b755e02 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -106,6 +106,8 @@ public class TestFetcher { @Test public void testAdjustFetchProcess() { + assertEquals(0.0f, Task.adjustFetchProcess(0, 0), 0); + assertEquals(0.0f, Task.adjustFetchProcess(10, 10), 0); assertEquals(0.05f, Task.adjustFetchProcess(10, 9), 0); assertEquals(0.1f, Task.adjustFetchProcess(10, 8), 0); assertEquals(0.25f, Task.adjustFetchProcess(10, 5), 0);
