Thanks Luke. I will look into @RequiresTimeSortedInput for the sorting
requirement. In parallel, I will start on the monotonically increasing
watermark estimator and come back if I have any questions. Have a great day.

On Mon, Sep 21, 2020 at 8:48 PM Luke Cwik <[email protected]> wrote:

>
>
> On Mon, Sep 21, 2020 at 5:02 PM Praveen K Viswanathan <
> [email protected]> wrote:
>
>> Hi Luke, thanks for the detailed explanation. This gives a lot more insight
>> to new people like me trying to grok the whole concept.
>>
>> *1) What timestamp you're going to output your records at*
>>
>> ** use the upstream input element's timestamp: guidance is to use the
>> default implementation, which gives you the upstream watermark by default*
>> ** use data from within the record being output or external system state
>> via an API call: use a watermark estimator*
>>
>> Regarding the above section: I do not think our source has a built-in
>> watermark concept that we can derive and use in the SDF, so we will have to
>> go with the second option. Supposing we can extract a timestamp from the
>> source message, do we then have to call setWatermark with that extracted
>> timestamp before each output in @ProcessElement? And can we use the Manual
>> watermark estimator itself for this approach?
>>
>
> You want to use context.outputWithTimestamp when parsing your own
> timestamps and emitting records.
>
> Using the manual one works, but also take a look at how the timestamp
> observing estimators work, since they are told the timestamp of each element
> being produced. Using the timestamp observing ones (monotonically increasing
> or your own) allows you to decouple the watermark estimator logic from the
> SDF implementation.
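>
> As a rough illustration (the partition/record types and the readRecords /
> extractTimestamp helpers below are placeholders, not from your connector),
> the wiring inside the SplittableDoFn could look something like this:
>
>   // Seed the watermark estimator state from the current element's timestamp.
>   @GetInitialWatermarkEstimatorState
>   public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
>     return currentElementTimestamp;
>   }
>
>   // The SDK tells this estimator the timestamp of every element that is output.
>   @NewWatermarkEstimator
>   public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>     return new WatermarkEstimators.MonotonicallyIncreasing(watermarkEstimatorState);
>   }
>
>   @ProcessElement
>   public void processElement(
>       @Element MyPartition partition,
>       RestrictionTracker<OffsetRange, Long> tracker,
>       OutputReceiver<MyRecord> receiver) {
>     for (MyRecord record : readRecords(partition)) {
>       if (!tracker.tryClaim(record.getOffset())) {
>         return;
>       }
>       // Parse the timestamp out of the record and emit with it; the
>       // timestamp observing estimator sees it automatically.
>       receiver.outputWithTimestamp(record, extractTimestamp(record));
>     }
>   }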
>
>
>>
>>
>> *2) How you want to compute the watermark estimate (if at all)*
>> ** the choice here depends on how the elements' timestamps progress: are
>> they in exactly sorted order, almost sorted order, completely unsorted,
>> ...?*
>>
>> Our elements are in "almost sorted order", which is why we want to hold
>> off processing message_01 with timestamp 11:00:10 AM until we have processed
>> message_02 with timestamp 11:00:08 AM. How do we enforce this ordering while
>> processing the messages?
>>
>> Based on your suggestion, I tried the WallTime estimator and it worked for
>> one of our many scenarios. I am planning to test it with a bunch of other
>> window types and use it until we get a solid handle on the approach you
>> described above, which can handle the unsorted messages.
>>
>
> If you're extracting the timestamps out of your data, it would likely be
> best to use the monotonically increasing timestamp estimator, or to write one
> that computes the estimate using some statistical method appropriate to your
> source. If you think you have written one that is generally useful, feel free
> to contribute it to Beam.
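>
> As one sketch of the "write your own" route for almost sorted data (the
> fixed allowed-skew idea here is just an assumption about your data, not
> something the SDK prescribes), a timestamp observing estimator could hold
> the watermark slightly behind the largest timestamp seen so far:
>
>   // Keeps the watermark a fixed skew behind the max observed timestamp so
>   // slightly out-of-order records are not immediately late.
>   public static class SkewTolerantWatermarkEstimator
>       implements TimestampObservingWatermarkEstimator<Instant> {
>     private final Duration allowedSkew;
>     private Instant watermark;
>
>     public SkewTolerantWatermarkEstimator(Instant state, Duration allowedSkew) {
>       this.watermark = state;
>       this.allowedSkew = allowedSkew;
>     }
>
>     @Override
>     public void observeTimestamp(Instant timestamp) {
>       Instant candidate = timestamp.minus(allowedSkew);
>       if (candidate.isAfter(watermark)) {
>         watermark = candidate;
>       }
>     }
>
>     @Override
>     public Instant currentWatermark() {
>       return watermark;
>     }
>
>     @Override
>     public Instant getState() {
>       return watermark;
>     }
>   }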
>
> You'll want to look into @RequiresTimeSortedInput[1]. This allows you to
> produce the messages in any order and requires the runner to make sure they
> are sorted by event time before they are passed to a downstream stateful
> DoFn (see the sketch after the link below).
>
> 1:
> https://lists.apache.org/thread.html/9cdac2a363e18be58fa1f14c838c61e8406ae3407e4e2d05e423234c%40%3Cdev.beam.apache.org%3E
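>
> A rough sketch of such a downstream stateful DoFn (the key and value types
> are illustrative, and the state cell is only there to show the stateful
> pattern):
>
>   static class OrderedProcessor extends DoFn<KV<String, MyRecord>, MyRecord> {
>     @StateId("lastSeen")
>     private final StateSpec<ValueState<Instant>> lastSeenSpec = StateSpecs.value();
>
>     @RequiresTimeSortedInput
>     @ProcessElement
>     public void process(
>         @Element KV<String, MyRecord> element,
>         @Timestamp Instant timestamp,
>         @StateId("lastSeen") ValueState<Instant> lastSeen,
>         OutputReceiver<MyRecord> out) {
>       // With @RequiresTimeSortedInput the runner delivers elements for each
>       // key in event-time order, so this timestamp never goes backwards.
>       lastSeen.write(timestamp);
>       out.output(element.getValue());
>     }
>   }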
>
>
>>
>> Regards,
>> Praveen
>>
>> On Fri, Sep 18, 2020 at 10:06 AM Luke Cwik <[email protected]> wrote:
>>
>>> To answer your specific question, you should create and return the
>>> WallTime estimator. You shouldn't need to interact with it from within
>>> your @ProcessElement call since your elements are using the current time
>>> for their timestamp.
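>>>
>>> A minimal sketch of that (assuming the watermark estimator state is an
>>> Instant, seeded from the element's timestamp):
>>>
>>>   @GetInitialWatermarkEstimatorState
>>>   public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
>>>     return currentElementTimestamp;
>>>   }
>>>
>>>   @NewWatermarkEstimator
>>>   public WatermarkEstimators.WallTime newWatermarkEstimator(
>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>     return new WatermarkEstimators.WallTime(watermarkEstimatorState);
>>>   }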
>>>
>>> On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> Kafka is a complex example because it is adapting code from before
>>>> there was an SDF implementation (namely the TimestampPolicy and the
>>>> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>>>>
>>>> There are three types of watermark estimators that are in the Beam Java
>>>> SDK today:
>>>> Manual: Can be invoked from within your @ProcessElement method within
>>>> your SDF, giving you precise control over what the watermark is.
>>>> WallTime: Doesn't need to be interacted with; it will report the current
>>>> time as the watermark time. Once it is instantiated and returned via the
>>>> @NewWatermarkEstimator method you don't need to do anything with it. This
>>>> is functionally equivalent to calling setWatermark(Instant.now()) on a
>>>> Manual watermark estimator right before returning from the @ProcessElement
>>>> method in the SplittableDoFn.
>>>> TimestampObserving: Is invoked using the output timestamp for every
>>>> element that is output. This is functionally equivalent to calling
>>>> setWatermark after each output within your @ProcessElement method in the
>>>> SplittableDoFn. The MonotonicallyIncreasing implementation for
>>>> the TimestampObserving estimator ensures that the largest timestamp seen so
>>>> far will be reported for the watermark.
>>>>
>>>> The default is to not set any watermark estimate.
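>>>>
>>>> For the Manual flavor, a rough sketch inside the SplittableDoFn could look
>>>> like this (paired with a @GetInitialWatermarkEstimatorState method that
>>>> returns an Instant; the record types and the readRecords / extractTimestamp
>>>> helpers are placeholders):
>>>>
>>>>   @NewWatermarkEstimator
>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>     return new WatermarkEstimators.Manual(watermarkEstimatorState);
>>>>   }
>>>>
>>>>   @ProcessElement
>>>>   public void processElement(
>>>>       @Element MyPartition partition,
>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator,
>>>>       OutputReceiver<MyRecord> receiver) {
>>>>     for (MyRecord record : readRecords(partition)) {
>>>>       if (!tracker.tryClaim(record.getOffset())) {
>>>>         return;
>>>>       }
>>>>       Instant ts = extractTimestamp(record);
>>>>       receiver.outputWithTimestamp(record, ts);
>>>>       // Explicit control: advance the watermark ourselves after each output.
>>>>       watermarkEstimator.setWatermark(ts);
>>>>     }
>>>>   }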
>>>>
>>>> For all watermark estimators you're allowed to set the watermark
>>>> estimate to anything as the runner will recompute the output watermark as:
>>>> new output watermark = max(previous output watermark, min(upstream
>>>> watermark, watermark estimates))
>>>> This effectively means that the watermark will never go backwards from
>>>> the runner's point of view, but it does mean that setting the watermark
>>>> estimate below the previous output watermark (which isn't observable) will
>>>> not do anything beyond holding the watermark at the previous output
>>>> watermark.
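>>>>
>>>> For example (with made-up numbers): if the previous output watermark is
>>>> 10:00:05, the upstream watermark is 10:00:20, and your estimator reports
>>>> 10:00:03, then the new output watermark is max(10:00:05, min(10:00:20,
>>>> 10:00:03)) = 10:00:05, i.e. an estimate below the previous output watermark
>>>> has no visible effect. If the estimator instead reported 10:00:30, the
>>>> result would be max(10:00:05, min(10:00:20, 10:00:30)) = 10:00:20, capped
>>>> by the upstream watermark.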
>>>>
>>>> Depending on the windowing strategy and allowed lateness, any records
>>>> that are output with a timestamp too far behind the watermark can be
>>>> considered droppably late; otherwise they will be late/on-time/early.
>>>>
>>>> So as an author of an SDF transform, you need to figure out:
>>>> 1) What timestamp you're going to output your records at
>>>> * use the upstream input element's timestamp: guidance is to use the
>>>> default implementation, which gives you the upstream watermark by default
>>>> * use data from within the record being output or external system state
>>>> via an API call: use a watermark estimator
>>>> 2) How you want to compute the watermark estimate (if at all)
>>>> * the choice here depends on how the elements' timestamps progress: are
>>>> they in exactly sorted order, almost sorted order, completely unsorted,
>>>> ...?
>>>>
>>>> For both of these it is up to you to choose how much flexibility in
>>>> these decisions you want to give to your users, and that should guide what
>>>> you expose within the API (like how KafkaIO exposes a TimestampPolicy, or
>>>> how many other sources don't expose anything at all).
>>>>
>>>>
>>>> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Luke,
>>>>>
>>>>> I am also looking at the `WatermarkEstimators.manual` option in
>>>>> parallel. Now we are getting data past our Fixed Window, but the
>>>>> aggregation is not as expected. The doc says setWatermark will "set a
>>>>> timestamp before or at the timestamps of all future elements produced by
>>>>> the associated DoFn". If I output with a timestamp as below, could you
>>>>> please clarify how we should set the watermark for this manual watermark
>>>>> estimator?
>>>>>
>>>>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>>>>
>>>>> Thanks,
>>>>> Praveen
>>>>>
>>>>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> Is the watermark advancing[1, 2] for the SDF such that the windows
>>>>>> can close allowing for the Count transform to produce output?
>>>>>>
>>>>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>>>>
>>>>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi everyone!
>>>>>>>
>>>>>>> We are developing a new IO connector using the SDF API, and testing
>>>>>>> it with the following simple counting pipeline:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> p.apply(MyIO.read()
>>>>>>>         .withStream(inputStream)
>>>>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>>>>         .withConsumerConfig(config)
>>>>>>>     ) // gets a PCollection<KV<String, String>>
>>>>>>>     .apply(Values.<String>create()) // PCollection<String>
>>>>>>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>>>         .withAllowedLateness(Duration.standardDays(1))
>>>>>>>         .accumulatingFiredPanes())
>>>>>>>     .apply(Count.<String>perElement())
>>>>>>>     // write PCollection<KV<String, Long>> to stream
>>>>>>>     .apply(MyIO.write()
>>>>>>>         .withStream(outputStream)
>>>>>>>         .withConsumerConfig(config));
>>>>>>>
>>>>>>> Without the window transform, we can read from the stream and write
>>>>>>> to it; however, I don't see any output after the Window transform. Could
>>>>>>> you please help pin down the issue?
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Gaurav
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Praveen K Viswanathan
>>>>>
>>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan
