Thanks, Luke. I will look into @RequiresTimeSortedInput for the sorting requirement. In parallel, I will start on the monotonically increasing watermark estimator and come back if I have any questions. Have a great day.
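To make sure I am on the right track, here is a rough sketch of the read SDF I have in mind. The class and type names (ReadFromMyStreamFn, MyPartition, MyRecord, and their startOffset()/offset()/eventTimestamp() accessors) are placeholders for our connector, and I have not run any of this yet:

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Instant;

@DoFn.UnboundedPerElement
class ReadFromMyStreamFn extends DoFn<MyPartition, MyRecord> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element MyPartition partition) {
    // Placeholder: read from the partition's current offset onwards.
    return new OffsetRange(partition.startOffset(), Long.MAX_VALUE);
  }

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
    // Seed the estimator state with the upstream element's timestamp.
    return currentElementTimestamp;
  }

  @NewWatermarkEstimator
  public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
      @WatermarkEstimatorState Instant state) {
    // Timestamp-observing estimator: it sees the timestamp of every element we
    // output and reports the largest one seen so far as the watermark estimate,
    // so the SDF body never has to call setWatermark itself.
    return new WatermarkEstimators.MonotonicallyIncreasing(state);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      @Element MyPartition partition,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<MyRecord> receiver) {
    for (MyRecord record : poll(partition, tracker.currentRestriction().getFrom())) {
      if (!tracker.tryClaim(record.offset())) {
        return ProcessContinuation.stop();
      }
      // Emit with the timestamp parsed from the record itself (instead of
      // Instant.now()); the estimator above observes this timestamp.
      receiver.outputWithTimestamp(record, record.eventTimestamp());
    }
    return ProcessContinuation.resume();
  }

  private Iterable<MyRecord> poll(MyPartition partition, long fromOffset) {
    // Connector-specific fetch, omitted here.
    throw new UnsupportedOperationException("placeholder");
  }
}

And for the ordering question, my understanding is that @RequiresTimeSortedInput goes on the @ProcessElement method of a downstream stateful (keyed) DoFn, roughly like the sketch below (again placeholder names, untested), so the runner sorts the elements by event time before they reach it:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

class ProcessInOrderFn extends DoFn<KV<String, MyRecord>, MyRecord> {

  // Some per-key state; @RequiresTimeSortedInput only applies to stateful DoFns.
  @StateId("lastTimestamp")
  private final StateSpec<ValueState<Instant>> lastTimestampSpec = StateSpecs.value();

  @RequiresTimeSortedInput
  @ProcessElement
  public void process(
      @Element KV<String, MyRecord> element,
      @StateId("lastTimestamp") ValueState<Instant> lastTimestamp,
      OutputReceiver<MyRecord> out) {
    // The runner delivers elements here sorted by event timestamp (within the
    // allowed lateness), so the "almost sorted" output of the SDF can be
    // processed per key in timestamp order.
    lastTimestamp.write(element.getValue().eventTimestamp());
    out.output(element.getValue());
  }
}

Please let me know if either of these looks off.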
On Mon, Sep 21, 2020 at 8:48 PM Luke Cwik <[email protected]> wrote:
>
> On Mon, Sep 21, 2020 at 5:02 PM Praveen K Viswanathan <[email protected]> wrote:
>
>> Hi Luke,
>>
>> Thanks for the detailed explanation. This gives more insight to new people like me trying to grok the whole concept.
>>
>> *1) What timestamp you're going to output your records at*
>> ** use the upstream input element's timestamp: use the default implementation, which gives you the upstream watermark by default*
>> ** use data from within the record being output or external system state via an API call: use a watermark estimator*
>>
>> In the above section, I do not think our source has a watermark concept built in that we could derive and use in the SDF, so we will have to go with the second option. Suppose we can extract a timestamp from the source message; do we then have to call setWatermark with that extracted timestamp before each output in @ProcessElement? And can we use the Manual watermark estimator itself for this approach?
>
> You want to use context.outputWithTimestamp when parsing your own timestamps and emitting records.
>
> Using the manual one works, but also take a look at how the timestamp observing ones work, since they are told the timestamp of each element being produced. Using the timestamp observing ones (monotonically increasing or your own) allows you to decouple the watermark estimator logic from the SDF implementation.
>
>> *2) How you want to compute the watermark estimate (if at all)*
>> ** the choice here depends on how the elements' timestamps progress: are they in exactly sorted order, almost sorted order, completely unsorted, ...?*
>>
>> Our elements are in "almost sorted order", which is why we want to hold off processing message_01 with timestamp 11:00:10 AM until we process message_02 with timestamp 11:00:08 AM. How do we enable this ordering while processing the messages?
>>
>> Based on your suggestion, I tried the WallTime estimator and it worked for one of our many scenarios. I am planning to test it with a bunch of other window types and use it until we get a solid hold on doing it in the above-mentioned way that can handle the unsorted messages.
>
> If you're extracting the timestamps out of your data, it would likely be best to use the monotonically increasing timestamp estimator or write one that computes the estimate using some statistical method appropriate to your source. If you think you have written one that is generally useful, feel free to contribute it to Beam.
>
> You'll want to look into @RequiresTimeSortedInput [1]. This allows you to produce the messages in any order and requires the runner to make sure they are sorted before they are passed to a downstream stateful DoFn.
>
> 1: https://lists.apache.org/thread.html/9cdac2a363e18be58fa1f14c838c61e8406ae3407e4e2d05e423234c%40%3Cdev.beam.apache.org%3E
>
>> Regards,
>> Praveen
>>
>> On Fri, Sep 18, 2020 at 10:06 AM Luke Cwik <[email protected]> wrote:
>>
>>> To answer your specific question, you should create and return the WallTime estimator. You shouldn't need to interact with it from within your @ProcessElement call since your elements are using the current time for their timestamp.
>>>
>>> On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> Kafka is a complex example because it is adapting code from before there was an SDF implementation (namely the TimestampPolicy and the TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>>>>
>>>> There are three types of watermark estimators in the Beam Java SDK today:
>>>>
>>>> Manual: Can be invoked from within your @ProcessElement method within your SDF, allowing you precise control over what the watermark is.
>>>>
>>>> WallTime: Doesn't need to be interacted with; it will report the current time as the watermark time. Once it is instantiated and returned via the @NewWatermarkEstimator method you don't need to do anything with it. This is functionally equivalent to calling setWatermark(Instant.now()) on a Manual estimator right before returning from the @ProcessElement method in the SplittableDoFn.
>>>>
>>>> TimestampObserving: Is invoked with the output timestamp of every element that is output. This is functionally equivalent to calling setWatermark after each output within your @ProcessElement method in the SplittableDoFn. The MonotonicallyIncreasing implementation of the TimestampObserving estimator ensures that the largest timestamp seen so far will be reported for the watermark.
>>>>
>>>> The default is to not set any watermark estimate.
>>>>
>>>> For all watermark estimators you're allowed to set the watermark estimate to anything, as the runner will recompute the output watermark as:
>>>>
>>>> new output watermark = max(previous output watermark, min(upstream watermark, watermark estimate))
>>>>
>>>> This effectively means that the watermark will never go backwards from the runner's point of view, but it does mean that setting the watermark estimate below the previous output watermark (which isn't observable) will not do anything beyond holding the watermark at the previous output watermark.
>>>>
>>>> Depending on the windowing strategy and allowed lateness, any records that are output with a timestamp that is too early can be considered droppably late; otherwise they will be late/on-time/early.
>>>>
>>>> So as an author of an SDF transform, you need to figure out:
>>>> 1) What timestamp you're going to output your records at
>>>> * use the upstream input element's timestamp: use the default implementation, which gives you the upstream watermark by default
>>>> * use data from within the record being output or external system state via an API call: use a watermark estimator
>>>> 2) How you want to compute the watermark estimate (if at all)
>>>> * the choice here depends on how the elements' timestamps progress: are they in exactly sorted order, almost sorted order, completely unsorted, ...?
>>>>
>>>> For both of these it is up to you to choose how much flexibility in these decisions you want to give to your users, and that should guide what you expose within the API (like how KafkaIO exposes a TimestampPolicy, whereas many other sources don't expose anything).
>>>>
>>>> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <[email protected]> wrote:
>>>>
>>>>> Hi Luke,
>>>>>
>>>>> I am also looking at the `WatermarkEstimators.manual` option in parallel. Now we are getting data past our Fixed Window but the aggregation is not as expected.
>>>>> The doc says setWatermark will "set timestamp before or at the timestamps of all future elements produced by the associated DoFn". If I output with a timestamp as below, could you please clarify how we should set the watermark for this manual watermark estimator?
>>>>>
>>>>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>>>>
>>>>> Thanks,
>>>>> Praveen
>>>>>
>>>>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> Is the watermark advancing [1, 2] for the SDF such that the windows can close, allowing the Count transform to produce output?
>>>>>>
>>>>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>>>>
>>>>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <[email protected]> wrote:
>>>>>>
>>>>>>> Hi everyone!
>>>>>>>
>>>>>>> We are developing a new IO connector using the SDF API, and testing it with the following simple counting pipeline:
>>>>>>>
>>>>>>> p.apply(MyIO.read()
>>>>>>>         .withStream(inputStream)
>>>>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>>>>         .withConsumerConfig(config)
>>>>>>>       ) // gets a PCollection<KV<String, String>>
>>>>>>>
>>>>>>>  .apply(Values.<String>create()) // PCollection<String>
>>>>>>>
>>>>>>>  .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>>>         .withAllowedLateness(Duration.standardDays(1))
>>>>>>>         .accumulatingFiredPanes())
>>>>>>>
>>>>>>>  .apply(Count.<String>perElement())
>>>>>>>
>>>>>>>  // write PCollection<KV<String, Long>> to stream
>>>>>>>  .apply(MyIO.write()
>>>>>>>         .withStream(outputStream)
>>>>>>>         .withConsumerConfig(config));
>>>>>>>
>>>>>>> Without the window transform we can read from the stream and write to it; however, I don't see output after the Window transform. Could you please help pin down the issue?
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Gaurav
>>>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Praveen K Viswanathan
>>>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>

--
Thanks,
Praveen K Viswanathan
