TEZ-2314. Tez task attempt failures due to bad event serialization (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/25224477
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/25224477
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/25224477

Branch: refs/heads/TEZ-2003
Commit: 2522447732f93ec86f76625392ed5f34430d294c
Parents: 73bdbb2
Author: Bikas Saha <[email protected]>
Authored: Mon Apr 27 22:59:09 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Mon Apr 27 22:59:09 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/runtime/RuntimeTask.java     |  5 ++
 .../tez/runtime/api/impl/IOStatistics.java      |  4 +-
 .../apache/tez/runtime/task/TaskReporter.java   | 19 +++++-
 .../tez/runtime/task/TestTaskReporter.java      | 62 ++++++++++++++++++++
 5 files changed, 86 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6a0adf..0bd5214 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2314. Tez task attempt failures due to bad event serialization
   TEZ-2368. Make a dag identifier available in Context classes.
   TEZ-2325. Route status update event directly to the attempt.
   TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task.
http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 745b10b..f8b8621 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime;
 
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -77,6 +78,10 @@ public abstract class RuntimeTask {
     return counter;
   }
 
+  public boolean hasInitialized() {
+    return EnumSet.of(State.RUNNING, State.CLOSED).contains(state.get());
+  }
+
   public String getVertexName() {
     return taskSpec.getVertexName();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
index 0f8b589..8f28062 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 
 public class IOStatistics implements Writable {
-  private long dataSize = 0;
-  private long numItems = 0;
+  private volatile long dataSize = 0;
+  private volatile long numItems = 0;
 
   public void setDataSize(long size) {
     this.dataSize = size;
http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index b9e7217..7324abd 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -34,6 +34,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
@@ -41,6 +42,7 @@ import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
@@ -317,9 +319,20 @@ public class TaskReporter {
       return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
     }
 
-    private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
-      return new TaskStatusUpdateEvent((sendCounters ? task.getCounters() : null),
-          task.getProgress(), task.getTaskStatistics());
+    @VisibleForTesting
+    TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
+      TezCounters counters = null;
+      TaskStatistics stats = null;
+      float progress = 0;
+      if (task.hasInitialized()) {
+        progress = task.getProgress();
+        if (sendCounters) {
+          // send these potentially large objects at longer intervals to avoid overloading the AM
+          counters = task.getCounters();
+          stats = task.getTaskStatistics();
+        }
+      }
+      return new TaskStatusUpdateEvent(counters, progress, stats);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
index 9add252..b44c9f8 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -32,17 +32,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Lists;
+
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+@SuppressWarnings("rawtypes")
 public class TestTaskReporter {
 
   @Test(timeout = 10000)
@@ -103,6 +109,62 @@ public class TestTaskReporter {
     }
 
   }
+
+  @Test (timeout=5000)
+  public void testStatusUpdateAfterInitializationAndCounterFlag() {
+    TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
+    LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
+    doReturn("vertexName").when(mockTask).getVertexName();
+    doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
+    TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
+
+    float progress = 0.5f;
+    TaskStatistics stats = new TaskStatistics();
+    TezCounters counters = new TezCounters();
+    doReturn(progress).when(mockTask).getProgress();
+    doReturn(stats).when(mockTask).getTaskStatistics();
+    doReturn(counters).when(mockTask).getCounters();
+
+    // Setup the sleep time to be way higher than the test timeout
+    TaskReporter.HeartbeatCallable heartbeatCallable =
+        new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
+            new AtomicLong(0),
+            "containerIdStr");
+
+    // task not initialized - nothing obtained from task
+    doReturn(false).when(mockTask).hasInitialized();
+    TaskStatusUpdateEvent event = heartbeatCallable.getStatusUpdateEvent(true);
+    verify(mockTask, times(1)).hasInitialized();
+    verify(mockTask, times(0)).getProgress();
+    verify(mockTask, times(0)).getTaskStatistics();
+    verify(mockTask, times(0)).getCounters();
+    Assert.assertEquals(0, event.getProgress(), 0);
+    Assert.assertNull(event.getCounters());
+    Assert.assertNull(event.getStatistics());
+
+    // task is initialized - progress obtained but not counters since flag is false
+    doReturn(true).when(mockTask).hasInitialized();
+    event = heartbeatCallable.getStatusUpdateEvent(false);
+    verify(mockTask, times(2)).hasInitialized();
+    verify(mockTask, times(1)).getProgress();
+    verify(mockTask, times(0)).getTaskStatistics();
+    verify(mockTask, times(0)).getCounters();
+    Assert.assertEquals(progress, event.getProgress(), 0);
+    Assert.assertNull(event.getCounters());
+    Assert.assertNull(event.getStatistics());
+
+    // task is initialized - progress obtained and also counters since flag is true
+    doReturn(true).when(mockTask).hasInitialized();
+    event = heartbeatCallable.getStatusUpdateEvent(true);
+    verify(mockTask, times(3)).hasInitialized();
+    verify(mockTask, times(2)).getProgress();
+    verify(mockTask, times(1)).getTaskStatistics();
+    verify(mockTask, times(1)).getCounters();
+    Assert.assertEquals(progress, event.getProgress(), 0);
+    Assert.assertEquals(counters, event.getCounters());
+    Assert.assertEquals(stats, event.getStatistics());
+
+  }
 
   private List<TezEvent> createEvents(int numEvents) {
     List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
