+1 for the second approach. Regarding Stephan's comment, I think it would be better to have dedicated WindowAssigner classes. Otherwise, this becomes inconsistent with the dedicated Event/ProcessingTimeTriggers.
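To make "dedicated WindowAssigner classes" concrete, here is a minimal sketch in plain Java with no Flink dependency. Every name in it (WindowAssigner, EventTimeTumblingWindows, ProcessingTimeTumblingWindows, TriggerKind, assignWindow, defaultTrigger) is hypothetical and is not Flink's actual API. It shows one class per time domain. Each class picks the timestamp it windows by and reports the matching default trigger, so nothing has to consult the environment's time characteristic.

```java
// Hypothetical sketch, not Flink's API: one assigner class per time domain,
// mirroring the dedicated EventTimeTrigger / ProcessingTimeTrigger classes.
class DedicatedAssignersDemo {
    public static void main(String[] args) {
        WindowAssigner byEventTime = new EventTimeTumblingWindows(5_000);
        WindowAssigner byProcessingTime = new ProcessingTimeTumblingWindows(5_000);

        long eventTimestamp = 12_345; // timestamp carried by the element
        long processingNow = 98_765;  // wall-clock time when the element arrives

        System.out.println(byEventTime.assignWindow(eventTimestamp, processingNow)
                + " fired by " + byEventTime.defaultTrigger());
        System.out.println(byProcessingTime.assignWindow(eventTimestamp, processingNow)
                + " fired by " + byProcessingTime.defaultTrigger());
    }
}

/** Which kind of trigger fires a window (stand-in for real trigger classes). */
enum TriggerKind { EVENT_TIME, PROCESSING_TIME }

/** A half-open window [start, end) in milliseconds. */
record TimeWindow(long start, long end) {}

interface WindowAssigner {
    /** Picks the window for an element, given both candidate timestamps. */
    TimeWindow assignWindow(long eventTimestamp, long currentProcessingTime);

    /** The trigger matching this assigner's time domain; no environment lookup. */
    TriggerKind defaultTrigger();
}

/** Shared tumbling-window arithmetic; subclasses decide which timestamp to use. */
abstract class TumblingWindows implements WindowAssigner {
    private final long sizeMillis;

    TumblingWindows(long sizeMillis) { this.sizeMillis = sizeMillis; }

    protected TimeWindow windowFor(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, sizeMillis);
        return new TimeWindow(start, start + sizeMillis);
    }
}

final class EventTimeTumblingWindows extends TumblingWindows {
    EventTimeTumblingWindows(long sizeMillis) { super(sizeMillis); }

    public TimeWindow assignWindow(long eventTimestamp, long currentProcessingTime) {
        return windowFor(eventTimestamp); // event time: use the element's timestamp
    }

    public TriggerKind defaultTrigger() { return TriggerKind.EVENT_TIME; }
}

final class ProcessingTimeTumblingWindows extends TumblingWindows {
    ProcessingTimeTumblingWindows(long sizeMillis) { super(sizeMillis); }

    public TimeWindow assignWindow(long eventTimestamp, long currentProcessingTime) {
        return windowFor(currentProcessingTime); // processing time: use the wall clock
    }

    public TriggerKind defaultTrigger() { return TriggerKind.PROCESSING_TIME; }
}
```

The design choice this illustrates is that assignment and default trigger come from the same class. Pairing an event-time assigner with a processing-time trigger then has to be done explicitly rather than happening by accident.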
2015-12-18 12:03 GMT+01:00 Stephan Ewen <se...@apache.org>:

> I am also in favor of option (2).
>
> We could also pass the TimeCharacteristic to, for example, the
> "SlidingTimeWindows". Then there is one class, users can explicitly choose
> the characteristic they want, and when nothing is specified, the
> default time characteristic is chosen.
>
> On Thu, Dec 17, 2015 at 11:41 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
> > Hi Aljoscha,
> >
> > I'm in favor of option 2: Keep setStreamTimeCharacteristic to set
> > the default time behavior. Then add a method to the operators to set a
> > custom time behavior.
> >
> > The problem is exemplified in SlidingTimeWindows:
> >
> > @Override
> > public Trigger<Object, TimeWindow>
> > getDefaultTrigger(StreamExecutionEnvironment env) {
> >   if (env.getStreamTimeCharacteristic() ==
> >       TimeCharacteristic.ProcessingTime) {
> >     return ProcessingTimeTrigger.create();
> >   } else {
> >     return EventTimeTrigger.create();
> >   }
> > }
> >
> > That just needs to be fixed, e.g. by having a dedicated
> > setTimeCharacteristic(..) on the operator.
> >
> > +1 for removing AbstractTime, EventTime, and ProcessingTime.
> >
> > Cheers,
> > Max
> >
> > On Wed, Dec 16, 2015 at 3:26 PM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> > > Hi,
> > > I thought a bit about how to improve the handling of time in Flink,
> > > mostly as it relates to windows. The problem is that mixing processing-time
> > > and event-time windows in one topology is very hard (impossible) right now.
> > > Let me explain it with this example:
> > >
> > > val env: StreamExecutionEnvironment = …
> > >
> > > env.setStreamTimeCharacteristic(EventTime)
> > >
> > > val input = <some stream>
> > >
> > > val quickResults = input
> > >   .keyBy(…)
> > >   .window(TumblingTimeWindows.of(Time.seconds(5)))
> > >   .trigger(ProcessingTimeTrigger.create())
> > >   .sum(1)
> > >
> > > val slowResults = input
> > >   .keyBy(…)
> > >   .window(TumblingTimeWindows.of(Time.seconds(5)))
> > >   // .trigger(EventTimeTrigger.create()): this is the default trigger, so no need to set it
> > >   .sum(1)
> > >
> > > The idea is that you want fast, but possibly inaccurate, results
> > > using processing time and correct, but maybe slower, results using
> > > event-time windowing.
> > >
> > > The problem is that the current API tries to solve two problems:
> > > 1. We want a way to just say "time window" and then let the
> > >    system instantiate the correct window operator based on the time
> > >    characteristic.
> > > 2. We want the flexibility to let users mix 'n match
> > >    processing-time and event-time windows.
> > >
> > > The above example does not work because both operators would assign
> > > elements to windows based on the event-time timestamp. The first operator
> > > therefore fires event-time windows based on processing time, which
> > > produces unexpected (wrong) results.
> > >
> > > I see three solutions to this:
> > > 1. Remove setStreamTimeCharacteristic() and let users always specify
> > >    exactly what kind of windowing they want.
> > > 2. Keep setStreamTimeCharacteristic() but only employ the magic that
> > >    decides on the window operator for the special .timeWindow() call. Have
> > >    two different window assigners per window type (TumblingWindows,
> > >    SlidingTimeWindows, SessionWindows, ...), one for processing time and
> > >    one for event time, that allow users to specify exactly what they want.
> > > 3. Keep setStreamTimeCharacteristic() and have three window assigners
> > >    per window type: one for processing time, one for event time, and one
> > >    that automatically decides based on the time characteristic.
> > >
> > > What do you think?
> > >
> > > On a side note, I would also suggest removing AbstractTime, EventTime,
> > > and ProcessingTime and just keeping Time for specifying time.
> > >
> > > Cheers,
> > > Aljoscha
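The quickResults/slowResults example above can be modeled in a few lines. This models the variant Stephan and Max describe: the environment-wide setting stays as a default, and an individual window definition may override it. It is a minimal sketch in plain Java, not Flink code. TimeDomain, WindowSpec, effectiveDomain, and assign are hypothetical names, and TimeDomain only stands in for Flink's TimeCharacteristic. The key point is that an element is assigned by the timestamp of its *effective* domain, so a processing-time window no longer groups by event time.

```java
import java.util.Optional;

// Hypothetical sketch, not Flink's API: an operator resolves its time domain
// from an explicit per-operator setting, falling back to the environment-wide
// default (the role setStreamTimeCharacteristic() plays in the thread).
class MixedTimeDomainsDemo {
    public static void main(String[] args) {
        TimeDomain envDefault = TimeDomain.EVENT_TIME;

        // "quickResults" explicitly uses processing time; "slowResults" inherits the default.
        WindowSpec quick = new WindowSpec(5_000, Optional.of(TimeDomain.PROCESSING_TIME));
        WindowSpec slow = new WindowSpec(5_000, Optional.empty());

        long eventTimestamp = 3_000; // when the event happened
        long arrivalTime = 7_200;    // wall-clock time when the operator sees it

        System.out.println("quick: " + quick.assign(envDefault, eventTimestamp, arrivalTime));
        System.out.println("slow:  " + slow.assign(envDefault, eventTimestamp, arrivalTime));
    }
}

/** Stand-in for a time characteristic; deliberately not Flink's enum. */
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

/** Half-open window [start, end) in milliseconds. */
record TimeWindow(long start, long end) {}

/** A tumbling window definition with an optional explicit time domain. */
record WindowSpec(long sizeMillis, Optional<TimeDomain> explicitDomain) {

    /** An explicit domain wins; otherwise fall back to the environment default. */
    TimeDomain effectiveDomain(TimeDomain envDefault) {
        return explicitDomain.orElse(envDefault);
    }

    /** Assigns by the timestamp of the effective domain, so window and trigger agree. */
    TimeWindow assign(TimeDomain envDefault, long eventTimestamp, long processingTime) {
        long ts = effectiveDomain(envDefault) == TimeDomain.EVENT_TIME
                ? eventTimestamp
                : processingTime;
        long start = ts - Math.floorMod(ts, sizeMillis);
        return new TimeWindow(start, start + sizeMillis);
    }
}
```

With these inputs, the quick operator places the element in the processing-time window [5000, 10000), while the slow operator uses the event-time window [0, 5000). Under the behavior described in the thread, both operators would have used [0, 5000).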