aljoscha commented on a change in pull request #11972:
URL: https://github.com/apache/flink/pull/11972#discussion_r421343103
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the
interval,
+ * you can control if the timer will be registered periodic for each event
arriving,
+ * by the continualInterval flag.
Review comment:
Is maybe `resetTimerOnEvent` a better name for `continualInterval`,
because that's what it does right? When `continualInterval` is `true` the timer
period will be extended on each received event, right?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the
interval,
+ * you can control if the timer will be registered periodic for each event
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T,
W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Trigger<T, W> nestedTrigger;
+ private final long interval;
+ private final boolean continualInterval;
+ private final boolean shouldClearAtTimeout;
+
+ private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+ private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+ this.nestedTrigger = nestedTrigger;
+ this.interval = interval;
+ this.continualInterval = continualInterval;
+ this.shouldClearAtTimeout = shouldClearAtTimeout;
+ this.timeoutStateDesc = new
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+ }
+
+ @Override
+ public TriggerResult onElement(T element, long timestamp, W window,
TriggerContext ctx) throws Exception {
+ TriggerResult triggerResult =
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+ if (triggerResult.isFire()) {
+ this.clear(window, ctx);
+ return triggerResult;
+ }
+
+ ValueState<Long> timeoutState =
ctx.getPartitionedState(this.timeoutStateDesc);
+ long nextFireTimestamp = ctx.getCurrentProcessingTime() +
this.interval;
+
+ if (timeoutState.value() != null && continualInterval) {
+ ctx.deleteProcessingTimeTimer(timeoutState.value());
+ timeoutState.clear();
+ }
+
+ if (timeoutState.value() == null) {
+ timeoutState.update(nextFireTimestamp);
+ ctx.registerProcessingTimeTimer(nextFireTimestamp);
+ }
+
+ return triggerResult;
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long timestamp, W window,
TriggerContext ctx) throws Exception {
+ this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
+ if (shouldClearAtTimeout) {
+ this.clear(window, ctx);
+ }
+ return TriggerResult.FIRE;
Review comment:
I think we should maybe return `FIRE_AND_PURGE` for these if the nested
trigger returns anything that has "purge". What do you think?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the
interval,
+ * you can control if the timer will be registered periodic for each event
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T,
W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Trigger<T, W> nestedTrigger;
+ private final long interval;
+ private final boolean continualInterval;
+ private final boolean shouldClearAtTimeout;
+
+ private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+ private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+ this.nestedTrigger = nestedTrigger;
+ this.interval = interval;
+ this.continualInterval = continualInterval;
+ this.shouldClearAtTimeout = shouldClearAtTimeout;
+ this.timeoutStateDesc = new
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+ }
+
+ @Override
+ public TriggerResult onElement(T element, long timestamp, W window,
TriggerContext ctx) throws Exception {
+ TriggerResult triggerResult =
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+ if (triggerResult.isFire()) {
+ this.clear(window, ctx);
+ return triggerResult;
+ }
+
+ ValueState<Long> timeoutState =
ctx.getPartitionedState(this.timeoutStateDesc);
+ long nextFireTimestamp = ctx.getCurrentProcessingTime() +
this.interval;
+
+ if (timeoutState.value() != null && continualInterval) {
+ ctx.deleteProcessingTimeTimer(timeoutState.value());
+ timeoutState.clear();
+ }
+
+ if (timeoutState.value() == null) {
Review comment:
`timeoutState.value()` should only be called once in this method. Every
access is potentially costly because we might have to go out to RocksDB for
state access if the RocksDB state backend is used.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the
interval,
+ * you can control if the timer will be registered periodic for each event
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T,
W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Trigger<T, W> nestedTrigger;
+ private final long interval;
+ private final boolean continualInterval;
+ private final boolean shouldClearAtTimeout;
+
+ private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+ private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+ this.nestedTrigger = nestedTrigger;
+ this.interval = interval;
+ this.continualInterval = continualInterval;
+ this.shouldClearAtTimeout = shouldClearAtTimeout;
+ this.timeoutStateDesc = new
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+ }
+
+ @Override
+ public TriggerResult onElement(T element, long timestamp, W window,
TriggerContext ctx) throws Exception {
+ TriggerResult triggerResult =
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+ if (triggerResult.isFire()) {
+ this.clear(window, ctx);
+ return triggerResult;
+ }
+
+ ValueState<Long> timeoutState =
ctx.getPartitionedState(this.timeoutStateDesc);
+ long nextFireTimestamp = ctx.getCurrentProcessingTime() +
this.interval;
+
+ if (timeoutState.value() != null && continualInterval) {
+ ctx.deleteProcessingTimeTimer(timeoutState.value());
+ timeoutState.clear();
+ }
+
+ if (timeoutState.value() == null) {
+ timeoutState.update(nextFireTimestamp);
+ ctx.registerProcessingTimeTimer(nextFireTimestamp);
+ }
+
+ return triggerResult;
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long timestamp, W window,
TriggerContext ctx) throws Exception {
+ this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
+ if (shouldClearAtTimeout) {
+ this.clear(window, ctx);
+ }
+ return TriggerResult.FIRE;
+ }
+
+ @Override
+ public TriggerResult onEventTime(long timestamp, W window,
TriggerContext ctx) throws Exception {
+ this.nestedTrigger.onEventTime(timestamp, window, ctx);
+ return TriggerResult.FIRE;
+ }
+
+ @Override
+ public void clear(W window, TriggerContext ctx) throws Exception {
+ ctx.getPartitionedState(this.timeoutStateDesc).clear();
+ this.nestedTrigger.clear(window, ctx);
Review comment:
A trigger should also clear any potential timers on `clear()`. See for
example the `EventTimeTrigger` or `ProcessingTimeTrigger`
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the
interval,
+ * you can control if the timer will be registered periodic for each event
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T,
W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Trigger<T, W> nestedTrigger;
+ private final long interval;
+ private final boolean continualInterval;
+ private final boolean shouldClearAtTimeout;
+
+ private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+ private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+ this.nestedTrigger = nestedTrigger;
+ this.interval = interval;
+ this.continualInterval = continualInterval;
+ this.shouldClearAtTimeout = shouldClearAtTimeout;
+ this.timeoutStateDesc = new
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+ }
+
+ @Override
+ public TriggerResult onElement(T element, long timestamp, W window,
TriggerContext ctx) throws Exception {
+ TriggerResult triggerResult =
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+ if (triggerResult.isFire()) {
+ this.clear(window, ctx);
+ return triggerResult;
+ }
+
+ ValueState<Long> timeoutState =
ctx.getPartitionedState(this.timeoutStateDesc);
+ long nextFireTimestamp = ctx.getCurrentProcessingTime() +
this.interval;
+
+ if (timeoutState.value() != null && continualInterval) {
+ ctx.deleteProcessingTimeTimer(timeoutState.value());
+ timeoutState.clear();
+ }
+
+ if (timeoutState.value() == null) {
+ timeoutState.update(nextFireTimestamp);
+ ctx.registerProcessingTimeTimer(nextFireTimestamp);
+ }
+
+ return triggerResult;
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long timestamp, W window,
TriggerContext ctx) throws Exception {
+ this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
+ if (shouldClearAtTimeout) {
+ this.clear(window, ctx);
+ }
+ return TriggerResult.FIRE;
+ }
+
+ @Override
+ public TriggerResult onEventTime(long timestamp, W window,
TriggerContext ctx) throws Exception {
+ this.nestedTrigger.onEventTime(timestamp, window, ctx);
+ return TriggerResult.FIRE;
+ }
+
+ @Override
+ public void clear(W window, TriggerContext ctx) throws Exception {
+ ctx.getPartitionedState(this.timeoutStateDesc).clear();
+ this.nestedTrigger.clear(window, ctx);
+ }
+
+ @Override
+ public String toString() {
+ return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
+ }
+
+ public static <T, W extends Window> ProcessingTimeoutTrigger<T, W>
of(Trigger<T, W> nestedTrigger, long interval) {
Review comment:
These method should have Javadoc that describes what the parameters do.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]