http://git-wip-us.apache.org/repos/asf/tez/blob/2c4ef9fe/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index ad4cceb..b824d0b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -19,291 +19,60 @@
package org.apache.tez.dag.library.vertexmanager;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
-import org.apache.tez.dag.api.EdgeManagerPluginContext;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
-import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.*;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
-import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
-import org.apache.tez.runtime.api.TaskIdentifier;
-import org.apache.tez.runtime.api.VertexIdentifier;
-import org.apache.tez.runtime.api.VertexStatistics;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.roaringbitmap.RoaringBitmap;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.anyMap;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings({ "unchecked", "rawtypes" })
-@RunWith(Parameterized.class)
-public class TestShuffleVertexManager {
+public class TestShuffleVertexManager extends TestShuffleVertexManagerUtils {
- TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
- int taskId = 0;
List<TaskAttemptIdentifier> emptyCompletions = null;
- Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass;
-
- @SuppressWarnings("deprecation")
- @Parameterized.Parameters(name = "test[{0}]")
- public static Collection<Object[]> data() {
- Object[][] data = new Object[][]{
- {ShuffleVertexManager.class}};
- return Arrays.asList(data);
- }
-
- public TestShuffleVertexManager(
- Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass) {
- this.shuffleVertexManagerClass = shuffleVertexManagerClass;
- }
@Test(timeout = 5000)
- public void testShuffleVertexManagerAutoParallelism() throws Exception {
+ public void testLargeDataSize() throws IOException {
Configuration conf = new Configuration();
- ShuffleVertexManagerBase manager = null;
+ ShuffleVertexManagerBase manager;
- HashMap<String, EdgeProperty> mockInputVertices =
- new HashMap<String, EdgeProperty>();
- String mockSrcVertexId1 = "Vertex1";
- EdgeProperty eProp1 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
- String mockSrcVertexId2 = "Vertex2";
- EdgeProperty eProp2 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
- String mockSrcVertexId3 = "Vertex3";
- EdgeProperty eProp3 = EdgeProperty.create(
- EdgeProperty.DataMovementType.BROADCAST,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
-
+ final String mockSrcVertexId1 = "Vertex1";
+ final String mockSrcVertexId2 = "Vertex2";
+ final String mockSrcVertexId3 = "Vertex3";
final String mockManagedVertexId = "Vertex4";
-
- mockInputVertices.put(mockSrcVertexId1, eProp1);
- mockInputVertices.put(mockSrcVertexId2, eProp2);
- mockInputVertices.put(mockSrcVertexId3, eProp3);
-
- final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
- when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
- when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-
- //Check via setters
- ShuffleVertexManager.ShuffleVertexManagerConfigBuilder configurer =
ShuffleVertexManager
- .createConfigBuilder(null);
- VertexManagerPluginDescriptor pluginDesc =
configurer.setAutoReduceParallelism(true)
- .setDesiredTaskInputSize(1000l)
-
.setMinTaskParallelism(10).setSlowStartMaxSrcCompletionFraction(0.5f).build();
- when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
-
-
- manager = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
- new Class[]{VertexManagerPluginContext.class}, new
Object[]{mockContext});
- manager.initialize();
- verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez
notified of reconfig
-
- Assert.assertTrue(manager.config.isAutoParallelismEnabled() == true);
- Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == 1000l);
- if (manager instanceof ShuffleVertexManager) {
- Assert.assertTrue(((ShuffleVertexManager)manager).mgrConfig.
- getMinTaskParallelism() == 10);
- }
- Assert.assertTrue(manager.config.getMinFraction() == 0.25f);
- Assert.assertTrue(manager.config.getMaxFraction() == 0.5f);
-
- configurer = ShuffleVertexManager.createConfigBuilder(null);
- pluginDesc = configurer.setAutoReduceParallelism(false).build();
- when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
- manager = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
- new Class[]{VertexManagerPluginContext.class}, new
Object[]{mockContext});
- manager.initialize();
- verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez not
notified of reconfig
-
- Assert.assertTrue(manager.config.isAutoParallelismEnabled() == false);
- Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() ==
-
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);
- if (manager instanceof ShuffleVertexManager) {
- Assert.assertTrue(((ShuffleVertexManager)manager).mgrConfig.
- getMinTaskParallelism() == 1);
- }
- Assert.assertTrue(manager.config.getMinFraction() ==
-
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
- Assert.assertTrue(manager.config.getMaxFraction() ==
-
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
-
-
- final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
- doAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- Object[] args = invocation.getArguments();
- scheduledTasks.clear();
- List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
- for (ScheduleTaskRequest task : tasks) {
- scheduledTasks.add(task.getTaskIndex());
- }
- return null;
- }}).when(mockContext).scheduleTasks(anyList());
-
+ final List<Integer> scheduledTasks = Lists.newLinkedList();
final Map<String, EdgeManagerPlugin> newEdgeManagers =
new HashMap<String, EdgeManagerPlugin>();
-
- doAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) throws Exception {
- final int numTasks =
((Integer)invocation.getArguments()[0]).intValue();
-
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(numTasks);
- newEdgeManagers.clear();
- for (Entry<String, EdgeProperty> entry :
- ((Map<String,
EdgeProperty>)invocation.getArguments()[2]).entrySet()) {
-
- EdgeManagerPluginDescriptor pluginDesc =
entry.getValue().getEdgeManagerDescriptor();
- final UserPayload userPayload = pluginDesc.getUserPayload();
- EdgeManagerPluginContext emContext = new
EdgeManagerPluginContext() {
- @Override
- public UserPayload getUserPayload() {
- return userPayload == null ? null : userPayload;
- }
-
- @Override
- public String getSourceVertexName() {
- return null;
- }
-
- @Override
- public String getDestinationVertexName() {
- return null;
- }
-
- @Override
- public int getSourceVertexNumTasks() {
- return 2;
- }
-
- @Override
- public int getDestinationVertexNumTasks() {
- return numTasks;
- }
- };
- EdgeManagerPlugin edgeManager = ReflectionUtils
- .createClazzInstance(pluginDesc.getClassName(),
- new Class[]{EdgeManagerPluginContext.class}, new
Object[]{emContext});
- edgeManager.initialize();
- newEdgeManagers.put(entry.getKey(), edgeManager);
- }
- return null;
- }}).when(mockContext).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
-
- // check initialization
- manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of
reconfig
- // source vertices have 0 tasks.
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
- when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
-
- manager.onVertexStarted(emptyCompletions);
- verify(mockContext, times(2)).vertexReconfigurationPlanned();
- Assert.assertTrue(manager.bipartiteSources == 2);
-
- // check waiting for notification before scheduling
- Assert.assertFalse(manager.pendingTasks.isEmpty());
- // source vertices have 0 tasks. triggers scheduling
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.isEmpty());
- verify(mockContext, times(1)).reconfigureVertex(eq(1), any
- (VertexLocationHint.class), anyMap());
- verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
- Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and
parallelism changed
- scheduledTasks.clear();
- // TODO TEZ-1714 locking verify(mockContext,
times(1)).vertexManagerDone(); // notified after scheduling all tasks
-
- // check scheduling only after onVertexStarted
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
- manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of
reconfig
- verify(mockContext, times(3)).vertexReconfigurationPlanned();
- // source vertices have 0 tasks. so only 1 notification needed. does not
trigger scheduling
- // normally this event will not come before onVertexStarted() is called
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- verify(mockContext, times(1)).doneReconfiguringVertex(); // no change.
will trigger after start
- Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
- // trigger start and processing of pending notification events
- manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.bipartiteSources == 2);
- verify(mockContext, times(2)).reconfigureVertex(eq(1), any
- (VertexLocationHint.class), anyMap());
- verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
- Assert.assertTrue(manager.pendingTasks.isEmpty());
- Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and
parallelism changed
-
-
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+ final VertexManagerPluginContext mockContext = createVertexManagerContext(
+ mockSrcVertexId1, 2, mockSrcVertexId2, 2, mockSrcVertexId3, 2,
+ mockManagedVertexId, 4, scheduledTasks, newEdgeManagers);
VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L,
mockSrcVertexId1);
// parallelism not change due to large data size
manager = createManager(conf, mockContext, 0.1f, 0.1f);
- verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig
+ verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
manager.onVertexManagerEventReceived(vmEvent);
@@ -312,210 +81,26 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ verify(mockContext, times(0)).reconfigureVertex(anyInt(), any
(VertexLocationHint.class), anyMap());
- verify(mockContext, times(2)).doneReconfiguringVertex();
+ verify(mockContext, times(0)).doneReconfiguringVertex();
// trigger scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ verify(mockContext, times(0)).reconfigureVertex(anyInt(), any
(VertexLocationHint.class), anyMap());
- verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
+ verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
Assert.assertEquals(4, scheduledTasks.size());
// TODO TEZ-1714 locking verify(mockContext, times(2)).vertexManagerDone(); // notified after scheduling all tasks
Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
-
- /**
- * Test vmEvent and vertexStatusUpdate before started
- */
- scheduledTasks.clear();
- //{5,9,12,18} in bitmap
- vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
-
- manager = createManager(conf, mockContext, 0.01f, 0.75f);
- Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- TezTaskAttemptID taId1 =
TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
- vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag",
mockSrcVertexId1, taId1));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertEquals(0, manager.numVertexManagerEventsReceived); // nothing
happens
- manager.onVertexStarted(emptyCompletions); // now the processing happens
- Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
-
- /**
- * Test partition stats
- */
- scheduledTasks.clear();
- //{5,9,12,18} in bitmap
- final long MB = 1024l * 1024l;
- long[] sizes = new long[]{(0l), (1 * MB), (964 * MB), (48 * MB)};
- vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", false);
-
- manager = createManager(conf, mockContext, 0.01f, 0.75f);
- manager.onVertexStarted(emptyCompletions);
- Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- taId1 =
TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
- vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag",
mockSrcVertexId1, taId1));
- manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
-
- Assert.assertEquals(4, manager.stats.length);
- Assert.assertEquals(0, manager.stats[0]); //0 MB bucket
- Assert.assertEquals(1, manager.stats[1]); //1 MB bucket
- Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
- Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
-
- // sending again from a different version of the same task has not impact
- TezTaskAttemptID taId2 =
TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
- vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag",
mockSrcVertexId1, taId2));
- manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
-
- Assert.assertEquals(4, manager.stats.length);
- Assert.assertEquals(0, manager.stats[0]); //0 MB bucket
- Assert.assertEquals(1, manager.stats[1]); //1 MB bucket
- Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
- Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
-
- // Testing for detailed partition stats
- vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", true);
-
- manager = createManager(conf, mockContext, 0.01f, 0.75f);
- manager.onVertexStarted(emptyCompletions);
- Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- taId1 =
TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
- vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag",
mockSrcVertexId1, taId1));
- manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
-
- Assert.assertEquals(4, manager.stats.length);
- Assert.assertEquals(0, manager.stats[0]);
- Assert.assertEquals(1, manager.stats[1]);
- Assert.assertEquals(964, manager.stats[2]);
- Assert.assertEquals(48, manager.stats[3]);
-
- // sending again from a different version of the same task has not impact
- taId2 =
TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
- vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag",
mockSrcVertexId1, taId2));
- manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
-
- Assert.assertEquals(4, manager.stats.length);
- Assert.assertEquals(0, manager.stats[0]);
- Assert.assertEquals(1, manager.stats[1]);
- Assert.assertEquals(964, manager.stats[2]);
- Assert.assertEquals(48, manager.stats[3]);
-
- /**
- * Test for TEZ-978
- * Delay determining parallelism until enough data has been received.
- */
- scheduledTasks.clear();
-
- //min/max fraction of 0.01/0.75 would ensure that we hit
determineParallelism code path on receiving first event itself.
- manager = createManager(conf, mockContext, 0.01f, 0.75f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- //First task in src1 completed with small payload
- vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1);
- manager.onVertexManagerEventReceived(vmEvent); //small payload
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertTrue(manager.determineParallelismAndApply(0f) == false);
- Assert.assertEquals(4, manager.pendingTasks.size());
- Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
- Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
- Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
-
- //First task in src2 completed with small payload
- vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2);
- manager.onVertexManagerEventReceived(vmEvent); //small payload
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- //Still overall data gathered has not reached threshold; So, ensure
parallelism can be determined later
- Assert.assertTrue(manager.determineParallelismAndApply(0.25f) == false);
- Assert.assertEquals(4, manager.pendingTasks.size());
- Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
- Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
- Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
- Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
-
- //First task in src2 completed (with larger payload) to trigger
determining parallelism
- vmEvent = getVertexManagerEvent(null, 1200L, mockSrcVertexId2);
- manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertTrue(manager.determineParallelismAndApply(0.25f)); //ensure
parallelism is determined
- verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
- verify(mockContext, times(1)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertEquals(0, manager.pendingTasks.size());
- Assert.assertEquals(2, scheduledTasks.size());
- Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
- Assert.assertEquals(3, manager.numVertexManagerEventsReceived);
- Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize);
-
- //Test for max fraction. Min fraction is just instruction to framework,
but honor max fraction
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
- scheduledTasks.clear();
-
- //min/max fraction of 0.0/0.2
- manager = createManager(conf, mockContext, 0.0f, 0.2f);
- // initial invocation count == 3
- verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertEquals(40, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- //send 7 events with payload size as 100
- for(int i=0;i<8;i++) {
- //small payload - create new event each time or it will be ignored (from
same task)
- manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 100L,
mockSrcVertexId1));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
- //should not change parallelism
- verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
- }
- for(int i=0;i<3;i++) {
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i));
- verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
- }
- //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks.
Setting this for test
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
- //Since max threshold (40 * 0.2 = 8) is met, vertex manager should
determine parallelism
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
- // parallelism updated
- verify(mockContext, times(4)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
- // check exact update value - 8 events with 100 each => 20 -> 2000 => 2
tasks (with 1000 per task)
- verify(mockContext, times(2)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
-
- //reset context for next test
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-
- // parallelism changed due to small data size
scheduledTasks.clear();
// Ensure long overflow doesn't reduce mistakenly
// Overflow can occur previously when output size * num tasks for a single vertex would over flow max long
//
- manager = createManager(conf, mockContext, 1.0f, 1.0f, (long)(Long.MAX_VALUE / 1.5));
+ manager = createManager(conf, mockContext, true, (long)(Long.MAX_VALUE / 1.5), 1.0f, 1.0f);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
@@ -557,8 +142,8 @@ public class TestShuffleVertexManager {
manager.onVertexManagerEventReceived(vmEvent);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
// Auto-reduce is triggered
- verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
- verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(1)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
Assert.assertEquals(2, scheduledTasks.size());
@@ -575,637 +160,91 @@ public class TestShuffleVertexManager {
// parallelism changed due to small data size
scheduledTasks.clear();
+ }
- manager = createManager(conf, mockContext, 0.5f, 0.5f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
- // task completion from non-bipartite stage does nothing
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
- Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId1);
- manager.onVertexManagerEventReceived(vmEvent);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertEquals(4, manager.pendingTasks.size());
- Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
- Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
- Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
- // ignore duplicate completion
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertEquals(4, manager.pendingTasks.size());
- Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
- Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
- vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId2);
- manager.onVertexManagerEventReceived(vmEvent);
- //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks.
Setting this for test
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+ @Test(timeout = 5000)
+ public void testAutoParallelismConfig() throws Exception {
+ ShuffleVertexManager manager;
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- // managedVertex tasks reduced
- verify(mockContext, times(6)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
- verify(mockContext, times(4)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
- Assert.assertEquals(2, newEdgeManagers.size());
- // TODO improve tests for parallelism
- Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
- Assert.assertEquals(2, scheduledTasks.size());
- Assert.assertTrue(scheduledTasks.contains(new Integer(0)));
- Assert.assertTrue(scheduledTasks.contains(new Integer(1)));
- Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
- Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
- Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize);
-
- // more completions dont cause recalculation of parallelism
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- verify(mockContext, times(6)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
- Assert.assertEquals(2, newEdgeManagers.size());
-
- EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
- Map<Integer, List<Integer>> targets = Maps.newHashMap();
- DataMovementEvent dmEvent = DataMovementEvent.create(1,
ByteBuffer.wrap(new byte[0]));
- // 4 source task outputs - same as original number of partitions
- Assert.assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(0));
- // 4 destination task inputs - 2 source tasks + 2 merged partitions
- Assert.assertEquals(4, edgeManager.getNumDestinationTaskPhysicalInputs(0));
- edgeManager.routeDataMovementEventToDestination(dmEvent, 1,
dmEvent.getSourceIndex(), targets);
- Assert.assertEquals(1, targets.size());
- Map.Entry<Integer, List<Integer>> e = targets.entrySet().iterator().next();
- Assert.assertEquals(0, e.getKey().intValue());
- Assert.assertEquals(1, e.getValue().size());
- Assert.assertEquals(3, e.getValue().get(0).intValue());
- targets.clear();
- dmEvent = DataMovementEvent.create(2, ByteBuffer.wrap(new byte[0]));
- edgeManager.routeDataMovementEventToDestination(dmEvent, 0,
dmEvent.getSourceIndex(), targets);
- Assert.assertEquals(1, targets.size());
- e = targets.entrySet().iterator().next();
- Assert.assertEquals(1, e.getKey().intValue());
- Assert.assertEquals(1, e.getValue().size());
- Assert.assertEquals(0, e.getValue().get(0).intValue());
- targets.clear();
- edgeManager.routeInputSourceTaskFailedEventToDestination(2, targets);
- Assert.assertEquals(2, targets.size());
- for (Map.Entry<Integer, List<Integer>> entry : targets.entrySet()) {
- Assert.assertTrue(entry.getKey().intValue() == 0 ||
entry.getKey().intValue() == 1);
- Assert.assertEquals(2, entry.getValue().size());
- Assert.assertEquals(4, entry.getValue().get(0).intValue());
- Assert.assertEquals(5, entry.getValue().get(1).intValue());
- }
+ final List<Integer> scheduledTasks = Lists.newLinkedList();
+
+ final VertexManagerPluginContext mockContext = createVertexManagerContext(
+ "Vertex1", 2, "Vertex2", 2, "Vertex3", 2,
+ "Vertex4", 4, scheduledTasks, null);
+
+ //Check via setters
+ ShuffleVertexManager.ShuffleVertexManagerConfigBuilder configurer = ShuffleVertexManager
+ .createConfigBuilder(null);
+ VertexManagerPluginDescriptor pluginDesc = configurer.setAutoReduceParallelism(true)
+ .setDesiredTaskInputSize(1000l)
+ .setMinTaskParallelism(10).setSlowStartMaxSrcCompletionFraction(0.5f).build();
+ when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
+
+
+ manager = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
+ new Class[]{VertexManagerPluginContext.class}, new Object[]{mockContext});
+ manager.initialize();
+ verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig
+
+ Assert.assertTrue(manager.config.isAutoParallelismEnabled());
+ Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == 1000l);
+ Assert.assertTrue(manager.mgrConfig.getMinTaskParallelism() == 10);
+ Assert.assertTrue(manager.config.getMinFraction() == 0.25f);
+ Assert.assertTrue(manager.config.getMaxFraction() == 0.5f);
+
+ configurer = ShuffleVertexManager.createConfigBuilder(null);
+ pluginDesc = configurer.setAutoReduceParallelism(false).build();
+ when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
+
+ manager = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
+ new Class[]{VertexManagerPluginContext.class}, new Object[]{mockContext});
+ manager.initialize();
+ verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez not notified of reconfig
+
+ Assert.assertTrue(!manager.config.isAutoParallelismEnabled());
+ Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() ==
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);
+ Assert.assertTrue(manager.mgrConfig.getMinTaskParallelism() == 1);
+ Assert.assertTrue(manager.config.getMinFraction() ==
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
+ Assert.assertTrue(manager.config.getMaxFraction() ==
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
}
-
+
@Test(timeout = 5000)
- public void testShuffleVertexManagerSlowStart() {
+ public void testSchedulingWithPartitionStats() throws IOException {
Configuration conf = new Configuration();
- ShuffleVertexManagerBase manager = null;
- HashMap<String, EdgeProperty> mockInputVertices =
- new HashMap<String, EdgeProperty>();
- String mockSrcVertexId1 = "Vertex1";
+ ShuffleVertexManagerBase manager;
+
+ HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String,
EdgeProperty>();
+ String r1 = "R1";
EdgeProperty eProp1 = EdgeProperty.create(
EdgeProperty.DataMovementType.SCATTER_GATHER,
EdgeProperty.DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create("out"),
InputDescriptor.create("in"));
- String mockSrcVertexId2 = "Vertex2";
+ String m2 = "M2";
EdgeProperty eProp2 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
+ EdgeProperty.DataMovementType.BROADCAST,
EdgeProperty.DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create("out"),
InputDescriptor.create("in"));
- String mockSrcVertexId3 = "Vertex3";
+ String m3 = "M3";
EdgeProperty eProp3 = EdgeProperty.create(
EdgeProperty.DataMovementType.BROADCAST,
EdgeProperty.DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create("out"),
InputDescriptor.create("in"));
-
- String mockManagedVertexId = "Vertex4";
-
- VertexManagerPluginContext mockContext =
mock(VertexManagerPluginContext.class);
-
when(mockContext.getVertexStatistics(any(String.class))).thenReturn(mock(VertexStatistics.class));
-
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
- when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
- when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
-
- // fail if there is no bipartite src vertex
- mockInputVertices.put(mockSrcVertexId3, eProp3);
- try {
- manager = createManager(conf, mockContext, 0.1f, 0.1f);
- manager.onVertexStarted(emptyCompletions);
- Assert.assertFalse(true);
- } catch (TezUncheckedException e) {
- Assert.assertTrue(e.getMessage().contains(
- "Atleast 1 bipartite source should exist"));
- }
-
- mockInputVertices.put(mockSrcVertexId1, eProp1);
- mockInputVertices.put(mockSrcVertexId2, eProp2);
-
- // check initialization
- manager = createManager(conf, mockContext, 0.1f, 0.1f);
- manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.bipartiteSources == 2);
-
- final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
- doAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- Object[] args = invocation.getArguments();
- scheduledTasks.clear();
- List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
- for (ScheduleTaskRequest task : tasks) {
- scheduledTasks.add(task.getTaskIndex());
- }
- return null;
- }}).when(mockContext).scheduleTasks(anyList());
-
- // source vertices have 0 tasks. immediate start of all managed tasks
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.pendingTasks.isEmpty());
- Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
-
- try {
- // source vertex have some tasks. min < 0.
- manager = createManager(conf, mockContext, -0.1f, 0.0f);
- Assert.assertTrue(false); // should not come here
- } catch (IllegalArgumentException e) {
- Assert.assertTrue(e.getMessage().contains(
- "Invalid values for slowStartMinFraction"));
- }
-
- try {
- // source vertex have some tasks. max > 1.
- manager = createManager(conf, mockContext, 0.0f, 95.0f);
- Assert.assertTrue(false); // should not come here
- } catch (IllegalArgumentException e) {
- Assert.assertTrue(e.getMessage().contains(
- "Invalid values for slowStartMinFraction"));
- }
-
- try {
- // source vertex have some tasks. min > max
- manager = createManager(conf, mockContext, 0.5f, 0.3f);
- Assert.assertTrue(false); // should not come here
- } catch (IllegalArgumentException e) {
- Assert.assertTrue(e.getMessage().contains(
- "Invalid values for slowStartMinFraction"));
- }
-
- // source vertex have some tasks. min > default and max undefined
- int numTasks = 20;
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks);
- scheduledTasks.clear();
-
- manager = createManager(conf, mockContext, 0.8f, null);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
-
- Assert.assertEquals(3, manager.pendingTasks.size());
- Assert.assertEquals(numTasks*2, manager.totalNumBipartiteSourceTasks);
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- float completedTasksThreshold = 0.8f * numTasks;
- // Finish all tasks before exceeding the threshold
- for (String mockSrcVertex : new String[] { mockSrcVertexId1,
mockSrcVertexId2 }) {
- for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
- // complete 0th tasks outside the loop
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i+1));
- if ((i + 2) >= completedTasksThreshold) {
- // stop before completing more than min/max source tasks
- break;
- }
- }
- }
- // Since we haven't exceeded the threshold, all tasks are still pending
- Assert.assertEquals(manager.totalTasksToSchedule,
manager.pendingTasks.size());
- Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
-
- // Cross the threshold min/max threshold to schedule all tasks
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertEquals(3, manager.pendingTasks.size());
- Assert.assertEquals(0, scheduledTasks.size());
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertEquals(0, manager.pendingTasks.size());
- Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size());
// all tasks scheduled
-
- // reset vertices for next test
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
-
- // source vertex have some tasks. min, max == 0
- manager = createManager(conf, mockContext, 0.0f, 0.0f);
- manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.totalTasksToSchedule == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
- // all source vertices need to be configured
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- Assert.assertTrue(manager.pendingTasks.isEmpty());
- Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-
- // min, max > 0 and min == max
- manager = createManager(conf, mockContext, 0.25f, 0.25f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- // task completion from non-bipartite stage does nothing
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
- // task completion on only 1 SG edge does nothing
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertTrue(manager.pendingTasks.isEmpty());
- Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-
- // min, max > 0 and min == max == absolute max 1.0
- manager = createManager(conf, mockContext, 1.0f, 1.0f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- // task completion from non-bipartite stage does nothing
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.isEmpty());
- Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
-
- // min, max > 0 and min == max
- manager = createManager(conf, mockContext, 1.0f, 1.0f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- // task completion from non-bipartite stage does nothing
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.isEmpty());
- Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
-
- // reset vertices for next test
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4);
-
- // min, max > and min < max
- manager = createManager(conf, mockContext, 0.25f, 0.75f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
- // completion of same task again should not get counted
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 3);
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 1);
- Assert.assertTrue(scheduledTasks.size() == 2); // 2 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
- Assert.assertTrue(manager.pendingTasks.size() == 0);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
- scheduledTasks.clear();
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1,
3)); // we are done. no action
- Assert.assertTrue(manager.pendingTasks.size() == 0);
- Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
-
- // min, max > and min < max
- manager = createManager(conf, mockContext, 0.25f, 1.0f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 2);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
- Assert.assertTrue(manager.pendingTasks.size() == 1);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 3));
- Assert.assertTrue(manager.pendingTasks.size() == 0);
- Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
-
- // if there is single task to schedule, it should be schedule when src
completed
- // fraction is more than min slow start fraction
- scheduledTasks.clear();
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(1);
- manager = createManager(conf, mockContext, 0.25f, 0.75f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.size() == 1); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 1);
- Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 0);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
- scheduledTasks.clear();
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
- Assert.assertTrue(manager.pendingTasks.size() == 0);
- Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
- scheduledTasks.clear();
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1,
3)); // we are done. no action
- Assert.assertTrue(manager.pendingTasks.size() == 0);
- Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
- }
-
-
- /**
- * Tasks should be scheduled only when all source vertices are configured
completely
- * @throws IOException
- */
- @Test(timeout = 5000)
- public void test_Tez1649_with_scatter_gather_edges() throws IOException {
- Configuration conf = new Configuration();
- ShuffleVertexManagerBase manager = null;
-
- HashMap<String, EdgeProperty> mockInputVertices_R2 = new HashMap<String,
EdgeProperty>();
- String r1 = "R1";
- EdgeProperty eProp1 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
- String m2 = "M2";
- EdgeProperty eProp2 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
- String m3 = "M3";
- EdgeProperty eProp3 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
-
- final String mockManagedVertexId_R2 = "R2";
- mockInputVertices_R2.put(r1, eProp1);
- mockInputVertices_R2.put(m2, eProp2);
- mockInputVertices_R2.put(m3, eProp3);
-
- final VertexManagerPluginContext mockContext_R2 =
mock(VertexManagerPluginContext.class);
-
when(mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2);
- when(mockContext_R2.getVertexName()).thenReturn(mockManagedVertexId_R2);
-
when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(3);
- when(mockContext_R2.getVertexNumTasks(r1)).thenReturn(3);
- when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
- when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
-
- VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
- // check initialization
- manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
-
- final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
- doAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- Object[] args = invocation.getArguments();
- scheduledTasks.clear();
- List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
- for (ScheduleTaskRequest task : tasks) {
- scheduledTasks.add(task.getTaskIndex());
- }
- return null;
- }}).when(mockContext_R2).scheduleTasks(anyList());
-
- manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.bipartiteSources == 3);
- manager.onVertexStateUpdated(new VertexStateUpdate(m2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(m3,
VertexState.CONFIGURED));
-
- manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
-
- //Send events for all tasks of m3.
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 1));
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
-
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
-
- //Send events for m2. But still we need to wait for at least 1 event from
r1.
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
-
- // we need to wait for at least 1 event from r1 to make sure all vertices
cross min threshold
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
-
- //Ensure that setVertexParallelism is not called for R2.
- verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class),
- anyMap());
-
- //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks.
Setting this for test
-
when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1);
-
- // complete configuration of r1 triggers the scheduling
- manager.onVertexStateUpdated(new VertexStateUpdate(r1,
VertexState.CONFIGURED));
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
- verify(mockContext_R2, times(1)).reconfigureVertex(eq(1),
any(VertexLocationHint.class),
- anyMap());
-
- Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 1);
-
- //try with zero task vertices
- scheduledTasks.clear();
-
when(mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2);
- when(mockContext_R2.getVertexName()).thenReturn(mockManagedVertexId_R2);
-
when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(3);
- when(mockContext_R2.getVertexNumTasks(r1)).thenReturn(0);
- when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(0);
- when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
-
- manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
- manager.onVertexStarted(emptyCompletions);
- Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- // Only need completed configuration notification from m3
- manager.onVertexStateUpdated(new VertexStateUpdate(m3,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(m2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(r1,
VertexState.CONFIGURED));
- Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 3);
- }
-
- VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize,
String vertexName) throws IOException {
- return getVertexManagerEvent(sizes, totalSize, vertexName, false);
- }
-
- VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize,
String vertexName, boolean reportDetailedStats)
- throws IOException {
- ByteBuffer payload = null;
- if (sizes != null) {
- RoaringBitmap partitionStats =
ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
- DataOutputBuffer dout = new DataOutputBuffer();
- partitionStats.serialize(dout);
- ByteString
- partitionStatsBytes =
TezCommonUtils.compressByteArrayToByteString(dout.getData());
- if (reportDetailedStats) {
- payload =
- VertexManagerEventPayloadProto.newBuilder()
- .setOutputSize(totalSize)
-
.setDetailedPartitionStats(ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput(sizes))
- .build().toByteString()
- .asReadOnlyByteBuffer();
- } else {
- payload =
- VertexManagerEventPayloadProto.newBuilder()
- .setOutputSize(totalSize)
- .setPartitionStats(partitionStatsBytes)
- .build().toByteString()
- .asReadOnlyByteBuffer();
- }
-
- } else {
- payload =
- VertexManagerEventPayloadProto.newBuilder()
- .setOutputSize(totalSize)
- .build().toByteString()
- .asReadOnlyByteBuffer();
- }
- TaskAttemptIdentifierImpl taId = new TaskAttemptIdentifierImpl("dag",
vertexName,
- TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId,
taskId++), 0));
- VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName,
payload);
- vmEvent.setProducerAttemptIdentifier(taId);
- return vmEvent;
- }
-
- @Test(timeout = 5000)
- public void testSchedulingWithPartitionStats() throws IOException {
- Configuration conf = new Configuration();
- ShuffleVertexManagerBase manager = null;
-
- HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String,
EdgeProperty>();
- String r1 = "R1";
- EdgeProperty eProp1 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
- String m2 = "M2";
- EdgeProperty eProp2 = EdgeProperty.create(
- EdgeProperty.DataMovementType.BROADCAST,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
- String m3 = "M3";
- EdgeProperty eProp3 = EdgeProperty.create(
- EdgeProperty.DataMovementType.BROADCAST,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
-
- final String mockManagedVertexId = "R2";
-
- mockInputVertices.put(r1, eProp1);
- mockInputVertices.put(m2, eProp2);
- mockInputVertices.put(m3, eProp3);
-
+
+ final String mockManagedVertexId = "R2";
+
+ mockInputVertices.put(r1, eProp1);
+ mockInputVertices.put(m2, eProp2);
+ mockInputVertices.put(m3, eProp3);
+
VertexManagerPluginContext mockContext =
mock(VertexManagerPluginContext.class);
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
@@ -1215,16 +254,8 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
final List<Integer> scheduledTasks = Lists.newLinkedList();
- doAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- Object[] args = invocation.getArguments();
- scheduledTasks.clear();
- List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
- for (ScheduleTaskRequest task : tasks) {
- scheduledTasks.add(task.getTaskIndex());
- }
- return null;
- }}).when(mockContext).scheduleTasks(anyList());
+ doAnswer(new ScheduledTasksAnswer(scheduledTasks)).when(
+ mockContext).scheduleTasks(anyList());
// check initialization
manager = createManager(conf, mockContext, 0.001f, 0.001f);
@@ -1270,283 +301,18 @@ public class TestShuffleVertexManager {
Assert.assertTrue(scheduledTasks.get(2) == 1);
}
- @Test(timeout = 5000)
- public void test_Tez1649_with_mixed_edges() {
- Configuration conf = new Configuration();
- ShuffleVertexManagerBase manager = null;
-
- HashMap<String, EdgeProperty> mockInputVertices =
- new HashMap<String, EdgeProperty>();
- String r1 = "R1";
- EdgeProperty eProp1 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
- String m2 = "M2";
- EdgeProperty eProp2 = EdgeProperty.create(
- EdgeProperty.DataMovementType.BROADCAST,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
- String m3 = "M3";
- EdgeProperty eProp3 = EdgeProperty.create(
- EdgeProperty.DataMovementType.BROADCAST,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
-
- final String mockManagedVertexId = "R2";
-
- mockInputVertices.put(r1, eProp1);
- mockInputVertices.put(m2, eProp2);
- mockInputVertices.put(m3, eProp3);
-
- VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
- when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
- when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
- when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
- when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
- when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
-
- final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
- doAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- Object[] args = invocation.getArguments();
- scheduledTasks.clear();
- List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
- for (ScheduleTaskRequest task : tasks) {
- scheduledTasks.add(task.getTaskIndex());
- }
- return null;
- }}).when(mockContext).scheduleTasks(anyList());
-
- // check initialization
- manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.bipartiteSources == 1);
-
- manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
-
- Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- //Send events for 2 tasks of r1.
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
-
- //Send an event for m2.
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
-
- //Send an event for m3.
- manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 3);
-
- //Scenario when numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks.
- //Still, wait for a configuration to be completed from other edges
- scheduledTasks.clear();
- manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
-
- when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
- when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
- when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
- when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
- when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
-
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 2));
- //Tasks from non-scatter edges of m2 and m3 are not complete.
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
- //Got an event from other edges. Schedule all
- Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 3);
-
-
- //try with a zero task vertex (with non-scatter-gather edges)
- scheduledTasks.clear();
- manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(emptyCompletions);
- when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
- when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
- when(mockContext.getVertexNumTasks(r1)).thenReturn(3); //scatter gather
- when(mockContext.getVertexNumTasks(m2)).thenReturn(0); //broadcast
- when(mockContext.getVertexNumTasks(m3)).thenReturn(3); //broadcast
-
- manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
-
- Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- //Send 2 events for tasks of r1.
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 0);
-
- // event from m3 triggers scheduling
- manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
- Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 3);
-
- //try with all zero task vertices in non-SG edges
- scheduledTasks.clear();
- manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(emptyCompletions);
- when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
- when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
- when(mockContext.getVertexNumTasks(r1)).thenReturn(3); //scatter gather
- when(mockContext.getVertexNumTasks(m2)).thenReturn(0); //broadcast
- when(mockContext.getVertexNumTasks(m3)).thenReturn(0); //broadcast
-
- //Send 1 events for tasks of r1.
- manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
- Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 3);
- }
-
- @Test
- public void testZeroTasksSendsConfigured() throws IOException {
- Configuration conf = new Configuration();
- ShuffleVertexManagerBase manager = null;
- HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
- String r1 = "R1";
- EdgeProperty eProp1 = EdgeProperty.create(
- EdgeProperty.DataMovementType.SCATTER_GATHER,
- EdgeProperty.DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- OutputDescriptor.create("out"),
- InputDescriptor.create("in"));
-
- final String mockManagedVertexId = "R2";
- mockInputVertices.put(r1, eProp1);
-
- final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
- when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
- when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0);
-
- // check initialization
- manager = createManager(conf, mockContext, 0.001f, 0.001f);
-
- final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
- doAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- Object[] args = invocation.getArguments();
- scheduledTasks.clear();
- List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
- for (ScheduleTaskRequest task : tasks) {
- scheduledTasks.add(task.getTaskIndex());
- }
- return null;
- }}).when(mockContext).scheduleTasks(anyList());
-
- manager.onVertexStarted(emptyCompletions);
- manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
- Assert.assertEquals(1, manager.bipartiteSources);
- Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks);
- Assert.assertEquals(0, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(0, scheduledTasks.size());
- verify(mockContext).doneReconfiguringVertex();
- }
-
- public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
- VertexIdentifier mockVertex = mock(VertexIdentifier.class);
- when(mockVertex.getName()).thenReturn(vName);
- TaskIdentifier mockTask = mock(TaskIdentifier.class);
- when(mockTask.getIdentifier()).thenReturn(tId);
- when(mockTask.getVertexIdentifier()).thenReturn(mockVertex);
- TaskAttemptIdentifier mockAttempt = mock(TaskAttemptIdentifier.class);
- when(mockAttempt.getIdentifier()).thenReturn(0);
- when(mockAttempt.getTaskIdentifier()).thenReturn(mockTask);
- return mockAttempt;
+ private static ShuffleVertexManager createManager(Configuration conf,
+ VertexManagerPluginContext context, Float min, Float max) {
+ return createManager(conf, context, true, 1000l, min, max);
}
- private ShuffleVertexManagerBase createManager(Configuration conf,
- VertexManagerPluginContext context, Float min, Float max, Long size) {
- if (this.shuffleVertexManagerClass.equals(ShuffleVertexManager.class)) {
- return createShuffleVertexManager(conf, context, min, max, size);
- } else {
- return null;
- }
- }
-
- private ShuffleVertexManagerBase createManager(Configuration conf,
- VertexManagerPluginContext context, Float min, Float max) {
- if (this.shuffleVertexManagerClass.equals(ShuffleVertexManager.class)) {
- return createShuffleVertexManager(conf, context, min, max, null);
- } else {
- return null;
- }
- }
-
- private ShuffleVertexManager createShuffleVertexManager(Configuration conf,
- VertexManagerPluginContext context, Float min, Float max, Long size) {
- if (min != null) {
- conf.setFloat(
- ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
- min);
- } else {
- conf.unset(
- ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
- }
- if (max != null) {
- conf.setFloat(
- ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
- max);
- } else {
- conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION);
- }
- conf.setBoolean(
- ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
- true);
- if (size != null) {
- conf.setLong(
- ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
- size);
- } else {
- conf.setLong(
- ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
- 1000L);
- }
- UserPayload payload;
- try {
- payload = TezUtils.createUserPayloadFromConf(conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- when(context.getUserPayload()).thenReturn(payload);
- ShuffleVertexManager manager = new ShuffleVertexManager(context);
- manager.initialize();
- return manager;
+ private static ShuffleVertexManager createManager(Configuration conf,
+ VertexManagerPluginContext context,
+ Boolean enableAutoParallelism, Long desiredTaskInputSize, Float min,
+ Float max) {
+ return (ShuffleVertexManager)TestShuffleVertexManagerBase.createManager(
+ ShuffleVertexManager.class, conf, context,
+ enableAutoParallelism, desiredTaskInputSize, min, max);
}
}