On Fri, 21 Jun 2019 at 18:02, Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reza,
>
> Great presentation at the Beam Summit. I have posted to this list a few
> times during the last few weeks, and some of those posts might actually be
> related to both looping timers and validity windows. But maybe you will be
> able to see a different approach than I do, so some questions:
>
>  a) because of [1] timers might not be exactly ordered (the JIRA talks
> about DirectRunner, but I suppose the issue is present on all runners that
> use immutable bundles of size > 1, so it might apply to Dataflow as
> well). This might cause issues when you try to introduce a TTL for looping
> timers, because the TTL timer might fire before the regular looping timer,
> which might cause incorrect results (state cleared before it has been
> flushed).
>
The TTL check would be in the same Timer rather than a separate Timer.  The
max value processed in each OnTimer call would be stored in ValueState and
used as the base to know how long it has been since the pipeline has seen an
external value for that key.
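
A minimal sketch of that idea, written as a plain-Python simulation rather
than against the Beam API. All names here (KeyState, on_element, on_timer,
simulate) are hypothetical, and the integer timestamps, the interval and the
TTL are illustrative assumptions. It only shows one looping timer per key
that also performs the TTL check, so there is no second timer that could
fire out of order:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class KeyState:
    """Per-key state; stands in for ValueState cells (hypothetical names)."""
    last_seen_ts: Optional[int] = None  # max timestamp of external data seen
    next_fire_ts: Optional[int] = None  # when the looping timer fires next


def on_element(state: KeyState, ts: int, interval: int) -> None:
    # Remember the latest external value and start the loop if it is not running.
    state.last_seen_ts = ts if state.last_seen_ts is None else max(state.last_seen_ts, ts)
    if state.next_fire_ts is None:
        state.next_fire_ts = (ts // interval + 1) * interval


def on_timer(state: KeyState, fire_ts: int, interval: int, ttl: int) -> bool:
    # The TTL check lives in the same callback as the loop itself.
    if fire_ts - state.last_seen_ts > ttl:
        state.last_seen_ts = None
        state.next_fire_ts = None  # stop looping and clear state for this key
        return False
    state.next_fire_ts = fire_ts + interval  # re-arm the timer
    return True


def simulate(element_timestamps: List[int], interval: int, ttl: int) -> List[int]:
    """Returns the timer firings that kept the loop alive for one key."""
    state = KeyState()
    fired: List[int] = []
    for ts in sorted(element_timestamps):
        # Fire every timer that is due before this element arrives.
        while state.next_fire_ts is not None and state.next_fire_ts <= ts:
            fire_ts = state.next_fire_ts
            if on_timer(state, fire_ts, interval, ttl):
                fired.append(fire_ts)
        on_element(state, ts, interval)
    # After the input ends, the loop keeps firing until the TTL stops it.
    while state.next_fire_ts is not None:
        fire_ts = state.next_fire_ts
        if on_timer(state, fire_ts, interval, ttl):
            fired.append(fire_ts)
    return fired
```

In a real pipeline the two callbacks would correspond to the element and
timer methods of a stateful DoFn; the simulation above makes no claim about
how a runner orders those calls.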

>  b) because stateful ParDo doesn't sort by timestamp, that implies that
> you have to store the last values in BagState (as opposed to the blog, where
> you just emit the identity value of the sum operation), right?
>
You can store it in ValueState rather than BagState, but yes you store that
value in State ready for the next OnTimer() fire.

>  c) because of how stateful pardo currently works on batch, does that
> imply that all values (per key) would have to be stored in memory? would
> that scale?
>
This is one of the sharp edges and the answer is ... it depends :-) I would
recommend always using a FixedWindow+Combiner before this step; this will
compress the values into something much smaller. For example, in the case of
building 'candles' this will compress down to low/high/first/last values per
FixedWindow length. If the window length is very small there may be no
compression, but in most cases I have seen this is an OK compromise.
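
To illustrate the compression, here is a rough plain-Python sketch (not Beam
code) of reducing the raw values in each fixed window to a single candle.
The Candle type, the candles_per_window function and the (timestamp, price)
input shape are assumptions made for the example:

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Tuple


class Candle(NamedTuple):
    first: float
    last: float
    low: float
    high: float


def candles_per_window(ticks: Iterable[Tuple[int, float]], window: int) -> Dict[int, Candle]:
    """Groups (timestamp, price) ticks into fixed windows, one candle per window."""
    by_window: Dict[int, List[Tuple[int, float]]] = defaultdict(list)
    for ts, price in ticks:
        # Window start is the timestamp rounded down to a multiple of the window length.
        by_window[ts - ts % window].append((ts, price))

    candles: Dict[int, Candle] = {}
    for start, items in by_window.items():
        # Order by timestamp so that first/last are well defined.
        items.sort(key=lambda item: item[0])
        prices = [price for _, price in items]
        candles[start] = Candle(prices[0], prices[-1], min(prices), max(prices))
    return candles
```

However many ticks arrive in a window, the downstream stateful step then
sees only four numbers per key per window.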

> There is a discussion about problem a) in [2], but maybe there is some
> different approach possible. For problem b) and c) there is a proposal [3].
> When the input is sorted, it starts to work both in batch and with
> ValueState, because the last value is the *valid* value.
>
There was also a discussion on dev@ around a sorted Map state, which would
be very cool for this use case.

> This even has a connection with the mentioned validity windows: if you
> sort by timestamp, the _last_ value is the _valid_ value, so it essentially
> boils down to keeping a single value per key (and again, starts to work in
> both batch and stream).
>
One for Tyler :-)

> I even have a suspicion that sorting by timestamp is closely related to
> retractions, because when you are using sorted streams, retractions
> actually become only the diff between the last emitted pane and the current
> pane. That might even help implement them in general, but I might be
> missing something. This just popped into my head today, as I was thinking
> about why there was actually no (or little) need for retractions in the
> Euphoria model (very similar to Beam, it actually differs by the sorting
> thing :)), and why the need pops up so often in Beam.
>
Retractions will be possible with this, but it does mean that we would need
to keep old versions around. Something built in would be very cool, rather
than building it with this pattern.

> I'd be very happy to hear what you think about all of this.
>
> Cheers,
>
> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7520
>
> [2]
> https://lists.apache.org/thread.html/1a3a0dd9da682e159f78f131d335782fd92b047895001455ff659613@%3Cdev.beam.apache.org%3E
>
> [3]
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing
> On 6/21/19 8:12 AM, Reza Rokni wrote:
>
> Great question, one thing that we did not cover in the blog and I think we
> should have is the use case where you would want to bootstrap the
> pipeline.
>
> One option would be, on startup, to have an extra bounded source that is
> read and flattened into the main pipeline. The source will need to contain
> values in Timestamped<V> format which would correspond to the first window
> that you would like to kickstart the process from.  Will see if I can try
> and find some time to code up an example and add that and the looping timer
> code into the Beam patterns.
>
> https://beam.apache.org/documentation/patterns/overview/
>
> Cheers
> Reza
>
>
>
>
>
> On Fri, 21 Jun 2019 at 07:59, Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>> Indeed interesting pattern.
>>
>> One minor question. It seems the timer is triggered by the first element
>> so what if there is no data in the "first interval" ?
>>
>> Thanks for the write-up.
>> Manu
>>
>> On Wed, Jun 19, 2019 at 12:15 PM Reza Rokni <r...@google.com> wrote:
>>
>>> Hi folks,
>>>
>>> Just wanted to drop a note here on a new pattern that folks may find
>>> interesting, called  Looping Timers. It allows for default values to be
>>> created in interval windows in the absence of any external data coming into
>>> the pipeline. The details are in this blog below:
>>>
>>> https://beam.apache.org/blog/2019/06/11/looping-timers.html
>>>
>>> Its main utility is when dealing with time series data. There are still
>>> rough edges, like dealing with TTL and it would be great to hear
>>> feedback on ways it can be improved.
>>>
>>> The next pattern to publish in this domain will assist with the hold and
>>> propagation of values from one interval window to the next, which, coupled
>>> with looping timers, starts to solve some interesting problems.
>>>
>>> Cheers
>>>
>>> Reza
>>>
>>>
>>>
>>> --
>>>
>>> This email may be confidential and privileged. If you received this
>>> communication by mistake, please don't forward it to anyone else, please
>>> erase all copies and attachments, and please let me know that it has gone
>>> to the wrong person.
>>>
>>> The above terms reflect a potential business arrangement, are provided
>>> solely as a basis for further discussion, and are not intended to be and do
>>> not constitute a legally binding obligation. No legally binding obligations
>>> will be created, implied, or inferred until an agreement in final form is
>>> executed in writing by all parties involved.
>>>
>>
>
