Hi @Aljoscha,
the comment on the function parameter currentTimestamp does not match the
recordTimestamp in "long extractTimestamp(T element, long recordTimestamp)"
on the wiki.
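
For illustration, here is a minimal sketch of how the comment and the
parameter name could line up. The interfaces are local stand-ins that mirror
the signature quoted above and the SerializableTimestampAssigner idea from
this thread; they are not the Flink classes themselves. The class name
RecordTimestampSketch and the helper assign() are hypothetical.

```java
import java.io.Serializable;

public class RecordTimestampSketch {

    /** Local stand-in for the interface discussed in this thread (assumption, not the Flink class). */
    @FunctionalInterface
    public interface TimestampAssigner<T> {
        /**
         * Assigns a timestamp to an element.
         *
         * @param element the element the timestamp is assigned to
         * @param recordTimestamp the timestamp currently attached to the record,
         *     e.g. set by the source or by a previous assigner
         * @return the new timestamp
         */
        long extractTimestamp(T element, long recordTimestamp);
    }

    /** Mixes in Serializable so that lambdas can be passed, as proposed in the thread. */
    @FunctionalInterface
    public interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
    }

    /** Hypothetical helper that simply applies the assigner to one record. */
    public static long assign(
            SerializableTimestampAssigner<Long> assigner, Long element, long recordTimestamp) {
        return assigner.extractTimestamp(element, recordTimestamp);
    }

    public static void main(String[] args) {
        // Uses the element value itself as the new timestamp.
        SerializableTimestampAssigner<Long> useElement = (event, recordTimestamp) -> event;
        // Keeps whatever timestamp is already attached to the record.
        SerializableTimestampAssigner<Long> keepRecord = (event, recordTimestamp) -> recordTimestamp;

        System.out.println(assign(useElement, 42L, 7L));
        System.out.println(assign(keepRecord, 42L, 7L));
    }
}
```

With the parameter named and documented as recordTimestamp, the second
lambda reads naturally as "keep the timestamp the record already carries".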

Best,
Zhangguanghui

On Wed, May 13, 2020 at 12:28 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Thank you for the update and sorry again for chiming in so late...
>
> Best,
>
> Dawid
>
>
> On 12/05/2020 18:21, Aljoscha Krettek wrote:
> > Yes, I am also ok with a SerializableTimestampAssigner. This only
> > looks a bit clumsy in the API but as a user (that uses lambdas) you
> > should not see this. I pushed changes for this to my branch:
> > https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
> >
> > And yes, recordTimestamp sounds good for the TimestampAssigner. I
> > admit I didn't read this well enough and only saw nativeTimestamp.
> >
> > Best,
> > Aljoscha
> >
> > On 12.05.20 17:16, Dawid Wysakowicz wrote:
> >> I have similar thoughts to @Stephan
> >>
> >> Ad. 1 I tried something like this on your branch:
> >>
> >>      /**
> >>       * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
> >>       * For top-level classes that implement both Serializable and TimestampAssigner.
> >>       */
> >>      public <TA extends TimestampAssigner<T> & Serializable> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner) {
> >>          checkNotNull(timestampAssigner, "timestampAssigner");
> >>          this.timestampAssigner = timestampAssigner;
> >>          return this;
> >>      }
> >>
> >>      @FunctionalInterface
> >>      public interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
> >>      }
> >>
> >>      /**
> >>       * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
> >>       * Helper method for serializable lambdas.
> >>       */
> >>      public WatermarkStrategies<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
> >>          checkNotNull(timestampAssigner, "timestampAssigner");
> >>          this.timestampAssigner = timestampAssigner;
> >>          return this;
> >>      }
> >>
> >> But I understand if that's too hacky. It's just a pity that we must
> >> enforce limitations on an interface that are not strictly necessary.
> >>
> >> Ad 2/3
> >>
> >> I am aware the watermark assigner/timestamp extractor can be applied
> >> further down the graph. Originally I also wanted to suggest
> >> sourceTimestamp and SourceTimestampAssigner, but then I realized it can
> >> also be used after the sources, as you correctly pointed out. Even if the
> >> TimestampAssigner is used after the source, there might be some
> >> native/record timestamp in the StreamRecord that could have been
> >> extracted by a previous assigner.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 12/05/2020 16:47, Stephan Ewen wrote:
> >>> @Aljoscha
> >>>
> >>> About (1) could we have an interface SerializableTimestampAssigner that
> >>> simply mixes in the java.io.Serializable interface? Or will this be too
> >>> clumsy?
> >>>
> >>> About (3) RecordTimeStamp seems to fit both cases (in-source-record
> >>> timestamp, in stream-record timestamp).
> >>>
> >>> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <aljos...@apache.org>
> >>> wrote:
> >>>
> >>>> Definitely +1 to point 2) raised by Dawid. I'm not sure on points 1)
> >>>> and 3).
> >>>>
> >>>> 1) I can see the benefit of that but in reality most timestamp assigners
> >>>> will probably need to be Serializable. If you look at my (updated) POC
> >>>> branch [1] you can see how a TimestampAssigner would be specified on the
> >>>> WatermarkStrategies helper class: [2]. The signature of this would have
> >>>> to be changed to something like:
> >>>>
> >>>> public <TA extends TimestampAssigner<T> & Serializable>
> >>>> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
> >>>>
> >>>> Then, however, it would not be possible for users to specify a lambda or
> >>>> anonymous inner class for the TimestampAssigner like this:
> >>>>
> >>>> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
> >>>>                  .forGenerator(new PeriodicTestWatermarkGenerator())
> >>>>                  .withTimestampAssigner((event, timestamp) -> event)
> >>>>                  .build();
> >>>>
> >>>> 3) This makes sense if we only allow WatermarkStrategies on sources,
> >>>> where the previous timestamp really is the "native" timestamp.
> >>>> Currently, we also allow setting watermark strategies at arbitrary
> >>>> points in the graph. I'm thinking we probably should only allow that in
> >>>> sources but it's not the reality currently. I'm not against renaming it,
> >>>> just voicing those thoughts.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>>
> >>>> [1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
> >>>> [2] https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
> >>>>
> >>>>
> >>>> On 12.05.20 15:48, Stephan Ewen wrote:
> >>>>> +1 to all of Dawid's suggestions, makes a lot of sense to me
> >>>>>
> >>>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <dwysakow...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Aljoscha,
> >>>>>>
> >>>>>> Sorry for adding comments during the vote, but I have some really minor
> >>>>>> suggestions that should not influence the voting thread imo.
> >>>>>>
> >>>>>> 1) Does it make sense to have the TimestampAssigner extend from Flink's
> >>>>>> Function? This implies it has to be serializable, which with the factory
> >>>>>> pattern is not strictly necessary, right? BTW I really like that you
> >>>>>> suggested the FunctionalInterface annotation there.
> >>>>>>
> >>>>>> 2) Could we rename the IdentityTimestampAssigner to e.g.
> >>>>>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
> >>>>>> Personally I found the IdentityTimestampAssigner a bit misleading, as
> >>>>>> identity usually means a no-op. That did not click for me, as I assumed
> >>>>>> it somehow returns the incoming record itself.
> >>>>>>
> >>>>>> 3) Could we rename the second parameter of TimestampAssigner#extract to
> >>>>>> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
> >>>>>> above. This parameter was also a bit confusing for me, as I thought at
> >>>>>> times it's somehow related to
> >>>>>> TimerService#currentProcessingTimestamp()/currentWatermark() as the
> >>>>>> whole system currentTimestamp.
> >>>>>>
> >>>>>> Other than those three points I like the proposal, and I was about to
> >>>>>> vote +1 if it was not for those three points.
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Dawid
> >>>>>>
> >>>>>> On 11/05/2020 16:57, Jark Wu wrote:
> >>>>>>> Thanks for the explanation. I like the factory pattern to make the
> >>>>>>> member variables immutable and final.
> >>>>>>>
> >>>>>>> So +1 to the proposal.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
> >>>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I am fine with that.
> >>>>>>>>
> >>>>>>>> Most of the principles seem agreed upon. I understand the need to
> >>>>>>>> support code-generated extractors and we should support most of it
> >>>>>>>> already (as Aljoscha mentioned via the factories) and can extend this
> >>>>>>>> if needed.
> >>>>>>>>
> >>>>>>>> I think that the factory approach supports code-generated extractors
> >>>>>>>> in a cleaner way even than an extractor with an open/init method.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> We're slightly running out of time. I would propose we vote on the
> >>>>>>>>> basic principle and remain open to later additions. This feature is
> >>>>>>>>> quite important to make the new Kafka Source that is developed as
> >>>>>>>>> part of FLIP-27 useful. Otherwise we would have to use the legacy
> >>>>>>>>> interfaces in the newly added connector.
> >>>>>>>>>
> >>>>>>>>> I know that's a bit unorthodox but would everyone be OK with what's
> >>>>>>>>> currently there and then we iterate?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
> >>>>>>>>>> Ah, I meant to write this in my previous email, sorry about that.
> >>>>>>>>>>
> >>>>>>>>>> The WatermarkStrategy, which is basically a factory for a
> >>>>>>>>>> WatermarkGenerator, is the replacement for the open() method. This
> >>>>>>>>>> is the same strategy that was followed for StreamOperatorFactory,
> >>>>>>>>>> which was introduced to allow code generation in the Table API [1].
> >>>>>>>>>> If we need metrics or other things we would add that as a parameter
> >>>>>>>>>> to the factory method. What do you think?
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>
> >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
> >>>>>>>>>>
> >>>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding `open()/close()`, I think it's necessary for Table&SQL
> >>>>>>>>>>> to compile the generated code.
> >>>>>>>>>>> In Table&SQL, the watermark strategy and event timestamp are
> >>>>>>>>>>> defined using SQL expressions; we will translate and generate
> >>>>>>>>>>> Java code for the expressions. If we have `open()/close()`, we
> >>>>>>>>>>> don't need lazy initialization.
> >>>>>>>>>>> Besides that, I can see a need to report some metrics, e.g. the
> >>>>>>>>>>> current watermark, the dirty timestamps (null value), etc.
> >>>>>>>>>>> So I think a simple `open()/close()` with a context which can get
> >>>>>>>>>>> MetricGroup is nice and not complex for the first version.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Jark
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>> Thanks, Aljoscha, for picking this up.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I agree with the approach of doing the here proposed set of
> >>>>>>>>>>>> changes for now. It already makes things simpler and adds
> >>>>>>>>>>>> idleness support everywhere.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Rich functions and state always add complexity, let's do this in
> >>>>>>>>>>>> a next step, if we have a really compelling case.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself. The
> >>>>>>>>>>>>> proposal is basically to turn emitting into a "flatMap", we
> >>>>>>>>>>>>> give the WatermarkGenerator a "collector" (the WatermarkOutput)
> >>>>>>>>>>>>> and the WG can decide whether to output a watermark or not and
> >>>>>>>>>>>>> can also mark the output as idle. Changing the interface to
> >>>>>>>>>>>>> return a Watermark (as the previous watermark assigner
> >>>>>>>>>>>>> interface did) would not allow that flexibility.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of the
> >>>>>>>>>>>>> minimum watermark, this would be the responsibility of the
> >>>>>>>>>>>>> framework (or the KafkaConsumer in the current implementation).
> >>>>>>>>>>>>> The user-supplied WG does not need to make sure the watermark
> >>>>>>>>>>>>> doesn't regress.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding making the WG a "rich function", I can see the
> >>>>>>>>>>>>> potential benefit but I also see a lot of pitfalls. For
> >>>>>>>>>>>>> example, how should the watermark state be handled in the case
> >>>>>>>>>>>>> of scale-in? It could be made to work in the Kafka case by
> >>>>>>>>>>>>> attaching the state to the partition state that we keep, but
> >>>>>>>>>>>>> then we have potential backwards compatibility problems also
> >>>>>>>>>>>>> for the WM state. Does the WG usually need to keep the state or
> >>>>>>>>>>>>> might it be enough if the state is transient, i.e. if you have
> >>>>>>>>>>>>> a restart the WG would lose its histogram but it would rebuild
> >>>>>>>>>>>>> it quickly and you would get back to the same steady state as
> >>>>>>>>>>>>> before.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it forward,
> >>>>>>>>>>>>>> Aljoscha.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I also like the idea of making the Watermark generator a rich
> >>>>>>>>>>>>>> function -- this should make it more straightforward to
> >>>>>>>>>>>>>> implement smarter watermark generators. Eg, one that uses
> >>>>>>>>>>>>>> state to keep statistics about the actual out-of-orderness,
> >>>>>>>>>>>>>> and uses those statistics to implement a variable delay.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> David
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for opening the discussion!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have two comments on the FLIP:
> >>>>>>>>>>>>>>> 1) we could add lifecycle methods to the Generator, i.e.
> >>>>>>>>>>>>>>> open()/close(), probably with a Context as argument: I have
> >>>>>>>>>>>>>>> not fully thought this through but I think that this is more
> >>>>>>>>>>>>>>> aligned with the rest of our rich functions. In addition, it
> >>>>>>>>>>>>>>> will allow, for example, to initialize the Watermark value,
> >>>>>>>>>>>>>>> if we decide to checkpoint the watermark (see [1]) (I also
> >>>>>>>>>>>>>>> do not know if Table/SQL needs to do anything in the open()).
> >>>>>>>>>>>>>>> 2) aligned with the above, and with the case where we want to
> >>>>>>>>>>>>>>> checkpoint the watermark in mind, I am wondering about how we
> >>>>>>>>>>>>>>> could implement this in the future. In the FLIP, it is
> >>>>>>>>>>>>>>> proposed to expose the WatermarkOutput in the methods of the
> >>>>>>>>>>>>>>> WatermarkGenerator. Given that there is the implicit contract
> >>>>>>>>>>>>>>> that watermarks are non-decreasing, the
> >>>>>>>>>>>>>>> WatermarkOutput#emitWatermark() will have (I assume) a check
> >>>>>>>>>>>>>>> that will compare the last emitted WM against the provided
> >>>>>>>>>>>>>>> one, and emit it only if it is >=. If not, then we risk
> >>>>>>>>>>>>>>> having users shoot themselves in the foot if they
> >>>>>>>>>>>>>>> accidentally forget the check. Given that the
> >>>>>>>>>>>>>>> WatermarkGenerator and its caller do not know if the
> >>>>>>>>>>>>>>> watermark was finally emitted or not (the
> >>>>>>>>>>>>>>> WatermarkOutput#emitWatermark returns void), who will be
> >>>>>>>>>>>>>>> responsible for checkpointing the WM?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Given this, why not having the methods as:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> public interface WatermarkGenerator<T> {
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>         Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>         Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> and the caller will be the one enforcing any invariants, such
> >>>>>>>>>>>>>>> as non-decreasing watermarks. In this way, the caller can
> >>>>>>>>>>>>>>> checkpoint anything that is needed as it will have complete
> >>>>>>>>>>>>>>> knowledge as to if the WM was emitted or not.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Kostas
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>> Thanks for the proposal Aljoscha. This is a very useful
> >>>>>>>>>>>>>>>> unification. We have considered this FLIP already in the
> >>>>>>>>>>>>>>>> interfaces for FLIP-95 [1] and look forward to updating to
> >>>>>>>>>>>>>>>> the new unified watermark generators once FLIP-126 has been
> >>>>>>>>>>>>>>>> accepted.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>>>>>>>>>>> Hi Everyone!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify
> >>>>>>>>>>>>>>>>> (and separate) Watermark Assigners" [1]. This work was
> >>>>>>>>>>>>>>>>> started by Stephan in an experimental branch. I expanded
> >>>>>>>>>>>>>>>>> on that work to provide a PoC for the changes proposed in
> >>>>>>>>>>>>>>>>> this FLIP: [2].
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark
> >>>>>>>>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
> >>>>>>>>>>>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
> >>>>>>>>>>>>>>>>> from TimestampAssigner. This means that sources that want
> >>>>>>>>>>>>>>>>> to support watermark assignment/extraction in the source
> >>>>>>>>>>>>>>>>> need to support two separate interfaces, and we have two
> >>>>>>>>>>>>>>>>> operator implementations for the different flavours. Also,
> >>>>>>>>>>>>>>>>> this makes features such as generic support for idleness
> >>>>>>>>>>>>>>>>> detection more complicated to implement because we again
> >>>>>>>>>>>>>>>>> have to support two types of watermark assigners.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In this FLIP we propose two things:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1) Unify the Watermark Assigners into one Interface
> >>>>>>>>>>>>>>>>> WatermarkGenerator
> >>>>>>>>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The motivation for the first is to simplify future
> >>>>>>>>>>>>>>>>> implementations and reduce code duplication. The motivation
> >>>>>>>>>>>>>>>>> for the second point is again code deduplication: most
> >>>>>>>>>>>>>>>>> assigners currently have to extend from some base timestamp
> >>>>>>>>>>>>>>>>> extractor or duplicate the extraction logic, or users have
> >>>>>>>>>>>>>>>>> to override an abstract method of the watermark assigner to
> >>>>>>>>>>>>>>>>> provide the timestamp extraction logic.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping
> >>>>>>>>>>>>>>>>> WatermarkGenerator that provides idleness detection, i.e.
> >>>>>>>>>>>>>>>>> it can mark a stream/partition as idle if no data arrives
> >>>>>>>>>>>>>>>>> after a configured timeout.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that we
> >>>>>>>>>>>>>>>>> want to unify punctuated and periodic assigners but at the
> >>>>>>>>>>>>>>>>> same time split the timestamp assigner from the watermark
> >>>>>>>>>>>>>>>>> generator.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward
> >>>>>>>>>>>>>>>>> to your feedback.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>>>>>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
