aljoscha commented on a change in pull request #11972:
URL: https://github.com/apache/flink/pull/11972#discussion_r421343103



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code 
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the 
interval,
+ * you can control if the timer will be registered periodic for each event 
arriving,
+ * by the continualInterval flag.

Review comment:
       Would `resetTimerOnEvent` maybe be a better name for `continualInterval`? 
That is what the flag does, right? When `continualInterval` is `true`, the timer 
period is extended on each received event, right?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code 
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the 
interval,
+ * you can control if the timer will be registered periodic for each event 
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, 
W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private Trigger<T, W> nestedTrigger;
+       private final long interval;
+       private final boolean continualInterval;
+       private final boolean shouldClearAtTimeout;
+
+       private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+       private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long 
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+               this.nestedTrigger = nestedTrigger;
+               this.interval = interval;
+               this.continualInterval = continualInterval;
+               this.shouldClearAtTimeout = shouldClearAtTimeout;
+               this.timeoutStateDesc = new 
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+       }
+
+       @Override
+       public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               TriggerResult triggerResult = 
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+               if (triggerResult.isFire()) {
+                       this.clear(window, ctx);
+                       return triggerResult;
+               }
+               
+               ValueState<Long> timeoutState = 
ctx.getPartitionedState(this.timeoutStateDesc);
+               long nextFireTimestamp = ctx.getCurrentProcessingTime() + 
this.interval;
+
+               if (timeoutState.value() != null && continualInterval) {
+                       ctx.deleteProcessingTimeTimer(timeoutState.value());
+                       timeoutState.clear();
+               }
+
+               if (timeoutState.value() == null) {
+                       timeoutState.update(nextFireTimestamp);
+                       ctx.registerProcessingTimeTimer(nextFireTimestamp);
+               }
+
+               return triggerResult;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
+               if (shouldClearAtTimeout) {
+                       this.clear(window, ctx);
+               }
+               return TriggerResult.FIRE;

Review comment:
       I think we should maybe return `FIRE_AND_PURGE` for these if the nested 
trigger returns anything that has "purge". What do you think?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code 
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the 
interval,
+ * you can control if the timer will be registered periodic for each event 
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, 
W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private Trigger<T, W> nestedTrigger;
+       private final long interval;
+       private final boolean continualInterval;
+       private final boolean shouldClearAtTimeout;
+
+       private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+       private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long 
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+               this.nestedTrigger = nestedTrigger;
+               this.interval = interval;
+               this.continualInterval = continualInterval;
+               this.shouldClearAtTimeout = shouldClearAtTimeout;
+               this.timeoutStateDesc = new 
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+       }
+
+       @Override
+       public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               TriggerResult triggerResult = 
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+               if (triggerResult.isFire()) {
+                       this.clear(window, ctx);
+                       return triggerResult;
+               }
+               
+               ValueState<Long> timeoutState = 
ctx.getPartitionedState(this.timeoutStateDesc);
+               long nextFireTimestamp = ctx.getCurrentProcessingTime() + 
this.interval;
+
+               if (timeoutState.value() != null && continualInterval) {
+                       ctx.deleteProcessingTimeTimer(timeoutState.value());
+                       timeoutState.clear();
+               }
+
+               if (timeoutState.value() == null) {

Review comment:
       `timeoutState.value()` should only be called once in this method. Every 
access is potentially costly because we might have to go out to RocksDB for 
state access if the RocksDB state backend is used.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code 
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the 
interval,
+ * you can control if the timer will be registered periodic for each event 
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, 
W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private Trigger<T, W> nestedTrigger;
+       private final long interval;
+       private final boolean continualInterval;
+       private final boolean shouldClearAtTimeout;
+
+       private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+       private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long 
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+               this.nestedTrigger = nestedTrigger;
+               this.interval = interval;
+               this.continualInterval = continualInterval;
+               this.shouldClearAtTimeout = shouldClearAtTimeout;
+               this.timeoutStateDesc = new 
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+       }
+
+       @Override
+       public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               TriggerResult triggerResult = 
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+               if (triggerResult.isFire()) {
+                       this.clear(window, ctx);
+                       return triggerResult;
+               }
+               
+               ValueState<Long> timeoutState = 
ctx.getPartitionedState(this.timeoutStateDesc);
+               long nextFireTimestamp = ctx.getCurrentProcessingTime() + 
this.interval;
+
+               if (timeoutState.value() != null && continualInterval) {
+                       ctx.deleteProcessingTimeTimer(timeoutState.value());
+                       timeoutState.clear();
+               }
+
+               if (timeoutState.value() == null) {
+                       timeoutState.update(nextFireTimestamp);
+                       ctx.registerProcessingTimeTimer(nextFireTimestamp);
+               }
+
+               return triggerResult;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
+               if (shouldClearAtTimeout) {
+                       this.clear(window, ctx);
+               }
+               return TriggerResult.FIRE;
+       }
+
+       @Override
+       public TriggerResult onEventTime(long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               this.nestedTrigger.onEventTime(timestamp, window, ctx);
+               return TriggerResult.FIRE;
+       }
+
+       @Override
+       public void clear(W window, TriggerContext ctx) throws Exception {
+               ctx.getPartitionedState(this.timeoutStateDesc).clear();
+               this.nestedTrigger.clear(window, ctx);

Review comment:
       A trigger should also clear any potential timers on `clear()`. See, for 
example, `EventTimeTrigger` or `ProcessingTimeTrigger`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code 
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the 
interval,
+ * you can control if the timer will be registered periodic for each event 
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, 
W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private Trigger<T, W> nestedTrigger;
+       private final long interval;
+       private final boolean continualInterval;
+       private final boolean shouldClearAtTimeout;
+
+       private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+       private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long 
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+               this.nestedTrigger = nestedTrigger;
+               this.interval = interval;
+               this.continualInterval = continualInterval;
+               this.shouldClearAtTimeout = shouldClearAtTimeout;
+               this.timeoutStateDesc = new 
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+       }
+
+       @Override
+       public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               TriggerResult triggerResult = 
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+               if (triggerResult.isFire()) {
+                       this.clear(window, ctx);
+                       return triggerResult;
+               }
+               
+               ValueState<Long> timeoutState = 
ctx.getPartitionedState(this.timeoutStateDesc);
+               long nextFireTimestamp = ctx.getCurrentProcessingTime() + 
this.interval;
+
+               if (timeoutState.value() != null && continualInterval) {
+                       ctx.deleteProcessingTimeTimer(timeoutState.value());
+                       timeoutState.clear();
+               }
+
+               if (timeoutState.value() == null) {
+                       timeoutState.update(nextFireTimestamp);
+                       ctx.registerProcessingTimeTimer(nextFireTimestamp);
+               }
+
+               return triggerResult;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
+               if (shouldClearAtTimeout) {
+                       this.clear(window, ctx);
+               }
+               return TriggerResult.FIRE;
+       }
+
+       @Override
+       public TriggerResult onEventTime(long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               this.nestedTrigger.onEventTime(timestamp, window, ctx);
+               return TriggerResult.FIRE;
+       }
+
+       @Override
+       public void clear(W window, TriggerContext ctx) throws Exception {
+               ctx.getPartitionedState(this.timeoutStateDesc).clear();
+               this.nestedTrigger.clear(window, ctx);
+       }
+
+       @Override
+       public String toString() {
+               return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
+       }
+
+       public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> 
of(Trigger<T, W> nestedTrigger, long interval) {

Review comment:
       These methods should have Javadoc that describes what the parameters do.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to