aljoscha commented on a change in pull request #11972:
URL: https://github.com/apache/flink/pull/11972#discussion_r422832081
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code
Trigger}.
+ *
+ * <p>An arriving record registers a processing-time timer for the end of the
+ * interval. The continualInterval flag controls whether that timer is
+ * re-registered for every arriving event.
+ * The shouldClearAtTimeout flag controls whether the state is cleared
+ * once the timeout is reached.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can
operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T,
W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Trigger<T, W> nestedTrigger;
+ private final long interval;
+ private final boolean continualInterval;
+ private final boolean shouldClearAtTimeout;
+
+ private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+ private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long
interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+ this.nestedTrigger = nestedTrigger;
+ this.interval = interval;
+ this.continualInterval = continualInterval;
+ this.shouldClearAtTimeout = shouldClearAtTimeout;
+ this.timeoutStateDesc = new
ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+ }
+
+ @Override
+ public TriggerResult onElement(T element, long timestamp, W window,
TriggerContext ctx) throws Exception {
+ TriggerResult triggerResult =
this.nestedTrigger.onElement(element, timestamp, window, ctx);
+ if (triggerResult.isFire()) {
+ this.clear(window, ctx);
+ return triggerResult;
+ }
+
+ ValueState<Long> timeoutState =
ctx.getPartitionedState(this.timeoutStateDesc);
+ long nextFireTimestamp = ctx.getCurrentProcessingTime() +
this.interval;
+
+ if (timeoutState.value() != null && continualInterval) {
+ ctx.deleteProcessingTimeTimer(timeoutState.value());
+ timeoutState.clear();
+ }
+
+ if (timeoutState.value() == null) {
+ timeoutState.update(nextFireTimestamp);
+ ctx.registerProcessingTimeTimer(nextFireTimestamp);
+ }
+
+ return triggerResult;
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long timestamp, W window,
TriggerContext ctx) throws Exception {
+ this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
+ if (shouldClearAtTimeout) {
+ this.clear(window, ctx);
+ }
+ return TriggerResult.FIRE;
+ }
+
+ @Override
+ public TriggerResult onEventTime(long timestamp, W window,
TriggerContext ctx) throws Exception {
+ this.nestedTrigger.onEventTime(timestamp, window, ctx);
+ return TriggerResult.FIRE;
+ }
+
+ @Override
+ public void clear(W window, TriggerContext ctx) throws Exception {
+ ctx.getPartitionedState(this.timeoutStateDesc).clear();
+ this.nestedTrigger.clear(window, ctx);
+ }
+
+ @Override
+ public String toString() {
+ return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
+ }
+
+ public static <T, W extends Window> ProcessingTimeoutTrigger<T, W>
of(Trigger<T, W> nestedTrigger, long interval) {
Review comment:
Both should have Javadoc, because that's what shows up in the IDE when
users try to use them.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org