[
https://issues.apache.org/jira/browse/BEAM-5197?focusedWorklogId=170066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170066
]
ASF GitHub Bot logged work on BEAM-5197:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Nov/18 00:50
Start Date: 28/Nov/18 00:50
Worklog Time Spent: 10m
Work Description: tweise closed pull request #7138: [BEAM-5197] Fix
UnboundedSourceWrapper#testWatermarkEmission
URL: https://github.com/apache/beam/pull/7138
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index e58907e05e4f..052cb04d8576 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -55,6 +55,9 @@
private final boolean throwOnFirstSnapshot;
private final int fixedNumSplits;
+ /** Flag to stall processing readers' elements. */
+ private transient volatile boolean haltEmission;
+
/**
* We only allow an exception to be thrown from getCheckpointMark at most
once. This must be
* static since the entire TestCountingSource instance may re-serialized
when the pipeline
@@ -106,8 +109,14 @@ private TestCountingSource(
this.fixedNumSplits = fixedNumSplits;
}
- public int getShardNumber() {
- return shardNumber;
+ /** Halts emission of elements until {@code continueEmission} is invoked. */
+ void haltEmission() {
+ haltEmission = true;
+ }
+
+ /** Continues processing elements after {@code haltEmission} was invoked. */
+ void continueEmission() {
+ haltEmission = false;
}
@Override
@@ -163,7 +172,7 @@ public boolean start() {
@Override
public boolean advance() {
- if (current >= numMessagesPerShard - 1) {
+ if (current >= numMessagesPerShard - 1 || haltEmission) {
return false;
}
// If testing dedup, occasionally insert a duplicate value;
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 28589d52f628..cd4fe5189350 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -31,7 +31,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
@@ -57,9 +57,8 @@
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
-import org.joda.time.Instant;
-import org.junit.Ignore;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
@@ -68,6 +67,7 @@
import org.slf4j.LoggerFactory;
/** Tests for {@link UnboundedSourceWrapper}. */
+@RunWith(Enclosed.class)
public class UnboundedSourceWrapperTest {
private static final Logger LOG =
LoggerFactory.getLogger(UnboundedSourceWrapperTest.class);
@@ -216,15 +216,13 @@ public void close() {}
/**
* Creates a {@link UnboundedSourceWrapper} that has one or multiple
readers per source. If
- * numSplits > numTasks the source has one source will manage multiple
readers.
+ * numSplits > numTasks the source will manage multiple readers.
*
- * <p>This test verifies that watermark are correctly forwarded.
+ * <p>This test verifies that watermarks are correctly forwarded.
*/
@Test(timeout = 30_000)
- @Ignore("https://issues.apache.org/jira/browse/BEAM-5197") // deadlock on
some platforms
public void testWatermarkEmission() throws Exception {
final int numElements = 500;
- final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();
// this source will emit exactly NUM_ELEMENTS across all parallel
readers,
@@ -249,32 +247,36 @@ public void testWatermarkEmission() throws Exception {
numTasks /* max parallelism */,
numTasks /* parallelism */,
0 /* subtask index */);
+ testHarness.getExecutionConfig().setLatencyTrackingInterval(0);
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(1);
- testHarness.setProcessingTime(Instant.now().getMillis());
+ testHarness.setProcessingTime(Long.MIN_VALUE);
testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
final ConcurrentLinkedQueue<Object> caughtExceptions = new
ConcurrentLinkedQueue<>();
- // use the AtomicBoolean just for the set()/get() functionality for
communicating
- // with the outer Thread
- final AtomicBoolean seenWatermark = new AtomicBoolean(false);
+ // We test emission of two watermarks here, one intermediate, one final
+ final CountDownLatch seenWatermarks = new CountDownLatch(2);
+ final int minElementsPerReader = numElements / numSplits;
+ final CountDownLatch minElementsCountdown = new
CountDownLatch(minElementsPerReader);
+
+ // first halt the source to test auto watermark emission
+ source.haltEmission();
+ testHarness.open();
Thread sourceThread =
new Thread(
() -> {
try {
- testHarness.open();
sourceOperator.run(
- checkpointLock,
+ testHarness.getCheckpointLock(),
new TestStreamStatusMaintainer(),
new Output<
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@Override
public void emitWatermark(Watermark watermark) {
- if (watermark.getTimestamp() >= numElements / 2) {
- seenWatermark.set(true);
- }
+ seenWatermarks.countDown();
}
@Override
@@ -287,7 +289,9 @@ public void emitLatencyMarker(LatencyMarker latencyMarker)
{}
@Override
public void collect(
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
- windowedValueStreamRecord) {}
+ windowedValueStreamRecord) {
+ minElementsCountdown.countDown();
+ }
@Override
public void close() {}
@@ -300,21 +304,37 @@ public void close() {}
sourceThread.start();
- while (true) {
- if (!caughtExceptions.isEmpty()) {
- fail("Caught exception(s): " +
Joiner.on(",").join(caughtExceptions));
- }
- if (seenWatermark.get()) {
- break;
- }
+ while (flinkWrapper
+ .getLocalReaders()
+ .stream()
+ .anyMatch(reader -> reader.getWatermark().getMillis() == 0)) {
+ // readers haven't been initialized
Thread.sleep(50);
+ }
- // Need to advance this so that the watermark timers in the source
wrapper fire
- // Synchronize is necessary because this can interfere with updating
the PriorityQueue
- // of the ProcessingTimeService which is also accessed through
UnboundedSourceWrapper.
- synchronized (checkpointLock) {
- testHarness.setProcessingTime(Instant.now().getMillis());
- }
+ // Need to advance this so that the watermark timers in the source
wrapper fire
+ // Synchronize is necessary because this can interfere with updating the
PriorityQueue
+ // of the ProcessingTimeService which is also accessed through
UnboundedSourceWrapper.
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.setProcessingTime(0);
+ }
+
+ // now read the elements
+ source.continueEmission();
+ // ..and await elements
+ minElementsCountdown.await();
+
+ // Need to advance this so that the watermark timers in the source
wrapper fire
+ // Synchronize is necessary because this can interfere with updating the
PriorityQueue
+ // of the ProcessingTimeService which is also accessed through
UnboundedSourceWrapper.
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.setProcessingTime(Long.MAX_VALUE);
+ }
+
+ seenWatermarks.await();
+
+ if (!caughtExceptions.isEmpty()) {
+ fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
}
sourceOperator.cancel();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 170066)
Time Spent: 6h 20m (was: 6h 10m)
> Flaky test: UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest
> ------------------------------------------------------------------------------
>
> Key: BEAM-5197
> URL: https://issues.apache.org/jira/browse/BEAM-5197
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Thomas Weise
> Assignee: Maximilian Michels
> Priority: Critical
> Labels: flake
> Fix For: 2.10.0
>
> Time Spent: 6h 20m
> Remaining Estimate: 0h
>
> {code:java}
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$1.compare(TestProcessingTimeService.java:52)
> at
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$1.compare(TestProcessingTimeService.java:49)
> at java.util.PriorityQueue.siftUpUsingComparator(PriorityQueue.java:670)
> at java.util.PriorityQueue.siftUp(PriorityQueue.java:646)
> at java.util.PriorityQueue.offer(PriorityQueue.java:345)
> at
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.registerTimer(TestProcessingTimeService.java:93)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:452)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:225)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at
> org.apache.beam.runners.flink.streaming.UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest.testValueEmission(UnboundedSourceWrapperTest.java:153)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)