This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 134ecc3  TEZ-4206: TestSpeculation.testBasicSpeculationPerVertexConf is flaky (Mustafa Iman via Ashutosh Chauhan)
134ecc3 is described below

commit 134ecc3c73098a22d34417ab69d03b69068b1354
Author: Mustafa Iman <[email protected]>
AuthorDate: Thu Jul 30 12:08:18 2020 +0200

    TEZ-4206: TestSpeculation.testBasicSpeculationPerVertexConf is flaky (Mustafa Iman via Ashutosh Chauhan)
    
    Signed-off-by: Laszlo Bodor <[email protected]>
---
 .../dag/speculation/legacy/LegacySpeculator.java   | 23 ++++--------
 .../org/apache/tez/dag/app/TestSpeculation.java    | 41 +++++++++++++---------
 2 files changed, 31 insertions(+), 33 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index f21b819..6bf02df 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -23,13 +23,10 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.service.AbstractService;
@@ -101,8 +98,6 @@ public class LegacySpeculator extends AbstractService {
   private final Clock clock;
   private Thread speculationBackgroundThread = null;
   private volatile boolean stopped = false;
-  /** Allow the speculator to wait on a blockingQueue in case we use it for 
event notification. */
-  private BlockingQueue<Object> scanControl = new 
LinkedBlockingQueue<Object>();
 
   @VisibleForTesting
   public int getMinimumAllowedSpeculativeTasks() { return 
minimumAllowedSpeculativeTasks;}
@@ -233,16 +228,6 @@ public class LegacySpeculator extends AbstractService {
     }
   }
 
-  // This interface is intended to be used only for test cases.
-  public void scanForSpeculationsForTesting() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("We got asked to run a debug speculation scan.");
-      LOG.debug("There are {} speculative events stacked already.", 
scanControl.size());
-    }
-    scanControl.add(new Object());
-    Thread.yield();
-  }
-
   public Runnable createThread() {
     return new Runnable() {
       @Override
@@ -256,9 +241,13 @@ public class LegacySpeculator extends AbstractService {
             long wait = Math.max(nextRecompTime, clock.getTime() - 
backgroundRunStartTime);
             if (speculations > 0) {
               LOG.info("We launched " + speculations
-                  + " speculations.  Waiting " + wait + " milliseconds.");
+                  + " speculations.  Waiting " + wait + " milliseconds before 
next evaluation.");
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Waiting {} milliseconds before next evaluation.", 
wait);
+              }
             }
-            Object pollResult = scanControl.poll(wait, TimeUnit.MILLISECONDS);
+            Thread.sleep(wait);
           } catch (InterruptedException ie) {
             if (!stopped) {
               LOG.warn("Speculator thread interrupted", ie);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index b9a7c5a..b6f5030 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -84,6 +84,16 @@ public class TestSpeculation {
       "Number of attempts after Speculation should be two";
   private static final String UNIT_EXCEPTION_MESSAGE =
       "test timed out after";
+
+  /**
+   * {@link MockDAGAppMaster#launcherSleepTime} advances tasks every 1 millisecond.
+   * We want our test task to take at least slightly more than 1 second. This is because
+   * MockDAGAppMaster's mock clock advances clock 1 second at each tick. If we are unlucky
+   * this may cause speculator to wait 1 second between each evaluation. If we are really
+   * unlucky, our test tasks finish before speculator has a chance to evaluate and speculate
+   * them. That is why we want the tasks to take at least one second.
+   */
+  private static final int NUM_UPDATES_FOR_TEST_TASK = 1200;
   private static final int ASSERT_SPECULATIONS_COUNT_RETRIES = 3;
   private Configuration defaultConf;
   private FileSystem localFs;
@@ -277,7 +287,7 @@ public class TestSpeculation {
    * @throws Exception the exception
    */
   @Retry
-  @Test (timeout = 10000)
+  @Test (timeout = 30000)
   public void testSingleTaskSpeculation() throws Exception {
     // Map<Timeout conf value, expected number of tasks>
     Map<Long, Integer> confToExpected = new HashMap<Long, Integer>();
@@ -308,7 +318,7 @@ public class TestSpeculation {
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
       Thread.sleep(200);
       // cause speculation trigger
-      mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+      mockLauncher.setStatusUpdatesForTask(killedTaId, 
NUM_UPDATES_FOR_TEST_TASK);
 
       mockLauncher.startScheduling(true);
       dagClient.waitForCompletion();
@@ -350,7 +360,7 @@ public class TestSpeculation {
 
     mockLauncher.updateProgress(withProgress);
     // cause speculation trigger
-    mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+    mockLauncher.setStatusUpdatesForTask(killedTaId, 
NUM_UPDATES_FOR_TEST_TASK);
 
     mockLauncher.startScheduling(true);
     dagClient.waitForCompletion();
@@ -392,7 +402,7 @@ public class TestSpeculation {
    * @throws Exception the exception
    */
   @Retry
-  @Test (timeout=10000)
+  @Test (timeout=30000)
   public void testBasicSpeculationWithProgress() throws Exception {
     testBasicSpeculation(true);
   }
@@ -403,7 +413,7 @@ public class TestSpeculation {
    * @throws Exception the exception
    */
   @Retry
-  @Test (timeout=10000)
+  @Test (timeout=30000)
   public void testBasicSpeculationWithoutProgress() throws Exception {
     testBasicSpeculation(false);
   }
@@ -414,12 +424,11 @@ public class TestSpeculation {
    * @throws Exception the exception
    */
   @Retry
-  @Test (timeout=10000)
+  @Test (timeout=30000)
   public void testBasicSpeculationPerVertexConf() throws Exception {
     DAG dag = DAG.create("test");
     String vNameNoSpec = "A";
     String vNameSpec = "B";
-    String speculatorSleepTime = "50";
     Vertex vA = Vertex.create(vNameNoSpec, 
ProcessorDescriptor.create("Proc.class"), 5);
     Vertex vB = Vertex.create(vNameSpec, 
ProcessorDescriptor.create("Proc.class"), 5);
     vA.setConf(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, "false");
@@ -436,20 +445,20 @@ public class TestSpeculation {
 
     DAGClient dagClient = tezClient.submitDAG(dag);
     DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
-    TezVertexID vertexId = dagImpl.getVertex(vNameSpec).getVertexId();
+    TezVertexID vertexIdSpec = dagImpl.getVertex(vNameSpec).getVertexId();
     TezVertexID vertexIdNoSpec = dagImpl.getVertex(vNameNoSpec).getVertexId();
     // original attempt is killed and speculative one is successful
     TezTaskAttemptID killedTaId =
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
-    TezTaskAttemptID noSpecTaId = TezTaskAttemptID
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexIdSpec, 0), 
0);
+    TezTaskAttemptID successfulTaId = TezTaskAttemptID
         .getInstance(TezTaskID.getInstance(vertexIdNoSpec, 0), 0);
 
     // cause speculation trigger for both
-    mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
-    mockLauncher.setStatusUpdatesForTask(noSpecTaId, 100);
+    mockLauncher.setStatusUpdatesForTask(killedTaId, 
NUM_UPDATES_FOR_TEST_TASK);
+    mockLauncher.setStatusUpdatesForTask(successfulTaId, 
NUM_UPDATES_FOR_TEST_TASK);
 
     mockLauncher.startScheduling(true);
-    org.apache.tez.dag.app.dag.Vertex vSpec = dagImpl.getVertex(vertexId);
+    org.apache.tez.dag.app.dag.Vertex vSpec = dagImpl.getVertex(vertexIdSpec);
     org.apache.tez.dag.app.dag.Vertex vNoSpec = 
dagImpl.getVertex(vertexIdNoSpec);
     // Wait enough time to give chance for the speculator to trigger
     // speculation on VB.
@@ -476,7 +485,7 @@ public class TestSpeculation {
    * @throws Exception the exception
    */
   @Retry
-  @Test (timeout=10000)
+  @Test (timeout=30000)
   public void testBasicSpeculationNotUseful() throws Exception {
     DAG dag = DAG.create("test");
     Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 
5);
@@ -491,8 +500,8 @@ public class TestSpeculation {
     TezTaskAttemptID successTaId = 
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
     TezTaskAttemptID killedTaId = 
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
 
-    mockLauncher.setStatusUpdatesForTask(successTaId, 100);
-    mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+    mockLauncher.setStatusUpdatesForTask(successTaId, 
NUM_UPDATES_FOR_TEST_TASK);
+    mockLauncher.setStatusUpdatesForTask(killedTaId, 
NUM_UPDATES_FOR_TEST_TASK);
 
     mockLauncher.startScheduling(true);
     dagClient.waitForCompletion();

Reply via email to