On 10.09.20 11:30, Dawid Wysakowicz wrote:
> I am not sure about the option for ignoring the Triggers. Do you mean to
> ignore all the Triggers, including e.g. Flink's own ones such as CountTrigger,
> EventTimeTrigger etc.? Won't that effectively disable the WindowOperator
> altogether? Or, even worse, make it unusable with ever-growing state? I
> might be wrong here, but aren't Triggers required for emitting results
> from the WindowOperator? If I am correct, we emit results only if a Trigger
> returns FIRE from one of onElement, onEventTime, onProcessingTime. Why do
> you think it does not work well with FAILing hard without this option?
> We could fail hard e.g. if WindowAssigner#isEventTime returns false.
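To make the point about FIRE concrete, here is a toy model. Every name in it (`FireResult`, `ToyTrigger`, `ToyWindowOperator`, the two trigger classes) is hypothetical and it is NOT Flink's WindowOperator/Trigger API; it only assumes a single window with a known maxTimestamp and shows that buffered elements leave the operator exclusively when the trigger returns FIRE.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink's WindowOperator/Trigger API: all names here are hypothetical.
enum FireResult { CONTINUE, FIRE }

interface ToyTrigger {
    FireResult onElement(long timestamp, long windowMaxTimestamp);
    FireResult onWatermark(long watermark, long windowMaxTimestamp);
}

// Roughly the decision an event-time trigger makes: fire once the watermark reaches maxTimestamp.
final class ToyEventTimeTrigger implements ToyTrigger {
    public FireResult onElement(long timestamp, long windowMaxTimestamp) {
        return FireResult.CONTINUE;
    }
    public FireResult onWatermark(long watermark, long windowMaxTimestamp) {
        return watermark >= windowMaxTimestamp ? FireResult.FIRE : FireResult.CONTINUE;
    }
}

// A trigger that is effectively "ignored": it never fires.
final class ToyNeverTrigger implements ToyTrigger {
    public FireResult onElement(long timestamp, long windowMaxTimestamp) {
        return FireResult.CONTINUE;
    }
    public FireResult onWatermark(long watermark, long windowMaxTimestamp) {
        return FireResult.CONTINUE;
    }
}

// Single-window operator: buffers elements and emits only when the trigger returns FIRE.
final class ToyWindowOperator {
    private final long windowMaxTimestamp;
    private final ToyTrigger trigger;
    private final List<Long> state = new ArrayList<>();   // window contents
    final List<List<Long>> output = new ArrayList<>();     // emitted results

    ToyWindowOperator(long windowMaxTimestamp, ToyTrigger trigger) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.trigger = trigger;
    }

    void processElement(long timestamp) {
        state.add(timestamp);
        handle(trigger.onElement(timestamp, windowMaxTimestamp));
    }

    void processWatermark(long watermark) {
        handle(trigger.onWatermark(watermark, windowMaxTimestamp));
    }

    int stateSize() {
        return state.size();
    }

    private void handle(FireResult result) {
        if (result == FireResult.FIRE) {
            output.add(new ArrayList<>(state)); // emit a snapshot of the window contents
        }
        // CONTINUE: nothing is emitted and the state is kept
    }
}
```

Plugging in `ToyNeverTrigger` reproduces the concern: even after a watermark of `Long.MAX_VALUE` nothing is emitted and the state only grows.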

The problem I'm trying to solve is mixed Triggers. Say you have a Trigger that does "fire when the watermark passes maxTimestamp(), but also fire every 5 minutes in processing time, and once the watermark has passed maxTimestamp(), fire for every 5 new records". This is something that the Beam API, for example, allows users to specify, and I think it is potentially valuable in the real world.
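A minimal sketch of the firing decisions such a mixed Trigger makes for one window. `MixedTriggerModel` and its methods are hypothetical and this is NOT a subclass of Flink's `Trigger`; it also assumes that the 5-minute processing-time firings keep running both before and after the watermark passes maxTimestamp(), which is one possible reading of the example.

```java
// Hypothetical model of the mixed Trigger described above. It is NOT a subclass of
// Flink's Trigger; it only captures when the trigger would fire for one window.
// Assumption: the 5-minute processing-time firings apply both before and after
// the watermark passes maxTimestamp().
final class MixedTriggerModel {
    enum Decision { CONTINUE, FIRE }

    static final long PROCESSING_INTERVAL_MS = 5 * 60 * 1000L; // "every 5 minutes"
    static final int LATE_RECORD_COUNT = 5;                    // "every 5 new records"

    private final long windowMaxTimestamp;
    private boolean watermarkPassed = false;
    private int recordsSinceLastLateFiring = 0;
    private long nextProcessingTimeFiring;

    MixedTriggerModel(long windowMaxTimestamp, long startProcessingTime) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.nextProcessingTimeFiring = startProcessingTime + PROCESSING_INTERVAL_MS;
    }

    /** Every element assigned to the window; counting only starts after the watermark. */
    Decision onElement() {
        if (!watermarkPassed) {
            return Decision.CONTINUE;
        }
        recordsSinceLastLateFiring++;
        if (recordsSinceLastLateFiring >= LATE_RECORD_COUNT) {
            recordsSinceLastLateFiring = 0;
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    /** Watermark advance: fire once when it reaches maxTimestamp() (the "on time" result). */
    Decision onWatermark(long watermark) {
        if (!watermarkPassed && watermark >= windowMaxTimestamp) {
            watermarkPassed = true;
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    /** Processing-time advance: fire every PROCESSING_INTERVAL_MS of wall-clock time. */
    Decision onProcessingTime(long now) {
        if (now < nextProcessingTimeFiring) {
            return Decision.CONTINUE;
        }
        // Schedule the next firing, skipping intervals that were missed entirely.
        while (nextProcessingTimeFiring <= now) {
            nextProcessingTimeFiring += PROCESSING_INTERVAL_MS;
        }
        return Decision.FIRE;
    }
}
```

Only `onProcessingTime` depends on wall-clock time; it is the branch that makes this Trigger problematic for BATCH.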

Ignoring Triggers would mean that we always fire at maxTimestamp(), hardcoded in a WindowOperator that we use for BATCH execution. With this, the WindowAssigner becomes the only thing that affects the result. This is similar to how Beam treats windows: the WindowAssigner carries semantic content, while the Trigger only optimizes when results are emitted in streaming, which you don't need for BATCH, where you always have a "perfect watermark".
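A sketch of what "ignore Triggers in BATCH" could look like, assuming event-time tumbling windows and a sum aggregation. `BatchTumblingWindows` is hypothetical, not Flink code: the window assignment is the only semantic input, and every window fires exactly once when the input ends, because end of input acts as a perfect watermark past every maxTimestamp().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical BATCH-only window "operator" that ignores Triggers entirely.
// Assumptions: event-time tumbling windows of a fixed size, values are summed.
final class BatchTumblingWindows {
    private final long size; // tumbling window size in ms
    // window start -> running sum; TreeMap so windows are emitted in timestamp order
    private final TreeMap<Long, Long> sums = new TreeMap<>();

    BatchTumblingWindows(long size) {
        this.size = size;
    }

    /** Assigns the element to its tumbling window [start, start + size). */
    void add(long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        sums.merge(start, value, Long::sum);
    }

    /** End of input = "perfect watermark": every window's maxTimestamp has passed, fire each once. */
    List<String> endOfInput() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Long> e : sums.entrySet()) {
            long maxTimestamp = e.getKey() + size - 1; // end - 1, like TimeWindow#maxTimestamp()
            out.add("window@" + maxTimestamp + " -> " + e.getValue());
        }
        sums.clear();
        return out;
    }
}
```

With a window size of 10, adding `(1, 1)`, `(5, 2)` and `(12, 3)` yields one result per window at end of input: `window@9 -> 3` and `window@19 -> 3`. No Trigger is consulted, so state cannot grow without ever being emitted.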

Coming back to the initial example: such a Trigger would not work if we FAIL hard for processing time on BATCH, which I'm suggesting because otherwise we could get surprising results when business logic depends on processing-time timers. For windows, on the other hand, we could get around this by agreeing that Triggers are ignored for BATCH.
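A minimal sketch of the "FAIL hard" behaviour, under the assumption that the check sits where timers are registered. `ModeAwareTimers` and its methods are hypothetical and NOT Flink's `TimerService`: in BATCH a processing-time timer throws, while event-time timers are kept and all fire, in order, at end of input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, NOT Flink's TimerService: in BATCH mode a processing-time
// timer fails hard, while event-time timers are kept and all fire at end of input.
final class ModeAwareTimers {
    enum Mode { STREAMING, BATCH }

    private final Mode mode;
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

    ModeAwareTimers(Mode mode) {
        this.mode = mode;
    }

    void registerEventTimeTimer(long time) {
        eventTimeTimers.add(time);
    }

    void registerProcessingTimeTimer(long time) {
        if (mode == Mode.BATCH) {
            // Fail hard rather than produce results that depend on wall-clock time.
            throw new UnsupportedOperationException(
                    "Processing-time timers are not supported in BATCH execution.");
        }
        // STREAMING: a real implementation would schedule this on a wall-clock timer (omitted).
    }

    /** End of input: the watermark jumps to the maximum, so all event-time timers fire in order. */
    List<Long> fireAllEventTimeTimers() {
        List<Long> fired = new ArrayList<>();
        while (!eventTimeTimers.isEmpty()) {
            fired.add(eventTimeTimers.poll());
        }
        return fired;
    }
}
```

A mixed Trigger like the one in the example would register a processing-time timer for its 5-minute firings and hit this exception in BATCH; a BATCH WindowOperator that never consults the Trigger never makes that call.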

> As for the question about getProcessingTime(): from my point of view, it
> would be safe to simply return the current system time. I cannot think
> of any dangers if we do so. Moreover, frankly speaking, I am not entirely
> sure what the purpose of the method is, other than injecting a clock in
> tests of built-in operators. Maybe it was a mistake to expose it in the
> user's API?

I agree, it was a mistake to expose getProcessingTime(). And I also think the same about getCurrentWatermark(), but that's neither here nor there. 😅 So I also agree to just return the current time, as you said. I will change the FLIP accordingly.
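A sketch of "just return the current time" that still keeps the test use case: the context reads a `java.time.Clock`, which is the system clock in production and a fixed clock in tests. `ProcessingTimeContext` is a hypothetical class, not Flink's actual context; `Clock.systemUTC().millis()` is documented as equivalent to `System.currentTimeMillis()`.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical context class, NOT Flink's API: getProcessingTime() simply reads a Clock.
// Production uses the system clock (so BATCH returns plain wall-clock time);
// tests inject a fixed clock to get deterministic values.
final class ProcessingTimeContext {
    private final Clock clock;

    ProcessingTimeContext() {
        this(Clock.systemUTC());
    }

    ProcessingTimeContext(Clock clock) {
        this.clock = clock;
    }

    /** Current processing time in epoch milliseconds. */
    long getProcessingTime() {
        return clock.millis();
    }

    /** Convenience for tests: a context whose processing time is frozen at the given millis. */
    static ProcessingTimeContext fixedAt(long epochMillis) {
        return new ProcessingTimeContext(Clock.fixed(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC));
    }
}
```

`ProcessingTimeContext.fixedAt(42L).getProcessingTime()` returns `42`, while the no-argument constructor tracks `System.currentTimeMillis()`.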

Aljoscha
