[ https://issues.apache.org/jira/browse/BEAM-5197?focusedWorklogId=170066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170066 ]

ASF GitHub Bot logged work on BEAM-5197:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Nov/18 00:50
            Start Date: 28/Nov/18 00:50
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #7138: [BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission
URL: https://github.com/apache/beam/pull/7138
 
 
   

This pull request was merged from a forked repository.
Because GitHub does not display the original diff of a merged pull request
that came from a fork, the diff is reproduced below for the sake of provenance:

diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index e58907e05e4f..052cb04d8576 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -55,6 +55,9 @@
   private final boolean throwOnFirstSnapshot;
   private final int fixedNumSplits;
 
+  /** Flag to stall processing readers' elements. */
+  private transient volatile boolean haltEmission;
+
   /**
    * We only allow an exception to be thrown from getCheckpointMark at most once. This must be
    * static since the entire TestCountingSource instance may re-serialized when the pipeline
@@ -106,8 +109,14 @@ private TestCountingSource(
     this.fixedNumSplits = fixedNumSplits;
   }
 
-  public int getShardNumber() {
-    return shardNumber;
+  /** Halts emission of elements until {@code continueEmission} is invoked. */
+  void haltEmission() {
+    haltEmission = true;
+  }
+
+  /** Continues processing elements after {@code haltEmission} was invoked. */
+  void continueEmission() {
+    haltEmission = false;
   }
 
   @Override
@@ -163,7 +172,7 @@ public boolean start() {
 
     @Override
     public boolean advance() {
-      if (current >= numMessagesPerShard - 1) {
+      if (current >= numMessagesPerShard - 1 || haltEmission) {
         return false;
       }
       // If testing dedup, occasionally insert a duplicate value;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 28589d52f628..cd4fe5189350 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -31,7 +31,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -57,9 +57,8 @@
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
-import org.joda.time.Instant;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.junit.runners.Parameterized;
@@ -68,6 +67,7 @@
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link UnboundedSourceWrapper}. */
+@RunWith(Enclosed.class)
 public class UnboundedSourceWrapperTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapperTest.class);
@@ -216,15 +216,13 @@ public void close() {}
 
     /**
      * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source. If
-     * numSplits > numTasks the source has one source will manage multiple readers.
+     * numSplits > numTasks the source will manage multiple readers.
      *
-     * <p>This test verifies that watermark are correctly forwarded.
+     * <p>This test verifies that watermarks are correctly forwarded.
      */
     @Test(timeout = 30_000)
-    @Ignore("https://issues.apache.org/jira/browse/BEAM-5197") // deadlock on some platforms
     public void testWatermarkEmission() throws Exception {
       final int numElements = 500;
-      final Object checkpointLock = new Object();
       PipelineOptions options = PipelineOptionsFactory.create();
 
       // this source will emit exactly NUM_ELEMENTS across all parallel readers,
@@ -249,32 +247,36 @@ public void testWatermarkEmission() throws Exception {
                   numTasks /* max parallelism */,
                   numTasks /* parallelism */,
                   0 /* subtask index */);
+      testHarness.getExecutionConfig().setLatencyTrackingInterval(0);
+      testHarness.getExecutionConfig().setAutoWatermarkInterval(1);
 
-      testHarness.setProcessingTime(Instant.now().getMillis());
+      testHarness.setProcessingTime(Long.MIN_VALUE);
       testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
       final ConcurrentLinkedQueue<Object> caughtExceptions = new ConcurrentLinkedQueue<>();
 
-      // use the AtomicBoolean just for the set()/get() functionality for communicating
-      // with the outer Thread
-      final AtomicBoolean seenWatermark = new AtomicBoolean(false);
+      // We test emission of two watermarks here, one intermediate, one final
+      final CountDownLatch seenWatermarks = new CountDownLatch(2);
+      final int minElementsPerReader = numElements / numSplits;
+      final CountDownLatch minElementsCountdown = new CountDownLatch(minElementsPerReader);
+
+      // first halt the source to test auto watermark emission
+      source.haltEmission();
+      testHarness.open();
 
       Thread sourceThread =
           new Thread(
               () -> {
                 try {
-                  testHarness.open();
                   sourceOperator.run(
-                      checkpointLock,
+                      testHarness.getCheckpointLock(),
                       new TestStreamStatusMaintainer(),
                       new Output<
                           
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
 
                         @Override
                         public void emitWatermark(Watermark watermark) {
-                          if (watermark.getTimestamp() >= numElements / 2) {
-                            seenWatermark.set(true);
-                          }
+                          seenWatermarks.countDown();
                         }
 
                         @Override
@@ -287,7 +289,9 @@ public void emitLatencyMarker(LatencyMarker latencyMarker) {}
                         @Override
                         public void collect(
                             
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
-                                windowedValueStreamRecord) {}
+                                windowedValueStreamRecord) {
+                          minElementsCountdown.countDown();
+                        }
 
                         @Override
                         public void close() {}
@@ -300,21 +304,37 @@ public void close() {}
 
       sourceThread.start();
 
-      while (true) {
-        if (!caughtExceptions.isEmpty()) {
-          fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
-        }
-        if (seenWatermark.get()) {
-          break;
-        }
+      while (flinkWrapper
+          .getLocalReaders()
+          .stream()
+          .anyMatch(reader -> reader.getWatermark().getMillis() == 0)) {
+        // readers haven't been initialized
         Thread.sleep(50);
+      }
 
-        // Need to advance this so that the watermark timers in the source wrapper fire
-        // Synchronize is necessary because this can interfere with updating the PriorityQueue
-        // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
-        synchronized (checkpointLock) {
-          testHarness.setProcessingTime(Instant.now().getMillis());
-        }
+      // Need to advance this so that the watermark timers in the source wrapper fire
+      // Synchronize is necessary because this can interfere with updating the PriorityQueue
+      // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
+      synchronized (testHarness.getCheckpointLock()) {
+        testHarness.setProcessingTime(0);
+      }
+
+      // now read the elements
+      source.continueEmission();
+      // ..and await elements
+      minElementsCountdown.await();
+
+      // Need to advance this so that the watermark timers in the source wrapper fire
+      // Synchronize is necessary because this can interfere with updating the PriorityQueue
+      // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
+      synchronized (testHarness.getCheckpointLock()) {
+        testHarness.setProcessingTime(Long.MAX_VALUE);
+      }
+
+      seenWatermarks.await();
+
+      if (!caughtExceptions.isEmpty()) {
+        fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
       }
 
       sourceOperator.cancel();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 170066)
    Time Spent: 6h 20m  (was: 6h 10m)

> Flaky test: UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-5197
>                 URL: https://issues.apache.org/jira/browse/BEAM-5197
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Thomas Weise
>            Assignee: Maximilian Michels
>            Priority: Critical
>              Labels: flake
>             Fix For: 2.10.0
>
>          Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> {code:java}
> java.lang.NullPointerException
>       at org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$1.compare(TestProcessingTimeService.java:52)
>       at org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$1.compare(TestProcessingTimeService.java:49)
>       at java.util.PriorityQueue.siftUpUsingComparator(PriorityQueue.java:670)
>       at java.util.PriorityQueue.siftUp(PriorityQueue.java:646)
>       at java.util.PriorityQueue.offer(PriorityQueue.java:345)
>       at org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.registerTimer(TestProcessingTimeService.java:93)
>       at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:452)
>       at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:225)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at org.apache.beam.runners.flink.streaming.UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest.testValueEmission(UnboundedSourceWrapperTest.java:153)
>       {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to