HIVE-13431. Improvements to LLAPTaskReporter - event throttling. (Siddharth Seth, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f5665e34 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f5665e34 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f5665e34 Branch: refs/heads/master Commit: f5665e34de5a6c6496c82bc59d3b5afaa612ab50 Parents: 0cf2244 Author: Siddharth Seth <[email protected]> Authored: Mon Apr 25 23:18:45 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon Apr 25 23:18:45 2016 -0700 ---------------------------------------------------------------------- .../hive/llap/daemon/impl/LlapTaskReporter.java | 53 +++++++++++++++----- .../llap/daemon/impl/TaskRunnerCallable.java | 4 +- .../daemon/impl/TaskExecutorTestHelpers.java | 6 +-- .../TestFirstInFirstOutComparator.java | 4 +- 4 files changed, 48 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f5665e34/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 08c6f27..dc4482e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -40,8 +40,10 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; +import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; @@ -192,7 +194,7 @@ public class LlapTaskReporter implements TaskReporterInterface { @Override public Boolean call() throws Exception { // Heartbeat only for active tasks. Errors, etc will be reported directly. - while (!task.isTaskDone() && !task.hadFatalError()) { + while (!task.isTaskDone() && !task.wasErrorReported()) { ResponseWrapper response = heartbeat(null); if (response.shouldDie) { @@ -217,7 +219,7 @@ public class LlapTaskReporter implements TaskReporterInterface { int pendingEventCount = eventsToSend.size(); if (pendingEventCount > 0) { // This is OK because the pending events will be sent via the succeeded/failed messages. - // TaskDone is set before taskSucceeded/taskFailed are sent out - which is what causes the + // TaskDone is set before taskSucceeded/taskTerminated are sent out - which is what causes the // thread to exit LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount); } @@ -243,7 +245,7 @@ public class LlapTaskReporter implements TaskReporterInterface { List<TezEvent> events = new ArrayList<TezEvent>(); eventsToSend.drainTo(events); - if (!task.isTaskDone() && !task.hadFatalError()) { + if (!task.isTaskDone() && !task.wasErrorReported()) { boolean sendCounters = false; /** * Increasing the heartbeat interval can delay the delivery of events. Sending just updated @@ -262,8 +264,9 @@ public class LlapTaskReporter implements TaskReporterInterface { long requestId = requestCounter.incrementAndGet(); int fromEventId = task.getNextFromEventId(); int fromPreRoutedEventId = task.getNextPreRoutedEventId(); + int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle()); TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId, - containerIdStr, task.getTaskAttemptID(), fromEventId, maxEventsToGet); + containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents); if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat to AM, request=" + request); } @@ -288,7 +291,7 @@ public class LlapTaskReporter implements TaskReporterInterface { // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks // are running using the same umbilical. int numEventsReceived = 0; - if (task.isTaskDone() || task.hadFatalError()) { + if (task.isTaskDone() || task.wasErrorReported()) { if (response.getEvents() != null && !response.getEvents().isEmpty()) { LOG.warn("Current task already complete, Ignoring all event in" + " heartbeat response, eventCount=" + response.getEvents().size()); @@ -372,6 +375,8 @@ public class LlapTaskReporter implements TaskReporterInterface { /** * Sends out final events for task failure. * @param taskAttemptID + * @param isKilled + * @param taskFailureType * @param t * @param diagnostics * @param srcMeta @@ -381,19 +386,33 @@ public class LlapTaskReporter implements TaskReporterInterface { * @throws TezException * indicates an exception somewhere in the AM. */ - private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, + private boolean taskTerminated(TezTaskAttemptID taskAttemptID, boolean isKilled, + TaskFailureType taskFailureType, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { // Ensure only one final event is ever sent. if (!finalEventQueued.getAndSet(true)) { - TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); + List<TezEvent> tezEvents = new ArrayList<>(); if (diagnostics == null) { diagnostics = ExceptionUtils.getStackTrace(t); } else { diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t); } - TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics), - srcMeta == null ? updateEventMetadata : srcMeta); - return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie; + + if (isKilled) { + tezEvents.add(new TezEvent(new TaskAttemptKilledEvent(diagnostics), + srcMeta == null ? updateEventMetadata : srcMeta)); + } else { + tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics, + taskFailureType), + srcMeta == null ? updateEventMetadata : srcMeta)); + } + try { + tezEvents.add(new TezEvent(getStatusUpdateEvent(true), updateEventMetadata)); + } catch (Exception e) { + // Counter may exceed limitation + LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out"); + } + return !heartbeat(tezEvents).shouldDie; } else { LOG.warn("A final task state event has already been sent. Not sending again"); return askedToDie.get(); @@ -434,9 +453,19 @@ public class LlapTaskReporter implements TaskReporterInterface { } @Override - public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, + public boolean taskFailed(TezTaskAttemptID tezTaskAttemptID, TaskFailureType taskFailureType, + Throwable throwable, String diagnostics, EventMetaData srcMeta) throws + IOException, TezException { + return currentCallable + .taskTerminated(tezTaskAttemptID, false, taskFailureType, throwable, diagnostics, srcMeta); + } + + @Override + public boolean taskKilled(TezTaskAttemptID tezTaskAttemptID, Throwable throwable, + String diagnostics, EventMetaData srcMeta) throws IOException, TezException { - return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta); + return currentCallable + .taskTerminated(tezTaskAttemptID, true, null, throwable, diagnostics, srcMeta); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/f5665e34/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index a1cfbb8..2a60123 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -170,7 +170,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { synchronized (this) { if (!shouldRunTask) { LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID()); - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); } } @@ -237,7 +237,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } if (taskRunner == null) { LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID()); - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); } try { http://git-wip-us.apache.org/repos/asf/hive/blob/f5665e34/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 4d05c35..24f4442 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -170,14 +170,14 @@ public class TaskExecutorTestHelpers { } } catch (InterruptedException e) { wasInterrupted.set(true); - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); } finally { lock.unlock(); } if (wasKilled.get()) { - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); } else { - return new TaskRunner2Result(EndReason.SUCCESS, null, false); + return new TaskRunner2Result(EndReason.SUCCESS, null, null, false); } } finally { lock.lock(); http://git-wip-us.apache.org/repos/asf/hive/blob/f5665e34/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index 73df985..08ee769 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -70,9 +70,9 @@ public class TestFirstInFirstOutComparator { try { Thread.sleep(workTime); } catch (InterruptedException e) { - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); } - return new TaskRunner2Result(EndReason.SUCCESS, null, false); + return new TaskRunner2Result(EndReason.SUCCESS, null, null, false); } @Override
