Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6062#discussion_r191412983
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
---
@@ -199,17 +186,9 @@ public long currentWatermark() {
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
- InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
-
- // make sure we only put one timer per key into the queue
- Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
- if (timerSet.add(timer)) {
-
- InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
+ InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
+ if (processingTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace)) {
--- End diff --
I know this is out of scope, but I think we could get rid of the
`KeyContext` by passing the current key to the `registerProcessingTimeTimer`
method. Moreover, instead of calling `KeyContext#setCurrentKey`, we could pass
the key value directly to the `Triggerable#onEventTime` and
`Triggerable#onProcessingTime` methods. Triggering side effects via the
`KeyContext` before calling certain methods is, IMO, very brittle.
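
To make the suggestion concrete, here is a rough sketch of what explicit key passing could look like. It is not the actual `HeapInternalTimerService`: `KeyedTriggerable`, `ExplicitKeyTimerService`, and `advanceTo` are hypothetical names made up for illustration, and per-key timer deduplication is left out to keep it short.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical callback: the key arrives as an argument, not via a KeyContext. */
interface KeyedTriggerable<K, N> {
    void onProcessingTime(K key, N namespace, long timestamp);
}

/** Hypothetical, simplified timer service; illustration only, not Flink code. */
final class ExplicitKeyTimerService<K, N> {

    /** A timer carries its own key, so no shared "current key" state is needed. */
    private static final class Timer<K, N> {
        final long timestamp;
        final K key;
        final N namespace;

        Timer(long timestamp, K key, N namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    // Timers ordered by timestamp, earliest first.
    private final PriorityQueue<Timer<K, N>> queue =
        new PriorityQueue<>(Comparator.comparingLong((Timer<K, N> t) -> t.timestamp));

    private final KeyedTriggerable<K, N> target;

    ExplicitKeyTimerService(KeyedTriggerable<K, N> target) {
        this.target = target;
    }

    /** The caller supplies the key explicitly instead of setting it beforehand. */
    void registerProcessingTimeTimer(K key, N namespace, long time) {
        queue.add(new Timer<>(time, key, namespace));
    }

    /** Fires all timers due at or before the given time, handing each key to the callback. */
    void advanceTo(long time) {
        while (!queue.isEmpty() && queue.peek().timestamp <= time) {
            Timer<K, N> timer = queue.poll();
            target.onProcessingTime(timer.key, timer.namespace, timer.timestamp);
        }
    }
}
```

With this shape, nothing depends on someone having called `setCurrentKey` at the right moment; the compiler enforces that a key is supplied both at registration and at firing time.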
---