Repository: tez
Updated Branches:
  refs/heads/master 15d7339e9 -> 923f7b4e2
TEZ-3114. Shuffle OOM due to EventMetaData flood (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/923f7b4e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/923f7b4e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/923f7b4e Branch: refs/heads/master Commit: 923f7b4e298703658598d5cd3809f38f1231c4ab Parents: 15d7339 Author: Jason Lowe <[email protected]> Authored: Fri Feb 26 15:41:50 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Fri Feb 26 15:41:50 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/api/TezConfiguration.java | 11 ++++++ .../runtime/LogicalIOProcessorRuntimeTask.java | 8 ++++ .../org/apache/tez/runtime/RuntimeTask.java | 2 + .../apache/tez/runtime/task/TaskReporter.java | 3 +- .../tez/runtime/task/TestTaskReporter.java | 41 ++++++++++++++++++++ 6 files changed, 66 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e8e72b7..dd8b1dd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3114. Shuffle OOM due to EventMetaData flood TEZ-1911. MergeManager's unconditionalReserve() should check for memory limits before allocating. TEZ-3102. Fetch failure of a speculated task causes job hang TEZ-3124. Running task hangs due to missing event to initialize input in recovery. @@ -386,6 +387,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-3114. Shuffle OOM due to EventMetaData flood TEZ-3102. Fetch failure of a speculated task causes job hang TEZ-3126. Log reason for not reducing parallelism TEZ-3123. 
Containers can get re-used even with conflicting local resources. http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 9f7777f..221ac47 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -715,6 +715,17 @@ public class TezConfiguration extends Configuration { public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500; /** + * Int value. Maximum number of pending task events before a task will stop + * asking for more events in the task heartbeat. + * Expert level setting. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="integer") + public static final String TEZ_TASK_MAX_EVENT_BACKLOG = TEZ_TASK_PREFIX + + "max-event-backlog"; + public static final int TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT = 10000; + + /** * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output * components need to make successive progress notifications. If the progress is not notified * for this interval then the task will be considered hung and terminated. 
http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 7f546e6..07f92c2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -153,6 +153,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final ExecutionContext ExecutionContext; private final long memAvailable; private final HadoopShim hadoopShim; + private final int maxEventBacklog; public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, @@ -203,6 +204,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.ExecutionContext = ExecutionContext; this.memAvailable = memAvailable; this.hadoopShim = hadoopShim; + this.maxEventBacklog = tezConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG, + TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT); } /** @@ -730,6 +733,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } @Override + public int getMaxEventsToHandle() { + return Math.max(0, maxEventBacklog - eventsToBeProcessed.size()); + } + + @Override public synchronized void handleEvents(Collection<TezEvent> events) { if (events == null || events.isEmpty()) { return; http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 529dde0..59c8104 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -147,6 +147,8 @@ public abstract class RuntimeTask { return taskSpec.getTaskAttemptID(); } + public abstract int getMaxEventsToHandle(); + public abstract void handleEvents(Collection<TezEvent> events); public int getEventCounter() { http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 30a1b9c..e5370d4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -254,8 +254,9 @@ public class TaskReporter implements TaskReporterInterface { long requestId = requestCounter.incrementAndGet(); int fromEventId = task.getNextFromEventId(); int fromPreRoutedEventId = task.getNextPreRoutedEventId(); + int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle()); TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId, - containerIdStr, task.getTaskAttemptID(), fromEventId, maxEventsToGet); + containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents); if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat to AM, request=" + request); } http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java index e137d50..04c467a 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java @@ -21,13 +21,16 @@ package org.apache.tez.runtime.task; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -45,6 +48,7 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -110,6 +114,43 @@ public class TestTaskReporter { } + @Test(timeout = 10000) + public void testEventThrottling() throws Exception { + TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class); + LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class); + when(mockTask.getMaxEventsToHandle()).thenReturn(10000, 1); + when(mockTask.getVertexName()).thenReturn("vertexName"); + when(mockTask.getTaskAttemptID()).thenReturn(mockTaskAttemptId); + + TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class); + TezHeartbeatResponse resp1 
= new TezHeartbeatResponse(createEvents(5)); + resp1.setLastRequestId(1); + TezHeartbeatResponse resp2 = new TezHeartbeatResponse(createEvents(1)); + resp2.setLastRequestId(2); + resp2.setShouldDie(); + when(mockUmbilical.heartbeat(isA(TezHeartbeatRequest.class))).thenReturn(resp1, resp2); + + // Setup the sleep time to be way higher than the test timeout + TaskReporter.HeartbeatCallable heartbeatCallable = + new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5, + new AtomicLong(0), + "containerIdStr"); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future<Boolean> result = executor.submit(heartbeatCallable); + Assert.assertFalse(result.get()); + } finally { + executor.shutdownNow(); + } + + ArgumentCaptor<TezHeartbeatRequest> captor = ArgumentCaptor.forClass(TezHeartbeatRequest.class); + verify(mockUmbilical, times(2)).heartbeat(captor.capture()); + TezHeartbeatRequest req = captor.getValue(); + Assert.assertEquals(2, req.getRequestId()); + Assert.assertEquals(1, req.getMaxEvents()); + } + @Test (timeout=5000) public void testStatusUpdateAfterInitializationAndCounterFlag() { TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
