This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 88acc52 [BEAM-8543] Dataflow streaming timers are not strictly time
ordered when set earlier mid-bundle (#11924)
88acc52 is described below
commit 88acc5267f759d81e9836a9db17b9e0ee521c785
Author: Rehman Murad Ali <[email protected]>
AuthorDate: Thu Jul 30 01:06:52 2020 +0500
[BEAM-8543] Dataflow streaming timers are not strictly time ordered when
set earlier mid-bundle (#11924)
---
CHANGES.md | 1 +
.../worker/StreamingModeExecutionContext.java | 75 +++++++++++++++++-----
.../dataflow/worker/WindmillTimerInternals.java | 25 ++++++++
.../org/apache/beam/sdk/transforms/ParDoTest.java | 69 ++++++++++++++++----
4 files changed, 141 insertions(+), 29 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1cfc52c..c2665e7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -146,6 +146,7 @@
* Upgrade Sphinx to 3.0.3 for building PyDoc.
* Added a PTransform for image annotation using Google Cloud AI image
processing service
([BEAM-9646](https://issues.apache.org/jira/browse/BEAM-9646))
+* Dataflow streaming timers are not strictly time ordered when set earlier
mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).
## Breaking Changes
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 7ed390b..334f145 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -24,10 +24,12 @@ import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@@ -519,7 +521,7 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
synchronizedProcessingTime);
this.cachedFiredTimers = null;
- this.cachedFiredUserTimers = null;
+ this.toBeFiredTimersOrdered = null;
}
public void flushState() {
@@ -559,28 +561,67 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
return nextTimer;
}
- // Lazily initialized
- private Iterator<TimerData> cachedFiredUserTimers = null;
+ private PriorityQueue<TimerData> toBeFiredTimersOrdered = null;
+
+ // Tracks whether a timer was reset earlier mid-bundle:
+ // maps a timer's unique id to its expected firing time so the
+ // actual firing time of a polled timer can be verified.
+ private Map<String, Instant> firedTimer = new HashMap<>();
public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W>
windowCoder) {
- if (cachedFiredUserTimers == null) {
- cachedFiredUserTimers =
-
FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
- .transform(
- timer ->
- WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer, windowCoder))
- .iterator();
+ if (toBeFiredTimersOrdered == null) {
+
+ toBeFiredTimersOrdered = new
PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
+
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+ .filter(
+ timer ->
+ WindmillTimerInternals.isUserTimer(timer)
+ && timer.getStateFamily().equals(stateFamily))
+ .transform(
+ timer ->
+ WindmillTimerInternals.windmillTimerToTimerData(
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer,
windowCoder))
+ .iterator()
+ .forEachRemaining(
+ timerData -> {
+ firedTimer.put(
+ timerData.getTimerId() + '+' +
timerData.getTimerFamilyId(),
+ timerData.getTimestamp());
+ toBeFiredTimersOrdered.add(timerData);
+ });
}
- if (!cachedFiredUserTimers.hasNext()) {
+ Instant currentInputWatermark =
userTimerInternals.currentInputWatermarkTime();
+
+ if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
+ List<TimerData> currentTimers = userTimerInternals.getCurrentTimers();
+
+ for (TimerData timerData : currentTimers) {
+ firedTimer.put(
+ timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
+ timerData.getTimestamp());
+ toBeFiredTimersOrdered.add(timerData);
+ }
+ }
+
+ TimerData nextTimer = null;
+
+ // Fire a timer only if its timestamp matches the recorded one. Otherwise it
was either reset or is obsolete.
+ while (!toBeFiredTimersOrdered.isEmpty()) {
+ nextTimer = toBeFiredTimersOrdered.poll();
+ String timerUniqueId = nextTimer.getTimerId() + '+' +
nextTimer.getTimerFamilyId();
+ if (firedTimer.containsKey(timerUniqueId)
+ &&
firedTimer.get(timerUniqueId).isEqual(nextTimer.getTimestamp())) {
+ break;
+ } else {
+ nextTimer = null;
+ }
+ }
+
+ if (nextTimer == null) {
return null;
}
- TimerData nextTimer = cachedFiredUserTimers.next();
+
// User timers must be explicitly deleted when delivered, to release the
implied hold
userTimerInternals.deleteTimer(nextTimer);
return nextTimer;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index f46fd49..5269cf2 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -22,6 +22,8 @@ import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
@@ -225,6 +227,29 @@ class WindmillTimerInternals implements TimerInternals {
timers.clear();
}
+ public boolean hasTimerBefore(Instant time) {
+ for (Cell<String, StateNamespace, Boolean> cell :
timerStillPresent.cellSet()) {
+ TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
+ if (cell.getValue()) {
+ if (timerData.getTimestamp().isBefore(time)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public List<TimerData> getCurrentTimers() {
+ List<TimerData> timerDataList = new ArrayList<>();
+ for (Cell<String, StateNamespace, Boolean> cell :
timerStillPresent.cellSet()) {
+ TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
+ if (cell.getValue()) {
+ timerDataList.add(timerData);
+ }
+ }
+ return timerDataList;
+ }
+
private boolean needsWatermarkHold(TimerData timerData) {
// If it is a user timer or a system timer with outputTimestamp different
than timestamp
return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3f0d13c..776fd3e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3959,7 +3959,7 @@ public class ParDoTest implements Serializable {
}
testEventTimeTimerOrderingWithInputPTransform(
- now, numTestElements, builder.advanceWatermarkToInfinity());
+ now, numTestElements, builder.advanceWatermarkToInfinity(),
IsBounded.BOUNDED);
}
/** A test that makes sure event time timers are correctly ordered
when using the Create transform. */
@@ -3970,7 +3970,7 @@ public class ParDoTest implements Serializable {
UsesStatefulParDo.class,
UsesStrictTimerOrdering.class
})
- public void testEventTimeTimerOrderingWithCreate() throws Exception {
+ public void testEventTimeTimerOrderingWithCreateBounded() throws Exception
{
final int numTestElements = 100;
final Instant now = new Instant(1500000000000L);
@@ -3980,13 +3980,39 @@ public class ParDoTest implements Serializable {
}
testEventTimeTimerOrderingWithInputPTransform(
- now, numTestElements, Create.timestamped(elements));
+ now, numTestElements, Create.timestamped(elements),
IsBounded.BOUNDED);
+ }
+
+ /**
+ * A test that makes sure event time timers are correctly ordered when using
the Create transform
+ * with an unbounded input.
+ */
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ UsesStatefulParDo.class,
+ UsesUnboundedPCollections.class,
+ UsesStrictTimerOrdering.class
+ })
+ public void testEventTimeTimerOrderingWithCreateUnbounded() throws
Exception {
+ final int numTestElements = 100;
+ final Instant now = new Instant(1500000000000L);
+
+ List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
+ for (int i = 0; i < numTestElements; i++) {
+ elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i *
1000)));
+ }
+
+ testEventTimeTimerOrderingWithInputPTransform(
+ now, numTestElements, Create.timestamped(elements),
IsBounded.UNBOUNDED);
}
private void testEventTimeTimerOrderingWithInputPTransform(
Instant now,
int numTestElements,
- PTransform<PBegin, PCollection<KV<String, String>>> transform)
+ PTransform<PBegin, PCollection<KV<String, String>>> transform,
+ IsBounded isBounded)
throws Exception {
final String timerIdBagAppend = "append";
@@ -4070,7 +4096,8 @@ public class ParDoTest implements Serializable {
}
};
- PCollection<String> output =
pipeline.apply(transform).apply(ParDo.of(fn));
+ PCollection<String> output =
+
pipeline.apply(transform).setIsBoundedInternal(isBounded).apply(ParDo.of(fn));
List<String> expected =
IntStream.rangeClosed(0, numTestElements)
.mapToObj(expandFn(numTestElements))
@@ -4154,16 +4181,25 @@ public class ParDoTest implements Serializable {
TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of()))
.addElements(KV.of(null, null))
.advanceWatermarkToInfinity();
- pipeline.apply(TwoTimerTest.of(now, end, input));
+ pipeline.apply(TwoTimerTest.of(now, end, input, IsBounded.BOUNDED));
+ pipeline.run();
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTimersInParDo.class,
UsesStrictTimerOrdering.class})
+ public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() {
+ Instant now = new Instant(1500000000000L);
+ Instant end = now.plus(100);
+ pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)),
IsBounded.BOUNDED));
pipeline.run();
}
@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class,
UsesStrictTimerOrdering.class})
- public void testTwoTimersSettingEachOtherWithCreateAsInput() {
+ public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() {
Instant now = new Instant(1500000000000L);
Instant end = now.plus(100);
- pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null))));
+ pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)),
IsBounded.UNBOUNDED));
pipeline.run();
}
@@ -4337,18 +4373,26 @@ public class ParDoTest implements Serializable {
private static class TwoTimerTest extends PTransform<PBegin, PDone> {
private static PTransform<PBegin, PDone> of(
- Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void,
Void>>> input) {
- return new TwoTimerTest(start, end, input);
+ Instant start,
+ Instant end,
+ PTransform<PBegin, PCollection<KV<Void, Void>>> input,
+ IsBounded isBounded) {
+ return new TwoTimerTest(start, end, input, isBounded);
}
private final Instant start;
private final Instant end;
+ private final IsBounded isBounded;
private final transient PTransform<PBegin, PCollection<KV<Void, Void>>>
inputPTransform;
public TwoTimerTest(
- Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void,
Void>>> input) {
+ Instant start,
+ Instant end,
+ PTransform<PBegin, PCollection<KV<Void, Void>>> input,
+ IsBounded isBounded) {
this.start = start;
this.end = end;
+ this.isBounded = isBounded;
this.inputPTransform = input;
}
@@ -4361,6 +4405,7 @@ public class ParDoTest implements Serializable {
PCollection<String> result =
input
.apply(inputPTransform)
+ .setIsBoundedInternal(isBounded)
.apply(
ParDo.of(
new DoFn<KV<Void, Void>, String>() {
@@ -4425,7 +4470,7 @@ public class ParDoTest implements Serializable {
}));
List<String> expected =
- LongStream.rangeClosed(0, 100)
+ LongStream.rangeClosed(0, end.minus(start.getMillis()).getMillis())
.mapToObj(e -> (Long) e)
.flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e +
":" + e).stream())
.collect(Collectors.toList());