roeyshemtov commented on a change in pull request #11972:
URL: https://github.com/apache/flink/pull/11972#discussion_r421668934



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code 
Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the 
interval,
+ * you can control if the timer will be registered periodic for each event 
arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, 
W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private Trigger<T, W> nestedTrigger;
+       private final long interval;
+       private final boolean continualInterval;
+       private final boolean shouldClearAtTimeout;
+
+       private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+       private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long 
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+               this.nestedTrigger = nestedTrigger;
+               this.interval = interval;
+               this.continualInterval = continualInterval;
+               this.shouldClearAtTimeout = shouldClearAtTimeout;
+               this.timeoutStateDesc = new 
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+       }
+
+       @Override
+       public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               TriggerResult triggerResult = 
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+               if (triggerResult.isFire()) {
+                       this.clear(window, ctx);
+                       return triggerResult;
+               }
+               
+               ValueState<Long> timeoutState = 
ctx.getPartitionedState(this.timeoutStateDesc);
+               long nextFireTimestamp = ctx.getCurrentProcessingTime() + 
this.interval;
+
+               if (timeoutState.value() != null && continualInterval) {
+                       ctx.deleteProcessingTimeTimer(timeoutState.value());
+                       timeoutState.clear();
+               }
+
+               if (timeoutState.value() == null) {
+                       timeoutState.update(nextFireTimestamp);
+                       ctx.registerProcessingTimeTimer(nextFireTimestamp);
+               }
+
+               return triggerResult;
+       }
+
+       @Override
+       public TriggerResult onProcessingTime(long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
+               if (shouldClearAtTimeout) {
+                       this.clear(window, ctx);
+               }
+               return TriggerResult.FIRE;
+       }
+
+       @Override
+       public TriggerResult onEventTime(long timestamp, W window, 
TriggerContext ctx) throws Exception {
+               this.nestedTrigger.onEventTime(timestamp, window, ctx);
+               return TriggerResult.FIRE;
+       }
+
+       @Override
+       public void clear(W window, TriggerContext ctx) throws Exception {
+               ctx.getPartitionedState(this.timeoutStateDesc).clear();
+               this.nestedTrigger.clear(window, ctx);
+       }
+
+       @Override
+       public String toString() {
+               return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
+       }
+
+       public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> 
of(Trigger<T, W> nestedTrigger, long interval) {

Review comment:
       Added to the second method (because it extension of the first). is that 
enough?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to