kennknowles commented on a change in pull request #15123:
URL: https://github.com/apache/beam/pull/15123#discussion_r674256062
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -528,7 +535,8 @@ public void start(
inputDataWatermark,
processingTime,
outputDataWatermark,
- synchronizedProcessingTime);
+ synchronizedProcessingTime,
+ null);
Review comment:
Prefer passing a no-op lambda over making this parameter `@Nullable`.
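
A minimal sketch of what I mean, assuming the callback is a `java.util.function.Consumer`. `TimerStore` and its `String` timers are hypothetical stand-ins for illustration, not the real `WindmillTimerInternals` API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a timer container; not the real Beam class.
class TimerStore {
  private final List<String> timers = new ArrayList<>();
  private final Consumer<String> onTimerModified;

  // Callers that do not care about modifications get a no-op callback,
  // so the field is never null.
  TimerStore() {
    this(timer -> {});
  }

  TimerStore(Consumer<String> onTimerModified) {
    this.onTimerModified = onTimerModified;
  }

  void setTimer(String timer) {
    timers.add(timer);
    // No null check needed at any call site.
    onTimerModified.accept(timer);
  }

  int size() {
    return timers.size();
  }
}
```

With this shape, the call site that currently passes `null` would either pass `timer -> {}` or use the shorter constructor.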
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
- timers.put(
- getTimerDataKey(timerId, timerFamilyId),
- namespace,
- TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+ TimerData timer =
+ TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
+ timers.put(getTimerDataKey(timerId, timerFamilyId), namespace, timer);
timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, true);
+ if (onTimerModified != null) {
Review comment:
Ditto: a no-op callback would avoid this null check.
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
}
// Lazily initialized
- private Iterator<TimerData> cachedFiredUserTimers = null;
+ private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+ // An ordered list of any timers that were set or modified by user processing earlier in this
+ // bundle.
+ // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+ // up in the queue.
+ private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+ private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+ switch (timeDomain) {
+ case EVENT_TIME:
+ return modifiedUserEventTimersOrdered;
+ case PROCESSING_TIME:
+ return modifiedUserProcessingTimersOrdered;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return modifiedUserSynchronizedProcessingTimersOrdered;
+ default:
+ throw new RuntimeException("Unexpected time domain " + timeDomain);
+ }
+ }
+
+ // A list of timer keys that were modified by user processing earlier in this bundle. This
+ // serves as a tombstone, so
+ // that we know not to fire any bundle timers that were modified.
+ private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+ private void onUserTimerModified(TimerData timerData) {
+ if (!timerData.getDeleted()) {
+ getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+ }
+ modifiedUserTimerKeys.put(
+ WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+ }
+
+ private boolean timerModified(TimerData timerData) {
+ String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+ @Nullable
+ TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+ return updatedTimer != null && !updatedTimer.equals(timerData);
+ }
public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (cachedFiredUserTimers == null) {
+ // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+ // from the bundle
+ // and cache the list for the rest of this bundle processing.
cachedFiredUserTimers =
- FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
- .transform(
- timer ->
- WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
- .iterator();
+ Iterators.peekingIterator(
+ FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+ .filter(
+ timer ->
+ WindmillTimerInternals.isUserTimer(timer)
+ && timer.getStateFamily().equals(stateFamily))
+ .transform(
+ timer ->
+ WindmillTimerInternals.windmillTimerToTimerData(
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+ .iterator());
}
- if (!cachedFiredUserTimers.hasNext()) {
- return null;
+ while (cachedFiredUserTimers.hasNext()) {
+ TimerData nextInBundle = cachedFiredUserTimers.peek();
+ NavigableSet<TimerData> modifiedUserTimersOrdered =
+ getModifiedUserTimersOrdered(nextInBundle.getDomain());
+ // If there is a modified timer that is earlier than the next timer in the bundle, try and
Review comment:
I read this fairly quickly, so I may have missed something, but could
`cachedFiredUserTimers` just be a `NavigableSet`, with each modified user
timer inserted into it as it is set? The earliest eligible timer would then
always be the next one to fire.
Is the current design a matter of cost or of semantics?
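
Roughly what I have in mind, as a sketch only. `SimpleTimer` and `PendingTimers` are hypothetical names, timers are reduced to an id plus a timestamp, and it deliberately ignores time domains and the tombstone handling for deleted or re-set timers, which may be exactly why the PR keeps separate structures:

```java
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical, simplified timer: only an id and a firing timestamp.
class SimpleTimer {
  final String id;
  final long fireAt;

  SimpleTimer(String id, long fireAt) {
    this.id = id;
    this.fireAt = fireAt;
  }
}

// One ordered set holds both the timers delivered with the bundle and any
// timers set while the bundle is being processed.
class PendingTimers {
  private final NavigableSet<SimpleTimer> pending =
      new TreeSet<>(
          Comparator.<SimpleTimer>comparingLong(t -> t.fireAt).thenComparing(t -> t.id));

  void add(SimpleTimer timer) {
    pending.add(timer);
  }

  // Returns the earliest pending timer, or null when none remain.
  SimpleTimer next() {
    return pending.pollFirst();
  }
}
```

A timer added mid-bundle with an earlier timestamp is returned by the next call to `next()`, ahead of the timers that arrived with the bundle. The trade-off is that the bundle's timers must be materialized and sorted up front instead of being consumed lazily through an iterator.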
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -104,6 +109,9 @@ public void setTimer(TimerData timerKey) {
getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
timerKey.getNamespace(),
true);
+ if (onTimerModified != null) {
Review comment:
Just use a no-op function here instead of the null check.
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
}
// Lazily initialized
- private Iterator<TimerData> cachedFiredUserTimers = null;
+ private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+ // An ordered list of any timers that were set or modified by user processing earlier in this
+ // bundle.
+ // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+ // up in the queue.
+ private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+ private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+ switch (timeDomain) {
+ case EVENT_TIME:
+ return modifiedUserEventTimersOrdered;
+ case PROCESSING_TIME:
+ return modifiedUserProcessingTimersOrdered;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return modifiedUserSynchronizedProcessingTimersOrdered;
+ default:
+ throw new RuntimeException("Unexpected time domain " + timeDomain);
+ }
+ }
+
+ // A list of timer keys that were modified by user processing earlier in this bundle. This
+ // serves as a tombstone, so
+ // that we know not to fire any bundle timers that were modified.
+ private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+ private void onUserTimerModified(TimerData timerData) {
+ if (!timerData.getDeleted()) {
+ getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+ }
+ modifiedUserTimerKeys.put(
+ WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+ }
+
+ private boolean timerModified(TimerData timerData) {
+ String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+ @Nullable
+ TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+ return updatedTimer != null && !updatedTimer.equals(timerData);
+ }
public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (cachedFiredUserTimers == null) {
+ // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+ // from the bundle
+ // and cache the list for the rest of this bundle processing.
cachedFiredUserTimers =
- FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
- .transform(
- timer ->
- WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
- .iterator();
+ Iterators.peekingIterator(
+ FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+ .filter(
+ timer ->
+ WindmillTimerInternals.isUserTimer(timer)
+ && timer.getStateFamily().equals(stateFamily))
+ .transform(
+ timer ->
+ WindmillTimerInternals.windmillTimerToTimerData(
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+ .iterator());
}
- if (!cachedFiredUserTimers.hasNext()) {
- return null;
+ while (cachedFiredUserTimers.hasNext()) {
+ TimerData nextInBundle = cachedFiredUserTimers.peek();
+ NavigableSet<TimerData> modifiedUserTimersOrdered =
+ getModifiedUserTimersOrdered(nextInBundle.getDomain());
+ // If there is a modified timer that is earlier than the next timer in the bundle, try and
+ // fire that first.
+ while (!modifiedUserTimersOrdered.isEmpty()
+ && modifiedUserTimersOrdered.first().compareTo(nextInBundle) <= 0) {
+ TimerData earlierTimer = modifiedUserTimersOrdered.pollFirst();
+ if (!timerModified(earlierTimer)) {
+ // We must delete the timer. This prevents it from being committed to the backing store.
+ // It also handles the
+ // case where the timer had been set to the far future and then modified in bundle;
+ // without deleting the
+ // timer, the runner will still have that future timer stored, and would fire it
+ // spuriously.
+ userTimerInternals.deleteTimer(earlierTimer);
+ return earlierTimer;
+ }
+ }
+ // There is no earlier timer to fire, so return the next timer in the bundle.
+ nextInBundle = cachedFiredUserTimers.next();
+ if (!timerModified(nextInBundle)) {
+ // User timers must be explicitly deleted when delivered, to release the implied hold.
+ userTimerInternals.deleteTimer(nextInBundle);
+ return nextInBundle;
+ }
}
- TimerData nextTimer = cachedFiredUserTimers.next();
- // User timers must be explicitly deleted when delivered, to release the implied hold
- userTimerInternals.deleteTimer(nextTimer);
- return nextTimer;
+ return null;
Review comment:
Annotate the return type as `@Nullable` (if returning `null` is really necessary).
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -136,6 +150,9 @@ public void deleteTimer(TimerData timerKey) {
getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
timerKey.getNamespace(),
false);
+ if (onTimerModified != null) {
Review comment:
Ditto: a no-op callback would avoid this null check.
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
- timers.put(
- getTimerDataKey(timerId, timerFamilyId),
- namespace,
- TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+ TimerData timer =
Review comment:
Is it possible to combine the overloads here? It seems like a minor risk to
have several methods that each need the same corresponding change. I am
reacting to the addition of the `onTimerModified` callback to each of them.
One centralized method would be better, if possible.
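
Something along these lines, as a rough sketch. `CentralizedTimers`, its `update` helper, and the `String`/`long` timer representation are hypothetical simplifications, not the actual `WindmillTimerInternals` signatures:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified timer container; not the real Beam class.
class CentralizedTimers {
  private final Map<String, Long> timers = new HashMap<>();
  private final Map<String, Boolean> timerStillPresent = new HashMap<>();
  private final BiConsumer<String, Boolean> onTimerModified;

  CentralizedTimers(BiConsumer<String, Boolean> onTimerModified) {
    this.onTimerModified = onTimerModified;
  }

  // Every public entry point delegates to update(), so the callback is
  // invoked in exactly one place.
  void setTimer(String timerId, long timestamp) {
    update(timerId, timestamp, true);
  }

  void setTimer(Map.Entry<String, Long> timer) {
    update(timer.getKey(), timer.getValue(), true);
  }

  void deleteTimer(String timerId) {
    update(timerId, 0L, false);
  }

  private void update(String timerId, long timestamp, boolean present) {
    timers.put(timerId, timestamp);
    timerStillPresent.put(timerId, present);
    onTimerModified.accept(timerId, present);
  }

  boolean isPresent(String timerId) {
    return timerStillPresent.getOrDefault(timerId, false);
  }
}
```

A future overload then only needs to call `update`, and cannot forget the callback.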