sunhaibotb commented on a change in pull request #10151: [FLINK-14231] Handle
the processing-time timers when closing operators to make endInput semantics on
the operator chain strict
URL: https://github.com/apache/flink/pull/10151#discussion_r356000446
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
##########
@@ -18,18 +18,57 @@
package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.NeverCompleteFuture;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Function;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
class ProcessingTimeServiceImpl implements ProcessingTimeService {
+
+ private static final int STATUS_ALIVE = 0;
+ private static final int STATUS_QUIESCED = 1;
+
+ // ------------------------------------------------------------------------
+
private final TimerService timerService;
+
private final Function<ProcessingTimeCallback, ProcessingTimeCallback> processingTimeCallbackWrapper;
+ private final ConcurrentHashMap<TimerScheduledFuture<?>, Object> undoneTimers;
Review comment:
After the operator is closed, we need to actively cancel the timers that are
not currently executing (see
`ProcessingTimeServiceImpl#cancelTimersGracefullyAfterQuiesce`), so we need to
keep track of these undone timers. Otherwise we would have to wait until each
pending timer fires, and we don't know how long that will take. For example,
the timer registered in `StreamingFileSink` fires every 60s.
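
To illustrate the idea of tracking undone timers so they can be cancelled once the operator is closed, here is a minimal standalone sketch. It is not the code from this PR: `UndoneTimerTracker`, `registerTimer`, `cancelPendingTimers` and `close` are hypothetical names, and it uses a plain `ScheduledThreadPoolExecutor` instead of Flink's `TimerService`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration only; not the ProcessingTimeServiceImpl from this PR. */
class UndoneTimerTracker {

    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    // Timers that were registered and may not have finished yet.
    private final Set<ScheduledFuture<?>> undoneTimers = ConcurrentHashMap.newKeySet();

    UndoneTimerTracker() {
        // Remove cancelled tasks from the executor queue right away.
        executor.setRemoveOnCancelPolicy(true);
    }

    ScheduledFuture<?> registerTimer(long delayMillis, Runnable callback) {
        // Prune timers that already completed so the set does not grow forever.
        undoneTimers.removeIf(timer -> timer.isDone());
        ScheduledFuture<?> future =
                executor.schedule(callback, delayMillis, TimeUnit.MILLISECONDS);
        undoneTimers.add(future);
        return future;
    }

    /** Cancels all tracked timers and returns how many cancel calls succeeded. */
    int cancelPendingTimers() {
        int cancelled = 0;
        for (ScheduledFuture<?> timer : undoneTimers) {
            // cancel(false) does not interrupt a callback that is already running.
            if (timer.cancel(false)) {
                cancelled++;
            }
        }
        undoneTimers.clear();
        return cancelled;
    }

    void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        UndoneTimerTracker tracker = new UndoneTimerTracker();
        AtomicBoolean fired = new AtomicBoolean(false);

        // A timer that would otherwise keep us waiting for 60 seconds.
        tracker.registerTimer(60_000, () -> fired.set(true));

        int cancelled = tracker.cancelPendingTimers();
        System.out.println("cancelled=" + cancelled + ", fired=" + fired.get());
        tracker.close();
    }
}
```

Without the tracked set, the only way to be sure no timer is still pending would be to wait for every registered timer to fire, which is the delay described above.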