@amir, what do you mean? Naming a ParDo "startBundle" is not the same thing
as having a @StartBundle or startBundle() (for OldDoFn) method in your
ParDo.
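
To make the difference concrete, here is a minimal sketch in plain Java. It
does not use Beam at all: the Setup and StartBundle annotations, CountingFn,
and the toy run() loop are made-up stand-ins, and the one-element bundles
only mimic the DoFnOperator behaviour discussed further down in this thread.
It only illustrates that a label passed next to the function is just a
display name, while annotated methods are what a runner actually calls.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

class LifecycleSketch {
    // Hypothetical stand-ins for lifecycle annotations (NOT Beam's own classes).
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface Setup {}
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface StartBundle {}

    // A user function whose counters record how often each hook was called.
    static class CountingFn {
        int setupCalls = 0;
        int startBundleCalls = 0;
        int processed = 0;

        @Setup public void init() { setupCalls++; }          // e.g. open a db connection
        @StartBundle public void begin() { startBundleCalls++; }
        public void process(String element) { processed++; }
    }

    // Calls every public method on fn that carries the given annotation.
    static void invokeAnnotated(Object fn, Class<? extends Annotation> marker) throws Exception {
        for (Method m : fn.getClass().getMethods()) {
            if (m.isAnnotationPresent(marker)) {
                m.invoke(fn);
            }
        }
    }

    // Toy runner (an assumption, not real runner code): the label is never
    // consulted for lifecycle decisions, and each element is its own bundle.
    static CountingFn run(String label, CountingFn fn, List<String> elements) throws Exception {
        invokeAnnotated(fn, Setup.class);            // once per function instance
        for (String element : elements) {
            invokeAnnotated(fn, StartBundle.class);  // once per (one-element) bundle
            fn.process(element);
        }
        return fn;
    }

    public static void main(String[] args) throws Exception {
        CountingFn fn = run("startBundle", new CountingFn(), Arrays.asList("a", "b", "c"));
        System.out.println("setup=" + fn.setupCalls
                + " startBundle=" + fn.startBundleCalls
                + " processed=" + fn.processed);
        // prints: setup=1 startBundle=3 processed=3
    }
}
```

Calling run() with any other label gives the same counts, which is the
point: only the annotated methods matter.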

On Sat, 19 Nov 2016 at 00:22 amir bahmanyari <amirto...@yahoo.com.invalid>
wrote:

> Interesting. I have been including "startBundle" in KafkaIO() thus far.
> What could happen as far as Flink cluster performance in the following?
> Thanks Aljoscha.
> PCollection<KV<String, String>> kafkarecords = p
> .apply(KafkaIO.read().withBootstrapServers("kafka01:9092").withTopics(topics)
> .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
> .apply("startBundle", ParDo.of( new DoFn<KV<byte[], String>, KV<String,
> String>>() {
> Amir-
>
>       From: Aljoscha Krettek <aljos...@apache.org>
>  To: amir bahmanyari <amirto...@yahoo.com>; Eugene Kirpichov <
> kirpic...@google.com>; "dev@beam.incubator.apache.org" <
> dev@beam.incubator.apache.org>
>  Sent: Friday, November 18, 2016 2:54 PM
>  Subject: Re: Flink runner. Wrapper for DoFn
>
> Regarding the Flink runner and how it calls startBundle()/finishBundle():
> it's currently done like this because it is correct and because there is no
> other "natural" point where it could be called. Flink continuously
> processes elements and at some (user defined) interval performs checkpoints
> to persist state. We could call startBundle()/finishBundle() when this
> happens but I chose not to (for the time being) because this could lead to
> problems if the user sets a rather large interval. Users can even disable
> checkpointing, in which case we would never call these methods.
>
> --
> Aljoscha
>
> On Fri, 18 Nov 2016 at 22:17 amir bahmanyari <amirto...@yahoo.com.invalid>
> wrote:
>
> > Oops! sorry :-) Thanks Eugene ...
> >
> >      From: Eugene Kirpichov <kirpic...@google.com>
> >  To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com
> >
> >  Sent: Friday, November 18, 2016 1:09 PM
> >  Subject: Re: Flink runner. Wrapper for DoFn
> >
> > Amir - @Setup is a regular Java annotation; class names in Java
> > (including names of annotation classes), like all other names, are
> > case-sensitive.
> >
> > On Fri, Nov 18, 2016 at 12:54 PM amir bahmanyari
> > <amirto...@yahoo.com.invalid> wrote:
> >
> > Thanks Alexey. I just fired up the whole thing with @Setup. BTW, does it
> > matter if it is lowercase @setup or @Setup? I hope not. :-)) Will update
> > you when it's done and share my observations.
> > Cheers + have a great weekend.
> > Amir-
> >
> >      From: Alexey Demin <diomi...@gmail.com>
> >  To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com
> >
> >  Sent: Friday, November 18, 2016 12:38 PM
> >  Subject: Re: Flink runner. Wrapper for DoFn
> >
> > In my case it's:
> > 1) I don't rebuild the index from the filters every time, only once when
> > processing starts
> > 2) the connection to the remote db is not opened hundreds of times per
> > second
> >
> > As a result the whole pipeline works faster and more stably.
> >
> > 2016-11-19 0:06 GMT+04:00 amir bahmanyari <amirto...@yahoo.com.invalid>:
> >
> > > Hi Alexey, what improvements do you expect from replacing @StartBundle
> > > with @Setup? I am going to give it a try & see what difference it
> > > makes. Interesting & thanks for bringing it up...
> > > Cheers
> > >
> > >      From: Demin Alexey <diomi...@gmail.com>
> > >  To: dev@beam.incubator.apache.org
> > >  Sent: Friday, November 18, 2016 11:12 AM
> > >  Subject: Re: Flink runner. Wrapper for DoFn
> > >
> > > Oh, this is my mistake.
> > >
> > > Yes, the correct way is to use @Setup.
> > >
> > > Thank you, Eugene.
> > >
> > >
> > > 2016-11-18 22:54 GMT+04:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
> > >
> > > > Hi Alexey,
> > > >
> > > > In general, things like establishing connections and initializing
> > > > caches are better done in @Setup and @Teardown methods, rather than
> > > > @StartBundle and @FinishBundle, because DoFns can be reused between
> > > > bundles and this way you get more benefit from reuse.
> > > >
> > > > Bundles can be pretty small, especially in streaming pipelines. That
> > > > said, they normally shouldn't be 1-element-small. Hopefully someone
> > > > working on the Flink runner can comment.
> > > >
> > > > On Fri, Nov 18, 2016 at 10:47 AM amir bahmanyari
> > > > <amirto...@yahoo.com.invalid> wrote:
> > > >
> > > > > Hmmm... Thanks... This could very well be my bottleneck, since I see
> > > > > tons of threads go into the WAIT state after some time & stay like
> > > > > that more or less forever. I have 100 G worth of elements to
> > > > > process... Is there a way to bypass this "startBundle" & get fairly
> > > > > optimized behavior? Anyone?
> > > > > Thanks + regards
> > > > > Amir-
> > > > >
> > > > >      From: Demin Alexey <diomi...@gmail.com>
> > > > >  To: dev@beam.incubator.apache.org; amir bahmanyari <
> > > amirto...@yahoo.com
> > > > >
> > > > >  Sent: Friday, November 18, 2016 10:40 AM
> > > > >  Subject: Re: Flink runner. Wrapper for DoFn
> > > > >
> > > > > Very simple example:
> > > > >
> > > > > My DoFn loads filters from a remote db in startBundle and builds an
> > > > > optimized index; processElement then applies the filters to every
> > > > > element to decide whether to pass it on to the next operation or
> > > > > drop it.
> > > > >
> > > > > In the current implementation it's like matching a regexp against a
> > > > > string. You have 2 ways:
> > > > > 1) compile the regexp every time, for every element
> > > > > 2) compile the regexp once and apply it to all elements
> > > > >
> > > > > Right now Flink works the 1st way, and that way is not optimal.
> > > > >
> > > > >
> > > > > 2016-11-18 22:26 GMT+04:00 amir bahmanyari <amirto...@yahoo.com.invalid>:
> > > > >
> > > > > > Hi Alexey, "startBundle can be expensive"... Could you elaborate
> > > > > > on "expensive" as per each element, please?
> > > > > > Thanks
> > > > > >
> > > > > >      From: Demin Alexey <diomi...@gmail.com>
> > > > > >  To: dev@beam.incubator.apache.org
> > > > > >  Sent: Friday, November 18, 2016 7:40 AM
> > > > > >  Subject: Flink runner. Wrapper for DoFn
> > > > > >
> > > > > > Hi
> > > > > >
> > > > > > In the Flink runner we have this code:
> > > > > >
> > > > > > https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L262
> > > > > >
> > > > > > but in most cases startBundle can be too expensive to run for
> > > > > > every element (for example, opening a db connection, building a
> > > > > > cache, etc.)
> > > > > >
> > > > > > Why is it so important to invoke startBundle/finishBundle on every
> > > > > > incoming streamRecord?
> > > > > >
> > > > > > Thanks
> > > > > > Alexey Diomin
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > >
> >
> >
> >
> >
> >
> >
>
>
>
