This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new a2deb84 TEZ-4165. Speed up
TestShuffleScheduler#testNumParallelScheduledFetchers
a2deb84 is described below
commit a2deb844b7f8578f5bd2a2dbefe66780a560389e
Author: László Bodor <[email protected]>
AuthorDate: Thu Apr 30 04:58:33 2020 -0500
TEZ-4165. Speed up TestShuffleScheduler#testNumParallelScheduledFetchers
Signed-off-by: Jonathan Eagles <[email protected]>
(cherry picked from commit 4e4b8e5c9f373cff182272249b10a0d101254080)
---
.../shuffle/orderedgrouped/TestShuffleScheduler.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 7a7b1ee..fabfa27 100644
---
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -31,11 +31,11 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -90,7 +90,6 @@ public class TestShuffleScheduler {
new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle,
mergeManager,
mergeManager,
System.currentTimeMillis(), null, false, 0, "srcName", true);
-
Future<Void> executorFuture = null;
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
@@ -111,10 +110,9 @@ public class TestShuffleScheduler {
scheduler.addKnownMapOutput("host" + i, 10000, 1,
inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
-
- // Sleep for a bit to allow the copies to be scheduled.
- Thread.sleep(2000l);
- assertEquals(10, scheduler.numFetchersCreated.get());
+ // wait for all the copies to be scheduled with timeout
+ scheduler.latch.await(2000, TimeUnit.MILLISECONDS);
+ assertEquals(0, scheduler.latch.getCount());
} finally {
scheduler.close();
@@ -1033,7 +1031,7 @@ public class TestShuffleScheduler {
private static class ShuffleSchedulerForTest extends ShuffleScheduler {
- private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
+ private CountDownLatch latch = new CountDownLatch(10);
private final boolean fetcherShouldWait;
private final ExceptionReporter reporter;
private final InputContext inputContext;
@@ -1067,7 +1065,7 @@ public class TestShuffleScheduler {
@Override
FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
- numFetchersCreated.incrementAndGet();
+ latch.countDown();
FetcherOrderedGrouped mockFetcher = mock(FetcherOrderedGrouped.class);
doAnswer(new Answer<Void>() {
@Override