Hi Luke,

Thanks for the detailed explanation. It gives much more insight to newcomers like me who are trying to grok the whole concept.
*1) What timestamp you're going to output your records at*
* use the upstream input element's timestamp: guidance is to use the default implementation and get the upstream watermark by default
* use data from within the record being output, or external system state via an API call: use a watermark estimator

Regarding the section above, I do not think our source has a watermark concept built in that we could derive and use in the SDF, so we will have to go with the second option. If we can extract a timestamp from the source message, do we have to call setWatermark with that extracted timestamp before each output in @ProcessElement? And can we use the Manual watermark estimator for this approach?

*2) How you want to compute the watermark estimate (if at all)*
* the choice here depends on how the element timestamps progress: are they in exactly sorted order, almost sorted order, completely unsorted, ...?

Our elements are in "almost sorted order", which is why we want to hold off processing message-01 (timestamp 11:00:10 AM) until we have processed message-02 (timestamp 11:00:08 AM). How do we enable this ordering while processing the messages?

Based on your suggestion, I tried the WallTime estimator and it worked for one of our many scenarios. I plan to test it with a bunch of other window types and use it until we get a solid hold on the approach mentioned above that can handle unsorted messages.

Regards,
Praveen

On Fri, Sep 18, 2020 at 10:06 AM Luke Cwik <[email protected]> wrote:

> To answer your specific question, you should create and return the WallTime
> estimator. You shouldn't need to interact with it from within your
> @ProcessElement call since your elements are using the current time for
> their timestamp.
>
> On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <[email protected]> wrote:
>
>> Kafka is a complex example because it is adapting code from before there
>> was an SDF implementation (namely the TimestampPolicy and the
>> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>>
>> There are three types of watermark estimators in the Beam Java SDK today:
>>
>> Manual: Can be invoked from within your @ProcessElement method within your
>> SDF, allowing you precise control over what the watermark is.
>>
>> WallTime: Doesn't need to be interacted with; it reports the current time
>> as the watermark. Once it is instantiated and returned via the
>> @NewWatermarkEstimator method you don't need to do anything with it. This
>> is functionally equivalent to calling setWatermark(Instant.now()) on a
>> Manual estimator right before returning from the @ProcessElement method of
>> the SplittableDoFn.
>>
>> TimestampObserving: Is invoked with the output timestamp of every element
>> that is output. This is functionally equivalent to calling setWatermark
>> after each output within your @ProcessElement method in the SplittableDoFn.
>> The MonotonicallyIncreasing implementation of the TimestampObserving
>> estimator ensures that the largest timestamp seen so far will be reported
>> as the watermark.
>>
>> The default is to not set any watermark estimate.
>>
>> For all watermark estimators you're allowed to set the watermark estimate
>> to anything, as the runner will recompute the output watermark as:
>>
>>   new output watermark = max(previous output watermark,
>>                              min(upstream watermark, watermark estimate))
>>
>> This effectively means that the watermark will never go backwards from the
>> runner's point of view, but it does mean that setting the watermark
>> estimate below the previous output watermark (which isn't observable) will
>> not do anything beyond holding the watermark at the previous output
>> watermark.
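The recomputation rule quoted above can be sketched as plain arithmetic. This is not Beam API, just an illustration of the max/min formula with epoch-millisecond timestamps; the class and method names are invented for the example:

```java
// Illustrative sketch of the output-watermark recomputation rule:
//   new output watermark = max(previous output watermark,
//                              min(upstream watermark, watermark estimate))
// Timestamps are epoch millis. Not Beam API, just the arithmetic.
final class WatermarkMath {
    static long newOutputWatermark(long previousOutput, long upstream, long estimate) {
        return Math.max(previousOutput, Math.min(upstream, estimate));
    }

    public static void main(String[] args) {
        // An estimate ahead of the upstream watermark is clamped to upstream.
        System.out.println(newOutputWatermark(100, 200, 500)); // 200
        // An estimate below the previous output watermark cannot move it backwards.
        System.out.println(newOutputWatermark(300, 400, 100)); // 300
    }
}
```

The second call shows the behavior Luke describes: a too-low estimate merely holds the watermark at its previous value rather than regressing it.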
>>
>> Depending on the windowing strategy and allowed lateness, any records that
>> are output with a timestamp that is too early can be considered droppably
>> late; otherwise they will be late/on-time/early.
>>
>> So as the author of an SDF transform, you need to figure out:
>> 1) What timestamp you're going to output your records at
>> * use the upstream input element's timestamp: guidance is to use the
>> default implementation and get the upstream watermark by default
>> * use data from within the record being output, or external system state
>> via an API call: use a watermark estimator
>> 2) How you want to compute the watermark estimate (if at all)
>> * the choice here depends on how the element timestamps progress: are they
>> in exactly sorted order, almost sorted order, completely unsorted, ...?
>>
>> For both of these it is up to you to choose how much flexibility in these
>> decisions you want to give to your users, and that should guide what you
>> expose within the API (like how KafkaIO exposes a TimestampPolicy, whereas
>> many other sources don't expose anything).
>>
>> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
>> [email protected]> wrote:
>>
>>> Hi Luke,
>>>
>>> I am also looking at the WatermarkEstimators.manual option in parallel.
>>> Now we are getting data past our fixed window, but the aggregation is not
>>> as expected. The doc says setWatermark will "set timestamp before or at
>>> the timestamps of all future elements produced by the associated DoFn".
>>> If I output with a timestamp as below, could you please clarify how we
>>> should set the watermark for this manual watermark estimator?
>>>
>>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>>
>>> Thanks,
>>> Praveen
>>>
>>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> Is the watermark advancing [1, 2] for the SDF such that the windows can
>>>> close, allowing the Count transform to produce output?
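For the "almost sorted order" case discussed above, the behavior of the MonotonicallyIncreasing TimestampObserving estimator can be sketched as a running maximum. This is not Beam's implementation, only an illustration of its reporting rule, with invented names and epoch-millisecond timestamps:

```java
// Sketch (not Beam's implementation) of what the MonotonicallyIncreasing
// TimestampObserving estimator does: report the largest timestamp observed
// so far, so slightly out-of-order ("almost sorted") elements never pull
// the watermark backwards. Timestamps are epoch millis.
final class MaxSeenEstimator {
    private long watermark = Long.MIN_VALUE;

    // Called with the output timestamp of each element as it is output.
    void observe(long elementTimestamp) {
        watermark = Math.max(watermark, elementTimestamp);
    }

    long currentWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        MaxSeenEstimator est = new MaxSeenEstimator();
        est.observe(10_000); // message-01 at the later timestamp
        est.observe(8_000);  // late message-02; watermark does not regress
        System.out.println(est.currentWatermark()); // 10000
    }
}
```

Note the consequence for the thread's question: with this estimator, the late element is output behind the watermark, and whether it is on-time, late, or droppably late is then decided by the windowing strategy and allowed lateness, not by the estimator itself.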
>>>>
>>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>>
>>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi everyone!
>>>>>
>>>>> We are developing a new IO connector using the SDF API, and testing it
>>>>> with the following simple counting pipeline:
>>>>>
>>>>> p.apply(MyIO.read()
>>>>>         .withStream(inputStream)
>>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>>         .withConsumerConfig(config))    // PCollection<KV<String, String>>
>>>>>
>>>>>  .apply(Values.<String>create())        // PCollection<String>
>>>>>
>>>>>  .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>         .withAllowedLateness(Duration.standardDays(1))
>>>>>         .accumulatingFiredPanes())
>>>>>
>>>>>  .apply(Count.<String>perElement())
>>>>>
>>>>>  // write PCollection<KV<String, Long>> to stream
>>>>>  .apply(MyIO.write()
>>>>>         .withStream(outputStream)
>>>>>         .withConsumerConfig(config));
>>>>>
>>>>> Without the window transform we can read from the stream and write to
>>>>> it; however, I don't see output after the Window transform. Could you
>>>>> please help pin down the issue?
>>>>>
>>>>> Thank you,
>>>>> Gaurav
>>>>
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan

--
Thanks,
Praveen K Viswanathan
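Why Gaurav's FixedWindows pipeline emits nothing until the watermark advances can be sketched as plain arithmetic: an on-time pane of a fixed window only fires once the watermark passes the window's end. This is an illustration only (invented names, epoch-millisecond timestamps, non-negative timestamps assumed), not Beam API:

```java
// Sketch of fixed-window firing: FixedWindows.of(standardSeconds(10)) assigns
// each element to a [start, start + 10s) window, and the on-time pane cannot
// fire until the watermark reaches the window end. Not Beam API; plain
// arithmetic on non-negative epoch millis.
final class WindowMath {
    static final long SIZE_MS = 10_000; // 10-second fixed windows

    // End (exclusive) of the fixed window containing the element timestamp.
    static long windowEnd(long elementTs) {
        return (elementTs / SIZE_MS) * SIZE_MS + SIZE_MS;
    }

    static boolean onTimePaneCanFire(long elementTs, long watermark) {
        return watermark >= windowEnd(elementTs);
    }

    public static void main(String[] args) {
        long ts = 12_345;                                  // window [10s, 20s)
        System.out.println(windowEnd(ts));                 // 20000
        System.out.println(onTimePaneCanFire(ts, 15_000)); // false: watermark held
        System.out.println(onTimePaneCanFire(ts, 20_000)); // true: window closes
    }
}
```

This matches the diagnosis in the thread: if the SDF never advances its watermark estimate, `onTimePaneCanFire` stays false for every window, so Count.perElement never produces output downstream of the Window transform.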
