+1 for the second approach.

Regarding Stephan's comment, I think it would be better to have dedicated
WindowAssigner classes. Otherwise, this becomes inconsistent with the
dedicated Event/ProcessingTimeTriggers.
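
As a rough illustration of the dedicated-assigner idea, a minimal self-contained sketch could look like the following. It deliberately does not use Flink's API: every name in it (SketchWindow, SketchWindowAssigner, TumblingEventTimeWindowsSketch, TumblingProcessingTimeWindowsSketch) is hypothetical and exists only to show how each assigner would fix its own notion of time instead of consulting a global time characteristic.

```java
// Hypothetical sketch, NOT Flink API: all names below are made up for illustration.
import java.util.function.LongSupplier;

/** A half-open time window [start, end) in milliseconds. */
final class SketchWindow {
    final long start;
    final long end;

    SketchWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

/** Assigns an element to a window, given the timestamp attached to the element. */
interface SketchWindowAssigner {
    SketchWindow assign(long elementTimestamp);
}

/** Event-time variant: always buckets by the element's own timestamp. */
final class TumblingEventTimeWindowsSketch implements SketchWindowAssigner {
    private final long size;

    TumblingEventTimeWindowsSketch(long sizeMillis) {
        this.size = sizeMillis;
    }

    @Override
    public SketchWindow assign(long elementTimestamp) {
        long start = elementTimestamp - Math.floorMod(elementTimestamp, size);
        return new SketchWindow(start, start + size);
    }
}

/** Processing-time variant: ignores the element timestamp and reads a clock. */
final class TumblingProcessingTimeWindowsSketch implements SketchWindowAssigner {
    private final long size;
    private final LongSupplier clock; // injected so the sketch is testable

    TumblingProcessingTimeWindowsSketch(long sizeMillis, LongSupplier clock) {
        this.size = sizeMillis;
        this.clock = clock;
    }

    @Override
    public SketchWindow assign(long elementTimestamp) {
        long now = clock.getAsLong();
        long start = now - Math.floorMod(now, size);
        return new SketchWindow(start, start + size);
    }
}
```

With assigners split like this, each one could return the matching default trigger unconditionally, so no branch on the environment's time characteristic would be needed.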

2015-12-18 12:03 GMT+01:00 Stephan Ewen <se...@apache.org>:

> I am also in favor of option (2).
>
> We could also pass the TimeCharacteristic to, for example,
> "SlidingTimeWindows". Then there is one class, users can explicitly choose
> the characteristic they want, and when nothing is specified, the
> default time characteristic is used.
>
> On Thu, Dec 17, 2015 at 11:41 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
> > Hi Aljoscha,
> >
> > I'm in favor of option 2: Keep the setStreamTimeCharacteristic to set
> > the default time behavior. Then add a method to the operators to set a
> > custom time behavior.
> >
> > The problem is evident in SlidingTimeWindows:
> >
> > @Override
> > public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
> >    if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
> >       return ProcessingTimeTrigger.create();
> >    } else {
> >       return EventTimeTrigger.create();
> >    }
> > }
> >
> > That just needs to be fixed, e.g. by having a dedicated
> > setTimeCharacteristic(..) on the operator.
> >
> > +1 for removing AbstractTime, EventTime, and ProcessingTime.
> >
> > Cheers,
> > Max
> >
> > On Wed, Dec 16, 2015 at 3:26 PM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> > > Hi,
> > > I thought a bit about how to improve the handling of time in Flink,
> > > mostly as it relates to windows. The problem is that mixing
> > > processing-time and event-time windows in one topology is very hard
> > > (impossible) right now. Let me explain it with this example:
> > >
> > > val env: StreamExecutionEnvironment = …
> > >
> > > env.setStreamTimeCharacteristic(EventTime)
> > >
> > > val input = <some stream>
> > >
> > > val quickResults = input
> > >   .keyBy(…)
> > >   .window(TumblingTimeWindows.of(Time.seconds(5)))
> > >   .trigger(ProcessingTimeTrigger.create())
> > >   .sum(1)
> > >
> > > val slowResults = input
> > >   .keyBy(…)
> > >   .window(TumblingTimeWindows.of(Time.seconds(5)))
> > >   // .trigger(EventTimeTrigger.create())
> > >   // (this is the default trigger, so there is no need to set it)
> > >   .sum(1)
> > >
> > > The idea is that you want to have fast, but possibly inaccurate,
> > > results using processing time and correct, but maybe slower, results
> > > using event-time windowing.
> > >
> > > The problem is that the current API tries to solve two problems:
> > >  1. We want to have a way to just say “time window” and then let the
> > >     system instantiate the correct window operator based on the time
> > >     characteristic
> > >  2. We want to have flexibility to allow users to mix ’n match
> > >     processing-time and event-time windows
> > >
> > > The above example does not work because both operators would assign
> > > elements to windows based on the event-time timestamp. The first
> > > window therefore triggers event-time windows by processing time,
> > > which has unexpected (wrong) results.
> > >
> > > I see three solutions to this:
> > >  1. Remove setStreamTimeCharacteristic() and let users always specify
> > >     exactly what kind of windowing they want
> > >  2. Keep setStreamTimeCharacteristic() but only employ the magic that
> > >     decides on the window operator for the special .timeWindow() call.
> > >     Have two different window assigners (two per window type, that is
> > >     TumblingTimeWindows, SlidingTimeWindows, SessionWindows, ...), one
> > >     for processing-time and one for event-time, that allow users to
> > >     accurately specify what they want
> > >  3. Keep setStreamTimeCharacteristic() and have three window assigners
> > >     per window type, one for processing-time, one for event-time and
> > >     one that automatically decides based on the time characteristic
> > >
> > > What do you think?
> > >
> > > On a side note, I would also suggest removing AbstractTime, EventTime,
> > > and ProcessingTime and just keeping Time for specifying time.
> > >
> > > Cheers,
> > > Aljoscha
> >
>
