Repository: tez Updated Branches: refs/heads/master 1468457e6 -> 9ca2d5be6
http://git-wip-us.apache.org/repos/asf/tez/blob/9ca2d5be/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index a5a6581..128d6fa 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -56,12 +56,16 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.roaringbitmap.RoaringBitmap; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -80,21 +84,32 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @SuppressWarnings({ "unchecked", "rawtypes" }) +@RunWith(Parameterized.class) public class TestShuffleVertexManager { TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00"); int taskId = 0; List<TaskAttemptIdentifier> emptyCompletions = null; + Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass; + + @SuppressWarnings("deprecation") + @Parameterized.Parameters(name = "test[{0}]") + public static Collection<Object[]> data() { + Object[][] data = new Object[][]{ + {ShuffleVertexManager.class}}; + return Arrays.asList(data); + } + + public TestShuffleVertexManager( + Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass) { + this.shuffleVertexManagerClass = shuffleVertexManagerClass; + } @Test(timeout = 5000) public void testShuffleVertexManagerAutoParallelism() throws Exception { Configuration conf = new Configuration(); - conf.setBoolean( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, - true); - conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L); - ShuffleVertexManager manager = null; - + ShuffleVertexManagerBase manager = null; + HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>(); String mockSrcVertexId1 = "Vertex1"; @@ -144,11 +159,14 @@ public class TestShuffleVertexManager { manager.initialize(); verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig - Assert.assertTrue(manager.enableAutoParallelism == true); - Assert.assertTrue(manager.desiredTaskInputDataSize == 1000l); - Assert.assertTrue(manager.minTaskParallelism == 10); - Assert.assertTrue(manager.slowStartMinSrcCompletionFraction == 0.25f); - Assert.assertTrue(manager.slowStartMaxSrcCompletionFraction == 0.5f); + Assert.assertTrue(manager.config.isAutoParallelismEnabled() == true); + Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == 1000l); + if (manager instanceof ShuffleVertexManager) { + Assert.assertTrue(((ShuffleVertexManager)manager).mgrConfig. + getMinTaskParallelism() == 10); + } + Assert.assertTrue(manager.config.getMinFraction() == 0.25f); + Assert.assertTrue(manager.config.getMaxFraction() == 0.5f); configurer = ShuffleVertexManager.createConfigBuilder(null); pluginDesc = configurer.setAutoReduceParallelism(false).build(); @@ -159,13 +177,16 @@ public class TestShuffleVertexManager { manager.initialize(); verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez not notified of reconfig - Assert.assertTrue(manager.enableAutoParallelism == false); - Assert.assertTrue(manager.desiredTaskInputDataSize == + Assert.assertTrue(manager.config.isAutoParallelismEnabled() == false); + Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT); - Assert.assertTrue(manager.minTaskParallelism == 1); - Assert.assertTrue(manager.slowStartMinSrcCompletionFraction == + if (manager instanceof ShuffleVertexManager) { + Assert.assertTrue(((ShuffleVertexManager)manager).mgrConfig. + getMinTaskParallelism() == 1); + } + Assert.assertTrue(manager.config.getMinFraction() == ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT); - Assert.assertTrue(manager.slowStartMaxSrcCompletionFraction == + Assert.assertTrue(manager.config.getMaxFraction() == ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT); @@ -576,7 +597,7 @@ public class TestShuffleVertexManager { @Test(timeout = 5000) public void testShuffleVertexManagerSlowStart() { Configuration conf = new Configuration(); - ShuffleVertexManager manager = null; + ShuffleVertexManagerBase manager = null; HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>(); String mockSrcVertexId1 = "Vertex1"; @@ -660,7 +681,7 @@ public class TestShuffleVertexManager { Assert.assertTrue(false); // should not come here } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().contains( - "Invalid values for slowStartMinSrcCompletionFraction")); + "Invalid values for slowStartMinFraction")); } try { @@ -669,7 +690,7 @@ public class TestShuffleVertexManager { Assert.assertTrue(false); // should not come here } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().contains( - "Invalid values for slowStartMinSrcCompletionFraction")); + "Invalid values for slowStartMinFraction")); } try { @@ -678,7 +699,7 @@ public class TestShuffleVertexManager { Assert.assertTrue(false); // should not come here } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().contains( - "Invalid values for slowStartMinSrcCompletionFraction")); + "Invalid values for slowStartMinFraction")); } // source vertex have some tasks. min > default and max undefined @@ -918,11 +939,7 @@ public class TestShuffleVertexManager { @Test(timeout = 5000) public void test_Tez1649_with_scatter_gather_edges() throws IOException { Configuration conf = new Configuration(); - conf.setBoolean( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, - true); - conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L); - ShuffleVertexManager manager = null; + ShuffleVertexManagerBase manager = null; HashMap<String, EdgeProperty> mockInputVertices_R2 = new HashMap<String, EdgeProperty>(); String r1 = "R1"; @@ -1094,11 +1111,7 @@ public class TestShuffleVertexManager { @Test(timeout = 5000) public void testSchedulingWithPartitionStats() throws IOException { Configuration conf = new Configuration(); - conf.setBoolean( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, - true); - conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L); - ShuffleVertexManager manager = null; + ShuffleVertexManagerBase manager = null; HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>(); String r1 = "R1"; @@ -1196,11 +1209,7 @@ public class TestShuffleVertexManager { @Test(timeout = 5000) public void test_Tez1649_with_mixed_edges() { Configuration conf = new Configuration(); - conf.setBoolean( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, - true); - conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L); - ShuffleVertexManager manager = null; + ShuffleVertexManagerBase manager = null; HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>(); @@ -1362,11 +1371,7 @@ public class TestShuffleVertexManager { @Test public void testZeroTasksSendsConfigured() throws IOException { Configuration conf = new Configuration(); - conf.setBoolean( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, - true); - conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L); - ShuffleVertexManager manager = null; + ShuffleVertexManagerBase manager = null; HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>(); String r1 = "R1"; @@ -1385,7 +1390,6 @@ public class TestShuffleVertexManager { when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0); - VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1); // check initialization manager = createManager(conf, mockContext, 0.001f, 0.001f); @@ -1423,18 +1427,38 @@ public class TestShuffleVertexManager { return mockAttempt; } - private ShuffleVertexManager createManager(Configuration conf, + private ShuffleVertexManagerBase createManager(Configuration conf, + VertexManagerPluginContext context, Float min, Float max) { + if (this.shuffleVertexManagerClass.equals(ShuffleVertexManager.class)) { + return createShuffleVertexManager(conf, context, min, max); + } else { + return null; + } + } + + private ShuffleVertexManager createShuffleVertexManager(Configuration conf, VertexManagerPluginContext context, Float min, Float max) { if (min != null) { - conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min); + conf.setFloat( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, + min); } else { - conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION); + conf.unset( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION); } if (max != null) { - conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max); + conf.setFloat( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, + max); } else { conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION); } + conf.setBoolean( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, + true); + conf.setLong( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, + 1000L); UserPayload payload; try { payload = TezUtils.createUserPayloadFromConf(conf);
