Repository: tez Updated Branches: refs/heads/branch-0.6 71fa843c0 -> 4b6537699
TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4b653769 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4b653769 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4b653769 Branch: refs/heads/branch-0.6 Commit: 4b65376991ae7c64aab10e50848814d7cb848cd8 Parents: 71fa843 Author: Siddharth Seth <[email protected]> Authored: Tue May 5 11:57:42 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue May 5 11:57:42 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../app/dag/RootInputInitializerManager.java | 3 +- .../dag/TestRootInputInitializerManager.java | 201 +++++++++++++++++++ 3 files changed, 204 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4b653769/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dc9a42e..c017808 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -188,6 +188,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master. TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy. http://git-wip-us.apache.org/repos/asf/tez/blob/4b653769/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index bdd3689..84379e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -405,7 +405,7 @@ public class RootInputInitializerManager { "AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used"); } Map<Integer, Integer> vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName); - Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId); + Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId.getId()); if (successfulAttempt == null) { successfulAttempt = attemptId; vertexSuccessfulAttemptMap.put(taskId.getId(), successfulAttempt); @@ -425,6 +425,7 @@ public class RootInputInitializerManager { if (taskAttemptIndex == successfulAttempt) { toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent()); } + // Drop all other events which have the same source task Id. eventIterator.remove(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/4b653769/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java new file mode 100644 index 0000000..89eb2a6 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java @@ -0,0 +1,201 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class TestRootInputInitializerManager { + + // Simple testing. No events if task doesn't succeed. + // Also exercises path where two attempts are reported as successful via the stateChangeNotifier. + // Primarily a failure scenario, when a Task moves back to running from success + // Order event1, success1, event2, success2 + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testEventBeforeSuccess() throws Exception { + InputDescriptor id = mock(InputDescriptor.class); + InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class); + RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput = + new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid); + + InputInitializer initializer = mock(InputInitializer.class); + InputInitializerContext initializerContext = mock(InputInitializerContext.class); + Vertex vertex = mock(Vertex.class); + StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + + RootInputInitializerManager.InitializerWrapper initializerWrapper = + new RootInputInitializerManager.InitializerWrapper(rootInput, initializer, + initializerContext, vertex, stateChangeNotifier, appContext); + + ApplicationId appId = ApplicationId.newInstance(1000, 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2); + TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3); + Vertex srcVertex = mock(Vertex.class); + Task srcTask1 = mock(Task.class); + doReturn(TaskState.RUNNING).when(srcTask1).getState(); + doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId()); + when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex); + + String srcVertexName = "srcVertexName"; + List<TezEvent> eventList = Lists.newLinkedList(); + + + // First Attempt send event + TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1); + EventMetaData sourceInfo11 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId11); + InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te1 = new TezEvent(e1, sourceInfo11); + eventList.add(te1); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + + // First attempt, Task success notification + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId()); + ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class); + verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture()); + List<InputInitializerEvent> invokedEvents = argumentCaptor.getValue(); + assertEquals(1, invokedEvents.size()); + + reset(initializer); + + // 2nd attempt send event + TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2); + EventMetaData sourceInfo12 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId12); + InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te2 = new TezEvent(e2, sourceInfo12); + eventList.add(te2); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + reset(initializer); + + // 2nd attempt succeeded + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId()); + verify(initializer, never()).handleInputInitializerEvent(argumentCaptor.capture()); + } + + // Order event1 success1, success2, event2 + // Primarily a failure scenario, when a Task moves back to running from success + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testSuccessBeforeEvent() throws Exception { + InputDescriptor id = mock(InputDescriptor.class); + InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class); + RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput = + new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid); + + InputInitializer initializer = mock(InputInitializer.class); + InputInitializerContext initializerContext = mock(InputInitializerContext.class); + Vertex vertex = mock(Vertex.class); + StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + + RootInputInitializerManager.InitializerWrapper initializerWrapper = + new RootInputInitializerManager.InitializerWrapper(rootInput, initializer, + initializerContext, vertex, stateChangeNotifier, appContext); + + ApplicationId appId = ApplicationId.newInstance(1000, 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2); + TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3); + Vertex srcVertex = mock(Vertex.class); + Task srcTask1 = mock(Task.class); + doReturn(TaskState.RUNNING).when(srcTask1).getState(); + doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId()); + when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex); + + String srcVertexName = "srcVertexName"; + List<TezEvent> eventList = Lists.newLinkedList(); + + + // First Attempt send event + TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1); + EventMetaData sourceInfo11 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId11); + InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te1 = new TezEvent(e1, sourceInfo11); + eventList.add(te1); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + + // First attempt, Task success notification + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId()); + ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class); + verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture()); + List<InputInitializerEvent> invokedEvents = argumentCaptor.getValue(); + assertEquals(1, invokedEvents.size()); + + reset(initializer); + + + TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2); + // 2nd attempt succeeded + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId()); + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + + // 2nd attempt send event + EventMetaData sourceInfo12 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId12); + InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te2 = new TezEvent(e2, sourceInfo12); + eventList.add(te2); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + } +}
