TAJO-1560: HashShuffle report should be ignored when succeeded tasks are not included. (jinho)
Closes #538 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f72d11f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f72d11f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f72d11f Branch: refs/heads/index_support Commit: 1f72d11f1d2bd48e895cbeb8a7228a854633fe2b Parents: ad596bb Author: Jinho Kim <[email protected]> Authored: Sun Apr 19 18:45:51 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Sun Apr 19 18:45:51 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../apache/tajo/master/TajoContainerProxy.java | 9 +- .../java/org/apache/tajo/querymaster/Stage.java | 133 +++++++++++-------- .../apache/tajo/util/history/HistoryWriter.java | 30 ++--- .../tajo/worker/ExecutionBlockContext.java | 46 ++++--- .../tajo/worker/TajoWorkerManagerService.java | 3 +- .../apache/tajo/worker/TaskRunnerManager.java | 16 ++- .../tajo/worker/event/TaskRunnerStartEvent.java | 10 +- .../src/main/proto/TajoWorkerProtocol.proto | 1 + .../apache/tajo/querymaster/TestKillQuery.java | 3 +- 10 files changed, 156 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 510c1a3..7d5cc3c 100644 --- a/CHANGES +++ b/CHANGES @@ -96,6 +96,9 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1560: HashShuffle report should be ignored when a succeed tasks are not + included. (jinho) + TAJO-1569: BlockingRpcClient can make other request fail. (jinho) TAJO-1564: TestFetcher fails occasionally. 
(jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 1fda7d4..2aac005 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -32,6 +32,7 @@ import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.TaskFatalErrorEvent; import org.apache.tajo.master.rm.TajoWorkerContainer; import org.apache.tajo.master.rm.TajoWorkerContainerId; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; @@ -93,15 +94,16 @@ public class TajoContainerProxy extends ContainerProxy { } private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContainer container) { - NettyClientBase tajoWorkerRpc = null; + NettyClientBase tajoWorkerRpc; try { - InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext() - .getQueryMasterManagerService().getBindAddr(); InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); + PlanProto.ShuffleType shuffleType = + context.getQuery().getStage(executionBlockId).getDataChannel().getShuffleType(); + TajoWorkerProtocol.RunExecutionBlockRequestProto request = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder() .setExecutionBlockId(executionBlockId.getProto()) @@ -111,6 +113,7 @@ public class 
TajoContainerProxy extends ContainerProxy { .setQueryOutputPath(context.getStagingDir().toString()) .setQueryContext(queryContext.getProto()) .setPlanJson(planJson) + .setShuffleType(shuffleType) .build(); tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get()); http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 20add9f..4179003 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -239,7 +239,8 @@ public class Stage implements EventHandler<StageEvent> { EnumSet.of( StageEventType.SQ_START, StageEventType.SQ_KILL, - StageEventType.SQ_CONTAINER_ALLOCATED)) + StageEventType.SQ_CONTAINER_ALLOCATED, + StageEventType.SQ_SHUFFLE_REPORT)) // Transitions from KILLED state .addTransition(StageState.KILLED, StageState.KILLED, @@ -1300,6 +1301,53 @@ public class Stage implements EventHandler<StageEvent> { stopShuffleReceiver.set(true); } + private void finalizeShuffleReport(StageShuffleReportEvent event, ShuffleType type) { + if(!checkIfNeedFinalizing(type)) return; + + TajoWorkerProtocol.ExecutionBlockReport report = event.getReport(); + + if (!report.getReportSuccess()) { + stopFinalization(); + LOG.error(getId() + ", " + type + " report are failed. 
Caused by:" + report.getReportErrorMessage()); + eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_FAILED)); + } + + completedShuffleTasks.addAndGet(report.getSucceededTasks()); + if (report.getIntermediateEntriesCount() > 0) { + for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) { + hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); + } + } + + if (completedShuffleTasks.get() >= succeededObjectCount) { + LOG.info(getId() + ", Finalized " + type + " reports: " + completedShuffleTasks.get()); + eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_STAGE_COMPLETED)); + if (timeoutChecker != null) { + stopFinalization(); + synchronized (timeoutChecker){ + timeoutChecker.notifyAll(); + } + } + } else { + LOG.info(getId() + ", Received " + type + " reports " + + completedShuffleTasks.get() + "/" + succeededObjectCount); + } + } + + /** + * HASH_SHUFFLE, SCATTERED_HASH_SHUFFLE should get report from worker nodes when ExecutionBlock is stopping. + * RANGE_SHUFFLE report is sent from task reporter when a task finished in worker node. + */ + public static boolean checkIfNeedFinalizing(ShuffleType type) { + switch (type) { + case HASH_SHUFFLE: + case SCATTERED_HASH_SHUFFLE: + return true; + default: + return false; + } + } + private static class StageFinalizeTransition implements SingleArcTransition<Stage, StageEvent> { @Override @@ -1310,71 +1358,50 @@ public class Stage implements EventHandler<StageEvent> { } stage.lastContactTime = System.currentTimeMillis(); + ShuffleType shuffleType = stage.getDataChannel().getShuffleType(); try { if (event instanceof StageShuffleReportEvent) { - - StageShuffleReportEvent finalizeEvent = (StageShuffleReportEvent) event; - TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport(); - - if (!report.getReportSuccess()) { - stage.stopFinalization(); - LOG.error(stage.getId() + ", Shuffle report are failed. 
Caused by:" + report.getReportErrorMessage()); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); - } - - stage.completedShuffleTasks.addAndGet(finalizeEvent.getReport().getSucceededTasks()); - if (report.getIntermediateEntriesCount() > 0) { - for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) { - stage.hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); - } - } - - if (stage.completedShuffleTasks.get() >= stage.succeededObjectCount) { - LOG.info(stage.getId() + ", Finalized shuffle reports: " + stage.completedShuffleTasks.get()); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); - if (stage.timeoutChecker != null) { - stage.stopFinalization(); - synchronized (stage.timeoutChecker){ - stage.timeoutChecker.notifyAll(); - } - } - } else { - LOG.info(stage.getId() + ", Received shuffle report: " + - stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); - } - + stage.finalizeShuffleReport((StageShuffleReportEvent) event, shuffleType); } else { - LOG.info(String.format("Stage finalize - %s (total=%d, success=%d, killed=%d)", + LOG.info(String.format("Stage - %s finalize %s (total=%d, success=%d, killed=%d)", stage.getId().toString(), + shuffleType, stage.totalScheduledObjectsCount, stage.succeededObjectCount, stage.killedObjectCount)); stage.finalizeStage(); - LOG.info(stage.getId() + ", waiting for shuffle reports. expected Tasks:" + stage.succeededObjectCount); + if (checkIfNeedFinalizing(shuffleType)) { + /* wait for StageShuffleReportEvent from worker nodes */ + + LOG.info(stage.getId() + ", wait for " + shuffleType + " reports. 
expected Tasks:" + + stage.succeededObjectCount); /* FIXME implement timeout handler of stage and task */ - if (stage.timeoutChecker != null) { - stage.timeoutChecker = new Thread(new Runnable() { - @Override - public void run() { - while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) { - long elapsedTime = System.currentTimeMillis() - stage.lastContactTime; - if (elapsedTime > 120 * 1000) { - stage.stopFinalization(); - LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime - + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); - } - synchronized (this) { - try { - this.wait(1 * 1000); - } catch (InterruptedException e) { + if (stage.timeoutChecker != null) { + stage.timeoutChecker = new Thread(new Runnable() { + @Override + public void run() { + while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) { + long elapsedTime = System.currentTimeMillis() - stage.lastContactTime; + if (elapsedTime > 120 * 1000) { + stage.stopFinalization(); + LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime + + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); + } + synchronized (this) { + try { + this.wait(1 * 1000); + } catch (InterruptedException e) { + } } } } - } - }); - stage.timeoutChecker.start(); + }); + stage.timeoutChecker.start(); + } + } else { + stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); } } } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index f0c6c11..e8ba304 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -132,7 +132,7 @@ public class HistoryWriter extends AbstractService { } /* asynchronously flush to history file */ - public synchronized WriterFuture<WriterHolder> appendAndFlush(History history) { + public WriterFuture<WriterHolder> appendAndFlush(History history) { WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history) { public void done(WriterHolder holder) { try { @@ -163,7 +163,7 @@ public class HistoryWriter extends AbstractService { } /* Flushing the buffer */ - public synchronized void flushTaskHistories() { + public void flushTaskHistories() { if (historyQueue.size() > 0) { synchronized (writerThread) { writerThread.needTaskFlush.set(true); @@ -244,20 +244,16 @@ public class HistoryWriter extends AbstractService { cal.add(Calendar.HOUR_OF_DAY, -2); String closeTargetTime = df.format(cal.getTime()); List<String> closingTargets = new ArrayList<String>(); - synchronized (taskWriters) { - for (String eachWriterTime : taskWriters.keySet()) { - if (eachWriterTime.compareTo(closeTargetTime) <= 0) { - closingTargets.add(eachWriterTime); - } + + for (String eachWriterTime : taskWriters.keySet()) { + if (eachWriterTime.compareTo(closeTargetTime) <= 0) { + closingTargets.add(eachWriterTime); } } for (String eachWriterTime : closingTargets) { WriterHolder writerHolder; - synchronized (taskWriters) { - writerHolder = taskWriters.remove(eachWriterTime); - } - + writerHolder = taskWriters.remove(eachWriterTime); if (writerHolder != null) { LOG.info("Closing task history file: " + writerHolder.path); IOUtils.cleanup(LOG, writerHolder); @@ -340,7 +336,7 @@ public class HistoryWriter extends AbstractService { return 
histories; } - private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Exception { + private void writeQueryHistory(QueryHistory queryHistory) throws Exception { // QueryMaster's query detail history (json format) // <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/query.hist @@ -381,7 +377,7 @@ public class HistoryWriter extends AbstractService { } } - private synchronized WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception { + private WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception { if(stopped.get()) return null; // writing to HDFS and rolling hourly @@ -409,7 +405,7 @@ public class HistoryWriter extends AbstractService { return querySummaryWriter; } - private synchronized void rollingQuerySummaryWriter() throws Exception { + private void rollingQuerySummaryWriter() throws Exception { // finding largest file sequence SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss"); String currentDateTime = df.format(new Date(System.currentTimeMillis())); @@ -442,7 +438,7 @@ public class HistoryWriter extends AbstractService { } } - private synchronized WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception { + private WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception { SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH"); String taskStartTime = df.format(new Date(taskHistory.getStartTime())); @@ -536,14 +532,14 @@ public class HistoryWriter extends AbstractService { FSDataOutputStream out; @Override - public synchronized void close() throws IOException { + public void close() throws IOException { if (out != null) out.close(); } /* * Sync buffered data to DataNodes or disks (flush to disk devices). 
*/ - private synchronized void flush() throws IOException { + private void flush() throws IOException { if (out != null) out.hsync(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 5ffc7a9..fcf787e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -20,6 +20,7 @@ package org.apache.tajo.worker; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.netty.channel.ConnectTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -34,6 +35,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcClientManager; @@ -42,9 +44,6 @@ import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; -import io.netty.channel.ConnectTimeoutException; -import io.netty.channel.EventLoopGroup; - import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -66,8 +65,6 @@ public class ExecutionBlockContext { public AtomicInteger killedTasksNum = new AtomicInteger(); public AtomicInteger failedTasksNum = new AtomicInteger(); - private EventLoopGroup loopGroup; - // for temporal or intermediate files private FileSystem localFS; // for input 
files private FileSystem defaultFS; @@ -90,6 +87,8 @@ public class ExecutionBlockContext { private AtomicBoolean stop = new AtomicBoolean(); + private PlanProto.ShuffleType shuffleType; + // It keeps all of the query unit attempts while a TaskRunner is running. private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap(); @@ -97,7 +96,8 @@ public class ExecutionBlockContext { public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext, TaskRunnerManager manager, QueryContext queryContext, String plan, - ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster) throws Throwable { + ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster, + PlanProto.ShuffleType shuffleType) throws Throwable { this.manager = manager; this.executionBlockId = executionBlockId; this.connManager = RpcClientManager.getInstance(); @@ -114,6 +114,7 @@ public class ExecutionBlockContext { this.plan = plan; this.resource = new ExecutionBlockSharedResource(); this.workerContext = workerContext; + this.shuffleType = shuffleType; } public void init() throws Throwable { @@ -193,10 +194,6 @@ public class ExecutionBlockContext { return localFS; } - public FileSystem getDefaultFS() { - return defaultFS; - } - public LocalDirAllocator getLocalDirAllocator() { return workerContext.getLocalDirAllocator(); } @@ -264,13 +261,30 @@ public class ExecutionBlockContext { return workerContext; } - private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception { + /** + * HASH_SHUFFLE, SCATTERED_HASH_SHUFFLE should send report when this executionBlock stopping. 
+ */ + protected void sendShuffleReport() throws Exception { + + switch (shuffleType) { + case HASH_SHUFFLE: + case SCATTERED_HASH_SHUFFLE: + sendHashShuffleReport(executionBlockId); + break; + case NONE_SHUFFLE: + case RANGE_SHUFFLE: + default: + break; + } + } + + private void sendHashShuffleReport(ExecutionBlockId ebId) throws Exception { + /* This case is that worker did not ran tasks */ + if(completedTasksNum.get() == 0) return; + NettyClientBase client = getQueryMasterConnection(); QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); - stub.doneExecutionBlock(null, reporter, NullCallback.get()); - } - protected void reportExecutionBlock(ExecutionBlockId ebId) { ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder(); reporterBuilder.setEbId(ebId.getProto()); reporterBuilder.setReportSuccess(true); @@ -281,7 +295,7 @@ public class ExecutionBlockContext { getWorkerContext().getHashShuffleAppenderManager().close(ebId); if (shuffles == null) { reporterBuilder.addAllIntermediateEntries(intermediateEntries); - sendExecutionBlockReport(reporterBuilder.build()); + stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get()); return; } @@ -334,7 +348,7 @@ public class ExecutionBlockContext { } } try { - sendExecutionBlockReport(reporterBuilder.build()); + stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get()); } catch (Throwable e) { // can't send report to query master LOG.fatal(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index 4a09772..71d96c4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -121,7 +121,8 @@ public class TajoWorkerManagerService extends CompositeService , new ExecutionBlockId(request.getExecutionBlockId()) , request.getContainerId() , new QueryContext(workerContext.getConf(), request.getQueryContext()), - request.getPlanJson() + request.getPlanJson(), + request.getShuffleType() )); done.run(TajoWorker.TRUE_PROTO); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index b3c28b3..734a8a5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -32,7 +32,6 @@ import org.apache.tajo.worker.event.TaskRunnerEvent; import org.apache.tajo.worker.event.TaskRunnerStartEvent; import org.apache.tajo.worker.event.TaskRunnerStopEvent; -import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -155,8 +154,14 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< if(context == null){ try { - context = new ExecutionBlockContext(getTajoConf(), getWorkerContext(), this, startEvent.getQueryContext(), - startEvent.getPlan(), startEvent.getExecutionBlockId(), startEvent.getQueryMaster()); + context = new ExecutionBlockContext(getTajoConf(), + getWorkerContext(), + this, + startEvent.getQueryContext(), + startEvent.getPlan(), + startEvent.getExecutionBlockId(), + startEvent.getQueryMaster(), + startEvent.getShuffleType()); context.init(); } catch (Throwable e) { LOG.fatal(e.getMessage(), e); @@ -178,10 +183,9 @@ public class 
TaskRunnerManager extends CompositeService implements EventHandler< if(executionBlockContext != null){ try { executionBlockContext.getSharedResource().releaseBroadcastCache(event.getExecutionBlockId()); - executionBlockContext.reportExecutionBlock(event.getExecutionBlockId()); - workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId()); + executionBlockContext.sendShuffleReport(); workerContext.getTaskHistoryWriter().flushTaskHistories(); - } catch (IOException e) { + } catch (Exception e) { LOG.fatal(e.getMessage(), e); throw new RuntimeException(e); } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java index ff63754..908afa2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java @@ -21,6 +21,7 @@ package org.apache.tajo.worker.event; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto; public class TaskRunnerStartEvent extends TaskRunnerEvent { @@ -28,17 +29,20 @@ public class TaskRunnerStartEvent extends TaskRunnerEvent { private final WorkerConnectionInfo queryMaster; private final String containerId; private final String plan; + private final PlanProto.ShuffleType shuffleType; public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster, ExecutionBlockId executionBlockId, String containerId, QueryContext context, - String plan) { + String plan, + PlanProto.ShuffleType shuffleType) { super(EventType.START, executionBlockId); this.queryMaster = 
queryMaster; this.containerId = containerId; this.queryContext = context; this.plan = plan; + this.shuffleType = shuffleType; } public WorkerConnectionInfo getQueryMaster() { @@ -56,4 +60,8 @@ public class TaskRunnerStartEvent extends TaskRunnerEvent { public String getPlan() { return plan; } + + public PlanProto.ShuffleType getShuffleType() { + return shuffleType; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index b8c9575..fddef8f 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -201,6 +201,7 @@ message RunExecutionBlockRequestProto { required KeyValueSetProto queryContext = 6; required string planJson = 7; + required ShuffleType shuffleType = 8; } message ExecutionBlockListProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 09be700..b2e1ce9 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -211,7 +211,8 @@ public class TestKillQuery { taskRequest.setInterQuery(); TaskAttemptId attemptId = new TaskAttemptId(tid, 1); - ExecutionBlockContext context = new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null); + ExecutionBlockContext context = + new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null, null); org.apache.tajo.worker.Task task = new Task("test", 
CommonTestingUtil.getTestDir(), attemptId, conf, context, taskRequest);
