mxm commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r469777963
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1277,7 +1280,13 @@ private void onNewEventTimer(TimerData newTimer) {
"Timer with id %s is not an event time timer!",
newTimer.getTimerId());
if (timerUsesOutputTimestamp(newTimer)) {
- outputTimestampQueue.add(newTimer.getOutputTimestamp().getMillis());
+ outputTimestamps.compute(
+ newTimer.getOutputTimestamp().getMillis(),
+ (k, v) -> {
+ Set<String> timerIds = v == null ? new HashSet<>() : v;
+ timerIds.add(getContextTimerId(newTimer.getTimerId(),
newTimer.getNamespace()));
Review comment:
Duplicates are OK. If multiple timers are set with different ids, or even
for the same keys, duplicate output timestamps are expected. We could use a
TreeMap instead of a PriorityQueue, like we do in FlinkTimerInternals for the
WatermarkHoldState, but I don't see anything wrong with the implementation here.
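
To illustrate why duplicates are harmless, here is a minimal sketch. It is not
the DoFnOperator code: the class `OutputTimestampHolds` and its methods are
hypothetical names, and the count-per-timestamp TreeMap is only one possible
shape for the TreeMap alternative. It shows that a PriorityQueue keeps one entry
per added timestamp and that `remove(Object)` drops a single occurrence, so the
minimum stays correct while another timer with the same output timestamp is
still pending.

```java
import java.util.PriorityQueue;
import java.util.TreeMap;

/** Hypothetical sketch: tracks output timestamps, counting duplicates. */
class OutputTimestampHolds {
  // Assumption: one counter per timestamp stands in for the per-timer bookkeeping.
  private final TreeMap<Long, Integer> counts = new TreeMap<>();

  void add(long timestamp) {
    counts.merge(timestamp, 1, Integer::sum);
  }

  void remove(long timestamp) {
    // Returning null from the remapping function removes the entry.
    counts.computeIfPresent(timestamp, (k, c) -> c == 1 ? null : c - 1);
  }

  /** Smallest pending output timestamp, or null if none is pending. */
  Long min() {
    return counts.isEmpty() ? null : counts.firstKey();
  }

  public static void main(String[] args) {
    // PriorityQueue variant: duplicates are simply separate entries.
    PriorityQueue<Long> queue = new PriorityQueue<>();
    queue.add(100L);
    queue.add(100L);
    queue.add(200L);
    queue.remove(100L); // removes a single occurrence only
    System.out.println(queue.peek()); // 100

    // TreeMap variant: same observable behaviour for the minimum.
    OutputTimestampHolds holds = new OutputTimestampHolds();
    holds.add(100L);
    holds.add(100L);
    holds.add(200L);
    holds.remove(100L);
    System.out.println(holds.min()); // 100
    holds.remove(100L);
    System.out.println(holds.min()); // 200
  }
}
```

Both variants report the same minimum; the TreeMap mainly makes removal of an
arbitrary timestamp cheaper than the linear scan done by
`PriorityQueue.remove(Object)`.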
You likely have a problem in your application code, i.e. you set a timer
output timestamp that holds back the watermark, but the watermark has not yet
reached the timer's fire timestamp. Please investigate that first. Also note
#12531, which was about a bug in the Python SDK.
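
The situation described above can be reproduced with a simplified model. This
is only a sketch under stated assumptions, not the runner's actual logic: the
class `WatermarkHoldSketch` and its methods are hypothetical, a timer is assumed
to fire once the input watermark is at or past its fire timestamp, and the
output watermark is assumed to be the minimum of the input watermark and the
smallest pending output timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, simplified model of a timer output timestamp holding the watermark. */
class WatermarkHoldSketch {
  private static final class PendingTimer {
    final long fireTimestamp;
    final long outputTimestamp;

    PendingTimer(long fireTimestamp, long outputTimestamp) {
      this.fireTimestamp = fireTimestamp;
      this.outputTimestamp = outputTimestamp;
    }
  }

  private final List<PendingTimer> pending = new ArrayList<>();
  private final PriorityQueue<Long> holds = new PriorityQueue<>();

  void setTimer(long fireTimestamp, long outputTimestamp) {
    pending.add(new PendingTimer(fireTimestamp, outputTimestamp));
    holds.add(outputTimestamp); // duplicates are fine, one entry per timer
  }

  /** Fires eligible timers and returns the resulting output watermark. */
  long advanceTo(long inputWatermark) {
    Iterator<PendingTimer> it = pending.iterator();
    while (it.hasNext()) {
      PendingTimer timer = it.next();
      // Assumption: a timer fires when the watermark reaches its fire timestamp.
      if (timer.fireTimestamp <= inputWatermark) {
        holds.remove(timer.outputTimestamp); // release this timer's hold
        it.remove();
      }
    }
    return holds.isEmpty() ? inputWatermark : Math.min(inputWatermark, holds.peek());
  }

  public static void main(String[] args) {
    WatermarkHoldSketch sketch = new WatermarkHoldSketch();
    sketch.setTimer(1000L, 10L);
    System.out.println(sketch.advanceTo(500L));  // 10: timer not fired, hold remains
    System.out.println(sketch.advanceTo(1000L)); // 1000: timer fired, hold released
  }
}
```

In this model the output watermark stays at 10 for as long as the input
watermark is below 1000, which looks like a stuck watermark from the
application's point of view even though the bookkeeping is correct.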
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]