Hi Luke,

Thanks for the detailed explanation. It gives new people like me more
insight as we try to grok the whole concept.

*1) What timestamp you're going to output your records at*

* *use the upstream input element's timestamp: the guidance is to use the
default implementation, which gets the upstream watermark by default*
* *use data from within the record being output or external system state
via an API call: use a watermark estimator*

Regarding the above section, I do not think our source has a built-in
watermark concept that we could derive and use in the SDF, so we will have
to go with the second option. Suppose we could extract a timestamp from the
source message: do we then have to call setWatermark with that extracted
timestamp before each output in @ProcessElement? And can we use the Manual
watermark estimator itself for this approach?


*2) How you want to compute the watermark estimate (if at all)*
* *the choice here depends on how the elements' timestamps progress: are
they in exactly sorted order, almost sorted order, completely unsorted,
...?*

Our elements are in "almost sorted order", which is why we want to hold off
processing message_01 (timestamp 11:00:10 AM) until we have processed
message_02 (timestamp 11:00:08 AM). How do we enforce this ordering while
processing the messages?

Based on your suggestion, I tried the WallTime estimator and it worked for one
of our many scenarios. I am planning to test it with a bunch of other
window types and use it until we get a solid grasp of the approach described
above, which can handle unsorted messages.

Regards,
Praveen

On Fri, Sep 18, 2020 at 10:06 AM Luke Cwik wrote:

> To answer your specific question, you should create and return the
> WallTime estimator. You shouldn't need to interact with it from within
> your @ProcessElement call since your elements are using the current time
> for their timestamp.
>
> On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik wrote:
>
>> Kafka is a complex example because it adapts code from before there
>> was an SDF implementation (namely the TimestampPolicy and the
>> TimestampFn/TimestampFn2/WatermarkFn/WatermarkFn2 functions).
>>
>> There are three types of watermark estimators in the Beam Java SDK
>> today:
>> Manual: Can be invoked from within the @ProcessElement method of your
>> SDF, giving you precise control over what the watermark is.
>> WallTime: Doesn't need to be interacted with; it reports the current
>> time as the watermark time. Once it is instantiated and returned via the
>> @NewWatermarkEstimator method, you don't need to do anything with it. This
>> is functionally equivalent to calling setWatermark(Instant.now()) on a
>> Manual watermark estimator right before returning from the @ProcessElement
>> method in the SplittableDoFn.
>> TimestampObserving: Is invoked with the output timestamp of every
>> element that is output. This is functionally equivalent to calling
>> setWatermark after each output within your @ProcessElement method in the
>> SplittableDoFn. The MonotonicallyIncreasing implementation of
>> the TimestampObserving estimator ensures that the largest timestamp seen so
>> far will be reported as the watermark.
>>
>> The default is to not set any watermark estimate.
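A minimal plain-Java model of the three estimator behaviours described above may help (no Beam dependency, and java.time instead of Joda). The types `WatermarkEstimatorModel`, `ManualModel`, `WallTimeModel` and `MonotonicallyIncreasingModel` are hypothetical stand-ins; in the real SDK the counterparts are `WatermarkEstimators.Manual`, `WatermarkEstimators.WallTime` and `WatermarkEstimators.MonotonicallyIncreasing`, returned from the `@NewWatermarkEstimator` method. The sketch only illustrates what each one reports, not how a runner consumes it.

```java
import java.time.Clock;
import java.time.Instant;

// Hypothetical model of what a watermark estimator reports.
// These are NOT Beam's classes; they only mirror the described behaviour.
interface WatermarkEstimatorModel {
  Instant currentWatermark();
}

// Manual: the DoFn sets the estimate explicitly whenever it wants.
final class ManualModel implements WatermarkEstimatorModel {
  private Instant watermark;

  ManualModel(Instant initial) {
    this.watermark = initial;
  }

  void setWatermark(Instant newWatermark) {
    this.watermark = newWatermark;
  }

  @Override
  public Instant currentWatermark() {
    return watermark;
  }
}

// WallTime: reports the clock's current time; the DoFn never calls it.
final class WallTimeModel implements WatermarkEstimatorModel {
  private final Clock clock;

  WallTimeModel(Clock clock) {
    this.clock = clock;
  }

  @Override
  public Instant currentWatermark() {
    return clock.instant();
  }
}

// MonotonicallyIncreasing: observes every output timestamp and reports
// the largest one seen so far.
final class MonotonicallyIncreasingModel implements WatermarkEstimatorModel {
  private Instant watermark;

  MonotonicallyIncreasingModel(Instant initial) {
    this.watermark = initial;
  }

  void observeTimestamp(Instant outputTimestamp) {
    if (outputTimestamp.isAfter(watermark)) {
      watermark = outputTimestamp;
    }
  }

  @Override
  public Instant currentWatermark() {
    return watermark;
  }
}
```

Observing 11:00:10 and then 11:00:08 leaves the monotonic model at 11:00:10, so with that estimator an almost-sorted stream can produce records whose timestamps are already behind the reported watermark.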
>>
>> For all watermark estimators, you're allowed to set the watermark estimate
>> to anything, as the runner will recompute the output watermark as:
>> new output watermark = max(previous output watermark, min(upstream
>> watermark, watermark estimates))
>> This effectively means that the watermark will never go backwards from
>> the runner's point of view, but it also means that setting the watermark
>> estimate below the previous output watermark (which isn't observable) will
>> not do anything beyond holding the watermark at the previous output
>> watermark.
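The recomputation rule above can be written out as a small function. This is a hypothetical sketch of the formula exactly as quoted (the name `OutputWatermarkModel.newOutputWatermark` is invented); runners perform this step internally and user code never calls anything like it.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical model of the quoted rule:
//   new output watermark =
//       max(previous output watermark, min(upstream watermark, estimates))
final class OutputWatermarkModel {
  private OutputWatermarkModel() {}

  static Instant newOutputWatermark(
      Instant previousOutput, Instant upstream, List<Instant> estimates) {
    // min over the upstream watermark and every reported estimate;
    // with no estimates this simply follows the upstream watermark
    Instant min = upstream;
    for (Instant estimate : estimates) {
      if (estimate.isBefore(min)) {
        min = estimate;
      }
    }
    // never move backwards relative to what was already reported
    return min.isAfter(previousOutput) ? min : previousOutput;
  }
}
```

An estimate below the previous output watermark therefore only holds the watermark where it was, and an upstream watermark below the estimate caps the result.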
>>
>> Depending on the windowing strategy and allowed lateness, any records
>> that are output with a timestamp that is too early can be considered
>> droppably late; otherwise they will be late, on time, or early.
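A simplified model of how a record's timestamp relates to the watermark, for `FixedWindows` with offset 0 like the 10-second windows in the pipeline further down this thread. The names are hypothetical, and the boundary comparisons (window max timestamp = window end minus 1 ms; droppable once max timestamp plus allowed lateness is before the watermark) are stated here as assumptions rather than quoted from runner code. Whether a non-late pane is "early" or "on time" depends on the trigger, so the model lumps those together.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical lateness classification for fixed windows (offset 0).
final class LatenessModel {
  enum Status { DROPPABLE, LATE, NOT_LATE }

  private LatenessModel() {}

  // Last millisecond of the fixed window that contains the timestamp.
  static Instant windowMaxTimestamp(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long start = Math.floorDiv(timestamp.toEpochMilli(), sizeMillis) * sizeMillis;
    return Instant.ofEpochMilli(start + sizeMillis - 1);
  }

  static Status classify(
      Instant timestamp, Duration windowSize, Duration allowedLateness, Instant watermark) {
    Instant maxTs = windowMaxTimestamp(timestamp, windowSize);
    if (maxTs.plus(allowedLateness).isBefore(watermark)) {
      return Status.DROPPABLE; // beyond the allowed lateness horizon
    }
    if (maxTs.isBefore(watermark)) {
      return Status.LATE; // window already passed, but within allowed lateness
    }
    return Status.NOT_LATE; // on time or early, depending on the trigger
  }
}
```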
>>
>> So as an author of an SDF transform, you need to figure out:
>> 1) What timestamp you're going to output your records at
>> * use the upstream input element's timestamp: the guidance is to use the
>> default implementation, which gets the upstream watermark by default
>> * use data from within the record being output or external system state
>> via an API call: use a watermark estimator
>> 2) How you want to compute the watermark estimate (if at all)
>> * the choice here depends on how the elements' timestamps progress: are
>> they in exactly sorted order, almost sorted order, completely unsorted, ...?
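For almost-sorted input, one common heuristic (named here plainly; it is not one of the three SDK estimators listed above) is a bounded out-of-orderness estimate: report the largest timestamp seen so far minus a fixed skew allowance. KafkaIO's `CustomTimestampPolicyWithLimitedDelay` follows a similar idea. The class below is a hypothetical plain-Java sketch; in an SDF you could push its value into a Manual estimator via `setWatermark(...)` after each output, converting to `org.joda.time.Instant` first. It does not reorder records; it only keeps slightly out-of-order records from falling behind the watermark.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical bounded out-of-orderness estimate:
// watermark = (largest timestamp seen) - maxSkew, never moving backwards.
final class BoundedSkewEstimate {
  private final Duration maxSkew;
  private Instant maxSeen;
  private Instant watermark;

  BoundedSkewEstimate(Instant initialWatermark, Duration maxSkew) {
    this.maxSkew = maxSkew;
    this.maxSeen = initialWatermark;
    this.watermark = initialWatermark;
  }

  // Record an output timestamp and return the updated estimate.
  Instant observe(Instant timestamp) {
    if (timestamp.isAfter(maxSeen)) {
      maxSeen = timestamp;
    }
    Instant candidate = maxSeen.minus(maxSkew);
    if (candidate.isAfter(watermark)) {
      watermark = candidate;
    }
    return watermark;
  }

  Instant currentWatermark() {
    return watermark;
  }
}
```

With a 5-second allowance, message_02 (11:00:08) arriving after message_01 (11:00:10) is still at or ahead of the estimate (11:00:05); the trade-off is that watermark-driven windows close roughly that much later.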
>>
>> For both of these, it is up to you to choose how much flexibility in these
>> decisions you want to give your users, and that should guide what you
>> expose within the API (like how KafkaIO exposes a TimestampPolicy) or
>> whether you expose nothing, as many other sources do.
>>
>>
>> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan wrote:
>>
>>> Hi Luke,
>>>
>>> I am also looking at the `WatermarkEstimators.Manual` option in
>>> parallel. Now we are getting data past our Fixed Window, but the aggregation
>>> is not as expected. The doc says setWatermark will "set timestamp
>>> before or at the timestamps of all future elements produced by the
>>> associated DoFn". If I output with a timestamp as below, could you
>>> please clarify how we should set the watermark for this manual
>>> watermark estimator?
>>>
>>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>>
>>> Thanks,
>>> Praveen
>>>
>>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik wrote:
>>>
>>>> Is the watermark advancing [1, 2] for the SDF such that the windows can
>>>> close, allowing the Count transform to produce output?
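The point about windows closing can be illustrated with a hypothetical model of the default trigger on 10-second fixed windows: a window's result is emitted only once the watermark reaches the window's end. If the SDF's watermark never advances, no window completes and `Count` produces nothing, which matches the symptom described below. The class and method names are invented for illustration, and the model ignores late firings.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of watermark-driven window completion.
final class WindowFiringModel {
  private WindowFiringModel() {}

  // Start instants of the fixed windows (offset 0) that contain any of the
  // given timestamps and whose end is at or before the watermark.
  static List<Instant> completedWindowStarts(
      List<Instant> timestamps, Duration size, Instant watermark) {
    long sizeMillis = size.toMillis();
    List<Instant> starts = new ArrayList<>();
    for (Instant ts : timestamps) {
      long start = Math.floorDiv(ts.toEpochMilli(), sizeMillis) * sizeMillis;
      Instant windowStart = Instant.ofEpochMilli(start);
      Instant maxTimestamp = Instant.ofEpochMilli(start + sizeMillis - 1);
      // window end <= watermark is the same as maxTimestamp < watermark
      if (maxTimestamp.isBefore(watermark) && !starts.contains(windowStart)) {
        starts.add(windowStart);
      }
    }
    return starts;
  }
}
```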
>>>>
>>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>>
>>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum wrote:
>>>>
>>>>> Hi everyone!
>>>>>
>>>>> We are developing a new IO connector using the SDF API, and testing it
>>>>> with the following simple counting pipeline:
>>>>>
>>>>> p.apply(MyIO.read()
>>>>>         .withStream(inputStream)
>>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>>         .withConsumerConfig(config)) // gets a PCollection<KV<String, String>>
>>>>>  .apply(Values.<String>create()) // PCollection<String>
>>>>>  .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>      .withAllowedLateness(Duration.standardDays(1))
>>>>>      .accumulatingFiredPanes())
>>>>>  .apply(Count.<String>perElement())
>>>>>  // write PCollection<KV<String, Long>> to stream
>>>>>  .apply(MyIO.write()
>>>>>      .withStream(outputStream)
>>>>>      .withConsumerConfig(config));
>>>>>
>>>>> Without the window transform, we can read from the stream and write to
>>>>> it; however, I don't see any output after the Window transform. Could you
>>>>> please help pin down the issue?
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Gaurav
>>>>>
>>>>
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>

