Hi Jan,

I have been working on a timeseries extension that uses many of these
techniques for joining two temporal streams. It is almost ready for a PR;
I will post a link here when it is, as it might be useful for you. In
general, I borrowed a lot of techniques from the CoGroupBy code.

*1) need to figure out how to get Coder of input PCollection of stateful
ParDo inside StatefulDoFnRunner*
My join takes in <K, V1, V2>. In the outer transform I read the coders
from the inputs, with calls like leftCollection.getCoder() followed by
getValueCoder() on the result (after a cast to KvCoder). Then, when creating
the join transform, I can defer the StateSpec object creation until the
constructor is called.

*2) there are performance considerations, that can be solved probably only
by Sorted Map State [2]*
Sorted Map is going to be awesome. Until then, the only option is to create
a cache in the DoFn to make it more efficient. For the cache to work you
need to key it on window + key and do things like clear the cache
in @StartBundle. Better to wait for Sorted Map if this is not time
critical.
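
To make the cache idea concrete, here is a minimal sketch in plain Java
with no Beam dependency. WindowedKey, BundleCache, the String window id and
the loader function are all hypothetical names made up for illustration;
they are not taken from my extension or from Beam. In a real DoFn,
startBundle() would be called from the @StartBundle method and the loader
would read from state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical composite cache key: window identity plus element key. */
final class WindowedKey<K> {
  final String windowId; // stand-in for a real window object (assumption)
  final K key;

  WindowedKey(String windowId, K key) {
    this.windowId = windowId;
    this.key = key;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof WindowedKey)) {
      return false;
    }
    WindowedKey<?> other = (WindowedKey<?>) o;
    return windowId.equals(other.windowId) && Objects.equals(key, other.key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(windowId, key);
  }
}

/** Hypothetical read cache that lives for one bundle only. */
final class BundleCache<K, V> {
  private final Map<WindowedKey<K>, List<V>> cache = new HashMap<>();
  private int loads = 0; // counts how often the slow lookup actually ran

  /** Drop everything, so nothing cached in one bundle is trusted in the next. */
  void startBundle() {
    cache.clear();
  }

  /** Return the cached values for (window, key), loading them on first access. */
  List<V> get(String windowId, K key, Function<WindowedKey<K>, List<V>> loader) {
    return cache.computeIfAbsent(
        new WindowedKey<>(windowId, key),
        k -> {
          loads++;
          return new ArrayList<>(loader.apply(k));
        });
  }

  int loads() {
    return loads;
  }
}
```

Keying on window + key keeps the same key in two different windows from
sharing an entry, and clearing at the start of each bundle trades some hit
rate for not having to reason about stale entries across bundles.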

*3) additional work is needed for allowedLateness to work correctly (and
there are at least two ways how to solve this), see the design doc [3]*
Yup. In my case I can support this by not garbage-collecting the right side
of the join for now, but that is a compromise.

*4) more tests (for batch and validatesRunner) are needed*
I just posted a question on this list about the best way to make use of the
ValidatesRunner category; sounds like it might be useful to you as well :-)


On Thu, 6 Jun 2019 at 23:03, Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I have written a PoC implementation of this in [1] and I'd like to
> discuss some implementation details. First of all, I'd appreciate any
> feedback about this. There are some known issues:
>
>   1) need to figure out how to get Coder of input PCollection of
> stateful ParDo inside StatefulDoFnRunner
>
>   2) there are performance considerations, that can be solved probably
> only by Sorted Map State [2]
>
>   3) additional work is needed for allowedLateness to work correctly
> (and there are at least two ways how to solve this), see the design doc [3]
>
>   4) more tests (for batch and validatesRunner) are needed
>
> I have come across a few bugs in DirectRunner, which I tried to solve:
>
>   a) timers seem to be broken in stateful pardo with side inputs
>
>   b) timers need to be sorted by timestamp, otherwise state might be
> cleared before it gets chance to be flushed
>
>
> Thanks for feedback,
>
>   Jan
>
>
> [1] https://github.com/apache/beam/pull/8774
>
> [2]
> http://mail-archives.apache.org/mod_mbox/beam-dev/201905.mbox/%3ccalstk6+ldemtjmnuysn3vcufywjkhmgv1isfbdmxthoqh91...@mail.gmail.com%3e
>
> [3]
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
>
>
> On 5/23/19 4:40 PM, Robert Bradshaw wrote:
> > Thanks for writing this up.
> >
> > I think the justification for adding this to the model needs to be
> > that it is useful (you have this covered, though some examples would
> > be nice) and that it's something that can't easily be done by users
> > themselves (specifically, though it can be (relatively) cheaply done
> > in streaming and batch, it's done in very different ways, and also
> > that it's hard to do via composition).
> >
> > On Thu, May 23, 2019 at 4:10 PM Jan Lukavský <je...@seznam.cz> wrote:
> >> Hi,
> >>
> >> I have written a very brief draft of how it might be possible to
> >> implement @RequireTimeSortedInput discussed in [1]. I see the document
> >> [2] as a starting point for a discussion. There are several open
> >> questions, which I believe can be resolved by this great community. :-)
> >>
> >> Jan
> >>
> >> [1]
> >> http://mail-archives.apache.org/mod_mbox/beam-dev/201905.mbox/browser
> >>
> >> [2]
> >> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
> >>
>

