Repository: tez
Updated Branches:
  refs/heads/master 1468457e6 -> 9ca2d5be6


http://git-wip-us.apache.org/repos/asf/tez/blob/9ca2d5be/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index a5a6581..128d6fa 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -56,12 +56,16 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.roaringbitmap.RoaringBitmap;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -80,21 +84,32 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
+@RunWith(Parameterized.class)
 public class TestShuffleVertexManager {
 
   TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
   int taskId = 0;
   List<TaskAttemptIdentifier> emptyCompletions = null;
+  Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass;
+
+  @SuppressWarnings("deprecation")
+  @Parameterized.Parameters(name = "test[{0}]")
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][]{
+        {ShuffleVertexManager.class}};
+    return Arrays.asList(data);
+  }
+
+  public TestShuffleVertexManager(
+      Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass) {
+    this.shuffleVertexManagerClass = shuffleVertexManagerClass;
+  }
 
   @Test(timeout = 5000)
   public void testShuffleVertexManagerAutoParallelism() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean(
-        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
-        true);
-    conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
-    ShuffleVertexManager manager = null;
-    
+    ShuffleVertexManagerBase manager = null;
+
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
@@ -144,11 +159,14 @@ public class TestShuffleVertexManager {
     manager.initialize();
     verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig
 
-    Assert.assertTrue(manager.enableAutoParallelism == true);
-    Assert.assertTrue(manager.desiredTaskInputDataSize == 1000l);
-    Assert.assertTrue(manager.minTaskParallelism == 10);
-    Assert.assertTrue(manager.slowStartMinSrcCompletionFraction == 0.25f);
-    Assert.assertTrue(manager.slowStartMaxSrcCompletionFraction == 0.5f);
+    Assert.assertTrue(manager.config.isAutoParallelismEnabled() == true);
+    Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == 1000l);
+    if (manager instanceof ShuffleVertexManager) {
+      Assert.assertTrue(((ShuffleVertexManager)manager).mgrConfig.
+          getMinTaskParallelism() == 10);
+    }
+    Assert.assertTrue(manager.config.getMinFraction() == 0.25f);
+    Assert.assertTrue(manager.config.getMaxFraction() == 0.5f);
 
     configurer = ShuffleVertexManager.createConfigBuilder(null);
     pluginDesc = configurer.setAutoReduceParallelism(false).build();
@@ -159,13 +177,16 @@ public class TestShuffleVertexManager {
     manager.initialize();
     verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez not notified of reconfig
 
-    Assert.assertTrue(manager.enableAutoParallelism == false);
-    Assert.assertTrue(manager.desiredTaskInputDataSize ==
+    Assert.assertTrue(manager.config.isAutoParallelismEnabled() == false);
+    Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() ==
         ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);
-    Assert.assertTrue(manager.minTaskParallelism == 1);
-    Assert.assertTrue(manager.slowStartMinSrcCompletionFraction ==
+    if (manager instanceof ShuffleVertexManager) {
+      Assert.assertTrue(((ShuffleVertexManager)manager).mgrConfig.
+          getMinTaskParallelism() == 1);
+    }
+    Assert.assertTrue(manager.config.getMinFraction() ==
         ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
-    Assert.assertTrue(manager.slowStartMaxSrcCompletionFraction ==
+    Assert.assertTrue(manager.config.getMaxFraction() ==
         ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
 
 
@@ -576,7 +597,7 @@ public class TestShuffleVertexManager {
   @Test(timeout = 5000)
   public void testShuffleVertexManagerSlowStart() {
     Configuration conf = new Configuration();
-    ShuffleVertexManager manager = null;
+    ShuffleVertexManagerBase manager = null;
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
@@ -660,7 +681,7 @@ public class TestShuffleVertexManager {
       Assert.assertTrue(false); // should not come here
     } catch (IllegalArgumentException e) {
       Assert.assertTrue(e.getMessage().contains(
-          "Invalid values for slowStartMinSrcCompletionFraction"));
+          "Invalid values for slowStartMinFraction"));
     }
 
     try {
@@ -669,7 +690,7 @@ public class TestShuffleVertexManager {
       Assert.assertTrue(false); // should not come here
     } catch (IllegalArgumentException e) {
       Assert.assertTrue(e.getMessage().contains(
-          "Invalid values for slowStartMinSrcCompletionFraction"));
+          "Invalid values for slowStartMinFraction"));
     }
 
     try {
@@ -678,7 +699,7 @@ public class TestShuffleVertexManager {
       Assert.assertTrue(false); // should not come here
     } catch (IllegalArgumentException e) {
       Assert.assertTrue(e.getMessage().contains(
-          "Invalid values for slowStartMinSrcCompletionFraction"));
+          "Invalid values for slowStartMinFraction"));
     }
 
     // source vertex have some tasks. min > default and max undefined
@@ -918,11 +939,7 @@ public class TestShuffleVertexManager {
   @Test(timeout = 5000)
   public void test_Tez1649_with_scatter_gather_edges() throws IOException {
     Configuration conf = new Configuration();
-    conf.setBoolean(
-        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
-        true);
-    conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
-    ShuffleVertexManager manager = null;
+    ShuffleVertexManagerBase manager = null;
 
     HashMap<String, EdgeProperty> mockInputVertices_R2 = new HashMap<String, EdgeProperty>();
     String r1 = "R1";
@@ -1094,11 +1111,7 @@ public class TestShuffleVertexManager {
   @Test(timeout = 5000)
   public void testSchedulingWithPartitionStats() throws IOException {
     Configuration conf = new Configuration();
-    conf.setBoolean(
-        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
-        true);
-    conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
-    ShuffleVertexManager manager = null;
+    ShuffleVertexManagerBase manager = null;
 
     HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
     String r1 = "R1";
@@ -1196,11 +1209,7 @@ public class TestShuffleVertexManager {
   @Test(timeout = 5000)
   public void test_Tez1649_with_mixed_edges() {
     Configuration conf = new Configuration();
-    conf.setBoolean(
-        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
-        true);
-    conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
-    ShuffleVertexManager manager = null;
+    ShuffleVertexManagerBase manager = null;
 
     HashMap<String, EdgeProperty> mockInputVertices =
         new HashMap<String, EdgeProperty>();
@@ -1362,11 +1371,7 @@ public class TestShuffleVertexManager {
   @Test
   public void testZeroTasksSendsConfigured() throws IOException {
     Configuration conf = new Configuration();
-    conf.setBoolean(
-        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
-        true);
-    conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
-    ShuffleVertexManager manager = null;
+    ShuffleVertexManagerBase manager = null;
 
     HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
     String r1 = "R1";
@@ -1385,7 +1390,6 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0);
 
-    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
     // check initialization
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
 
@@ -1423,18 +1427,38 @@ public class TestShuffleVertexManager {
     return mockAttempt;
   }
 
-  private ShuffleVertexManager createManager(Configuration conf,
+  private ShuffleVertexManagerBase createManager(Configuration conf,
+      VertexManagerPluginContext context, Float min, Float max) {
+    if (this.shuffleVertexManagerClass.equals(ShuffleVertexManager.class)) {
+      return createShuffleVertexManager(conf, context, min, max);
+    } else {
+      return null;
+    }
+  }
+
+  private ShuffleVertexManager createShuffleVertexManager(Configuration conf,
       VertexManagerPluginContext context, Float min, Float max) {
     if (min != null) {
-      conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
+      conf.setFloat(
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
+              min);
     } else {
-      conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
+      conf.unset(
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
     }
     if (max != null) {
-      conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
+      conf.setFloat(
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
+              max);
     } else {
       conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION);
     }
+    conf.setBoolean(
+        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+            true);
+    conf.setLong(
+        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+            1000L);
     UserPayload payload;
     try {
       payload = TezUtils.createUserPayloadFromConf(conf);

Reply via email to