This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 1bd13bd TEZ-4206: TestSpeculation.testBasicSpeculationPerVertexConf is flaky (Mustafa Iman via Ashutosh Chauhan)
1bd13bd is described below
commit 1bd13bdbafdb5760c0aa39c8c6de917380eb9a2f
Author: Mustafa Iman <[email protected]>
AuthorDate: Thu Jul 30 12:08:18 2020 +0200
TEZ-4206: TestSpeculation.testBasicSpeculationPerVertexConf is flaky (Mustafa Iman via Ashutosh Chauhan)
Signed-off-by: Laszlo Bodor <[email protected]>
---
.../dag/speculation/legacy/LegacySpeculator.java | 23 ++++--------
.../org/apache/tez/dag/app/TestSpeculation.java | 41 +++++++++++++---------
2 files changed, 31 insertions(+), 33 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index f21b819..6bf02df 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -23,13 +23,10 @@ import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.service.AbstractService;
@@ -101,8 +98,6 @@ public class LegacySpeculator extends AbstractService {
private final Clock clock;
private Thread speculationBackgroundThread = null;
private volatile boolean stopped = false;
- /** Allow the speculator to wait on a blockingQueue in case we use it for
event notification. */
- private BlockingQueue<Object> scanControl = new
LinkedBlockingQueue<Object>();
@VisibleForTesting
public int getMinimumAllowedSpeculativeTasks() { return
minimumAllowedSpeculativeTasks;}
@@ -233,16 +228,6 @@ public class LegacySpeculator extends AbstractService {
}
}
- // This interface is intended to be used only for test cases.
- public void scanForSpeculationsForTesting() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("We got asked to run a debug speculation scan.");
- LOG.debug("There are {} speculative events stacked already.",
scanControl.size());
- }
- scanControl.add(new Object());
- Thread.yield();
- }
-
public Runnable createThread() {
return new Runnable() {
@Override
@@ -256,9 +241,13 @@ public class LegacySpeculator extends AbstractService {
long wait = Math.max(nextRecompTime, clock.getTime() -
backgroundRunStartTime);
if (speculations > 0) {
LOG.info("We launched " + speculations
- + " speculations. Waiting " + wait + " milliseconds.");
+ + " speculations. Waiting " + wait + " milliseconds before next evaluation.");
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting {} milliseconds before next evaluation.",
wait);
+ }
}
- Object pollResult = scanControl.poll(wait, TimeUnit.MILLISECONDS);
+ Thread.sleep(wait);
} catch (InterruptedException ie) {
if (!stopped) {
LOG.warn("Speculator thread interrupted", ie);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index b9a7c5a..b6f5030 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -84,6 +84,16 @@ public class TestSpeculation {
"Number of attempts after Speculation should be two";
private static final String UNIT_EXCEPTION_MESSAGE =
"test timed out after";
+
+ /**
+ * {@link MockDAGAppMaster#launcherSleepTime} advances tasks every 1 millisecond.
+ * We want our test task to take at least slightly more than 1 second. This is because
+ * MockDAGAppMaster's mock clock advances clock 1 second at each tick. If we are unlucky
+ * this may cause speculator to wait 1 second between each evaluation. If we are really
+ * unlucky, our test tasks finish before speculator has a chance to evaluate and speculate
+ * them. That is why we want the tasks to take at least one second.
+ */
+ private static final int NUM_UPDATES_FOR_TEST_TASK = 1200;
private static final int ASSERT_SPECULATIONS_COUNT_RETRIES = 3;
private Configuration defaultConf;
private FileSystem localFs;
@@ -277,7 +287,7 @@ public class TestSpeculation {
* @throws Exception the exception
*/
@Retry
- @Test (timeout = 10000)
+ @Test (timeout = 30000)
public void testSingleTaskSpeculation() throws Exception {
// Map<Timeout conf value, expected number of tasks>
Map<Long, Integer> confToExpected = new HashMap<Long, Integer>();
@@ -308,7 +318,7 @@ public class TestSpeculation {
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
Thread.sleep(200);
// cause speculation trigger
- mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+ mockLauncher.setStatusUpdatesForTask(killedTaId,
NUM_UPDATES_FOR_TEST_TASK);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
@@ -350,7 +360,7 @@ public class TestSpeculation {
mockLauncher.updateProgress(withProgress);
// cause speculation trigger
- mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+ mockLauncher.setStatusUpdatesForTask(killedTaId,
NUM_UPDATES_FOR_TEST_TASK);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
@@ -392,7 +402,7 @@ public class TestSpeculation {
* @throws Exception the exception
*/
@Retry
- @Test (timeout=10000)
+ @Test (timeout=30000)
public void testBasicSpeculationWithProgress() throws Exception {
testBasicSpeculation(true);
}
@@ -403,7 +413,7 @@ public class TestSpeculation {
* @throws Exception the exception
*/
@Retry
- @Test (timeout=10000)
+ @Test (timeout=30000)
public void testBasicSpeculationWithoutProgress() throws Exception {
testBasicSpeculation(false);
}
@@ -414,12 +424,11 @@ public class TestSpeculation {
* @throws Exception the exception
*/
@Retry
- @Test (timeout=10000)
+ @Test (timeout=30000)
public void testBasicSpeculationPerVertexConf() throws Exception {
DAG dag = DAG.create("test");
String vNameNoSpec = "A";
String vNameSpec = "B";
- String speculatorSleepTime = "50";
Vertex vA = Vertex.create(vNameNoSpec,
ProcessorDescriptor.create("Proc.class"), 5);
Vertex vB = Vertex.create(vNameSpec,
ProcessorDescriptor.create("Proc.class"), 5);
vA.setConf(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, "false");
@@ -436,20 +445,20 @@ public class TestSpeculation {
DAGClient dagClient = tezClient.submitDAG(dag);
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
- TezVertexID vertexId = dagImpl.getVertex(vNameSpec).getVertexId();
+ TezVertexID vertexIdSpec = dagImpl.getVertex(vNameSpec).getVertexId();
TezVertexID vertexIdNoSpec = dagImpl.getVertex(vNameNoSpec).getVertexId();
// original attempt is killed and speculative one is successful
TezTaskAttemptID killedTaId =
- TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
- TezTaskAttemptID noSpecTaId = TezTaskAttemptID
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexIdSpec, 0),
0);
+ TezTaskAttemptID successfulTaId = TezTaskAttemptID
.getInstance(TezTaskID.getInstance(vertexIdNoSpec, 0), 0);
// cause speculation trigger for both
- mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
- mockLauncher.setStatusUpdatesForTask(noSpecTaId, 100);
+ mockLauncher.setStatusUpdatesForTask(killedTaId,
NUM_UPDATES_FOR_TEST_TASK);
+ mockLauncher.setStatusUpdatesForTask(successfulTaId,
NUM_UPDATES_FOR_TEST_TASK);
mockLauncher.startScheduling(true);
- org.apache.tez.dag.app.dag.Vertex vSpec = dagImpl.getVertex(vertexId);
+ org.apache.tez.dag.app.dag.Vertex vSpec = dagImpl.getVertex(vertexIdSpec);
org.apache.tez.dag.app.dag.Vertex vNoSpec =
dagImpl.getVertex(vertexIdNoSpec);
// Wait enough time to give chance for the speculator to trigger
// speculation on VB.
@@ -476,7 +485,7 @@ public class TestSpeculation {
* @throws Exception the exception
*/
@Retry
- @Test (timeout=10000)
+ @Test (timeout=30000)
public void testBasicSpeculationNotUseful() throws Exception {
DAG dag = DAG.create("test");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"),
5);
@@ -491,8 +500,8 @@ public class TestSpeculation {
TezTaskAttemptID successTaId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
TezTaskAttemptID killedTaId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
- mockLauncher.setStatusUpdatesForTask(successTaId, 100);
- mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+ mockLauncher.setStatusUpdatesForTask(successTaId,
NUM_UPDATES_FOR_TEST_TASK);
+ mockLauncher.setStatusUpdatesForTask(killedTaId,
NUM_UPDATES_FOR_TEST_TASK);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();