Repository: tajo Updated Branches: refs/heads/branch-0.11.2 191faa0fd -> 1ec59c12a
TAJO-2100: Add missing cancellation in DefaultTaskScheduler when a worker does not respond. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1ec59c12 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1ec59c12 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1ec59c12 Branch: refs/heads/branch-0.11.2 Commit: 1ec59c12af5020cbcb65d13dde4de37a96d10eb1 Parents: 191faa0 Author: Jinho Kim <[email protected]> Authored: Fri Mar 18 11:29:10 2016 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Mar 18 11:29:10 2016 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../tajo/querymaster/DefaultTaskScheduler.java | 54 ++++++++++++++------ .../apache/tajo/querymaster/TaskAttempt.java | 1 + 3 files changed, 43 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/1ec59c12/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index d6ae247..71ba9d5 100644 --- a/CHANGES +++ b/CHANGES @@ -24,6 +24,9 @@ Release 0.11.2 - unreleased BUG FIXES + TAJO-2100: Add missing cancellation in defaultTaskScheduler when a worker is + no respond. (jinho) + TAJO-2092: TestStorages.testNullHandlingTypesWithProjection occasionally fail. 
(jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/1ec59c12/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index c6eaaae..256fa50 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -21,6 +21,7 @@ package org.apache.tajo.querymaster; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -33,6 +34,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.engine.query.TaskRequestImpl; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -53,9 +55,11 @@ import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -168,6 +172,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { log.info(String.format("[%s] %s", 
stage.getId(), message)); } + protected void warn(Log log, String message) { + log.warn(String.format("[%s] %s", stage.getId(), message)); + } + private Fragment[] fragmentsForNonLeafTask; private Fragment[] broadcastFragmentsForNonLeafTask; @@ -596,7 +604,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } - public void cancel(TaskAttempt taskAttempt) { + protected void cancel(TaskAttempt taskAttempt) { TaskAttemptToSchedulerEvent schedulerEvent = new TaskAttemptToSchedulerEvent( EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(), @@ -614,6 +622,16 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { new TaskAttemptEvent(taskAttempt.getId(), TaskAttemptEventType.TA_ASSIGN_CANCEL)); } + protected int cancel(List<TaskAllocationProto> tasks) { + int canceled = 0; + for (TaskAllocationProto proto : tasks) { + TaskAttemptId attemptId = new TaskAttemptId(proto.getTaskRequest().getId()); + cancel(stage.getTask(attemptId.getTaskId()).getAttempt(attemptId)); + canceled++; + } + return canceled; + } + private class ScheduledRequests { // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. 
Even though some task is included in // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner @@ -899,17 +917,20 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { BatchAllocationResponse responseProto = callFuture.get(); if (responseProto.getCancellationTaskCount() > 0) { - for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { - cancel(task.getAttempt(new TaskAttemptId(proto.getTaskRequest().getId()))); - cancellation++; - } - + cancellation += cancel(responseProto.getCancellationTaskList()); info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); continue; } + } catch (ExecutionException | ConnectException e) { + cancellation += cancel(requestProto.getTaskRequestList()); + + warn(LOG, "Canceled requests: " + requestProto.getTaskRequestCount() + + " by " + ExceptionUtils.getFullStackTrace(e)); + continue; } catch (Exception e) { - LOG.error(e); + throw new TajoInternalError(e); } + scheduledObjectNum--; totalAssigned++; hostLocalAssigned += localAssign; @@ -1009,26 +1030,29 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { try { tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true, rpcParams); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); BatchAllocationResponse responseProto = callFuture.get(); if(responseProto.getCancellationTaskCount() > 0) { - for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { - cancel(task.getAttempt(new TaskAttemptId(proto.getTaskRequest().getId()))); - cancellation++; - } - + cancellation += cancel(responseProto.getCancellationTaskList()); info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); continue; } - totalAssigned++; - scheduledObjectNum--; + } catch 
(ExecutionException | ConnectException e) { + cancellation += cancel(requestProto.getTaskRequestList()); + warn(LOG, "Canceled requests: " + requestProto.getTaskRequestCount() + + " by " + ExceptionUtils.getFullStackTrace(e)); + continue; } catch (Exception e) { - LOG.error(e); + throw new TajoInternalError(e); } + + totalAssigned++; + scheduledObjectNum--; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1ec59c12/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index 5eef883..04432c6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -322,6 +322,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { } TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event; taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo(); + taskAttempt.getTask().setLaunchTime(System.currentTimeMillis()); taskAttempt.eventHandler.handle( new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_LAUNCHED));
