This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 89538a9 SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize
flakiness
89538a9 is described below
commit 89538a915d92e153dd3ff221f2abc9719dd69982
Author: mynameborat <[email protected]>
AuthorDate: Mon Jun 8 14:36:29 2020 -0700
SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize flakiness
**Problem**: The test uses sleep and expects the future to complete at the
end of the sleep duration. It introduces flakiness and results in false
negatives.
**Change**: Modified the tests to use latch instead of sleep
**Tests**: None
**API Changes**: None
**Upgrade Instructions**: None
**Usage Instructions**: None
Author: mynameborat <[email protected]>
Reviewers: Dengpanyin <[email protected]>
Closes #1379 from mynameborat/SAMZA-2545
---
.../samza/table/batching/TestBatchProcessor.java | 44 +++++++++++-----------
1 file changed, 23 insertions(+), 21 deletions(-)
diff --git
a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
index 2de3170..44ea246 100644
---
a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
+++
b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
@@ -23,25 +23,21 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import org.apache.samza.table.ReadWriteTable;
import org.junit.Assert;
import org.junit.Test;
-import static java.lang.Thread.*;
-import static org.mockito.Mockito.*;
+import static java.lang.Thread.sleep;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestBatchProcessor {
- private static final int SLOW_OPERATION_TIME_MS = 500;
- private static final Supplier<Void> SLOW_UPDATE_SUPPLIER = () -> {
- try {
- sleep(SLOW_OPERATION_TIME_MS);
- } catch (InterruptedException e) {
- // ignore
- }
- return null;
- };
public static class TestCreate {
@Test
@@ -86,9 +82,18 @@ public class TestBatchProcessor {
@Test
public void testBatchOperationTriggeredByBatchSize() {
final int maxBatchSize = 3;
+ final CountDownLatch batchCompletionTriggerLatch = new CountDownLatch(1);
+ final Supplier<Void> tableUpdateSupplier = () -> {
+ try {
+ batchCompletionTriggerLatch.await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ return null;
+ };
final ReadWriteTable<Integer, Integer> table =
mock(ReadWriteTable.class);
-
when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(SLOW_UPDATE_SUPPLIER));
+
when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(tableUpdateSupplier));
final BatchProcessor<Integer, Integer> batchProcessor =
createBatchProcessor(table, maxBatchSize, Integer.MAX_VALUE);
@@ -104,15 +109,12 @@ public class TestBatchProcessor {
}
Assert.assertEquals(0, batchProcessor.size());
- try {
- sleep(SLOW_OPERATION_TIME_MS * 2);
- } catch (InterruptedException e) {
- // ignore
- }
-
- for (int i = 0; i < maxBatchSize; i++) {
- Assert.assertTrue(futureList.get(i).isDone());
- }
+ // Complete the async call to the underlying table
+ batchCompletionTriggerLatch.countDown();
+ // The latch should eventually trigger completion to the future returned
by the batch processor
+ CompletableFuture
+ .allOf(futureList.toArray(new CompletableFuture[0]))
+ .join();
}
@Test