Hi, Aljoscha

I'm back with another question about the wrapper =)

kafka1 -> transformer1 -> kafka2

Load data from kafka1, split each record into 10+ events, and push the
results to kafka2.

The processing uses PushbackSideInputDoFnRunner and chaining, but Pushback
uses the streaming wrapper DoFnOperator, which invokes finishBundle on
every element. As a result I get:

1) load from kafka
2) parse and invoke context.collect(element1)
3) chaining to kafka
4) finishBundle kafka => kafka.flush()
5) context.collect(element2)
6) chaining to kafka
7) finishBundle kafka => kafka.flush()
... and so on for every element.

Do you have any idea how I can prevent flush() from being called on every
element? Right now it's the bottleneck for me.
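
To make the sequence above concrete, here is a minimal plain-Java sketch (no
Beam, Flink, or Kafka dependencies) that counts flush() calls when
finishBundle fires after every element versus after a larger bundle.
CountingSink, run, and the bundle sizes are hypothetical names and values
chosen for illustration; they are not part of Beam or the Flink runner.

```java
import java.util.ArrayList;
import java.util.List;

final class FlushDemo {
    /** Hypothetical stand-in for a Kafka-like sink; it only counts calls. */
    static final class CountingSink {
        final List<String> buffer = new ArrayList<>();
        int flushCount = 0;
        int recordsWritten = 0;

        void send(String record) {
            buffer.add(record);
        }

        void flush() {
            recordsWritten += buffer.size();
            buffer.clear();
            flushCount++;
        }
    }

    /**
     * Simulates a runner that ends a bundle (and therefore flushes the sink)
     * after every bundleSize elements. bundleSize == 1 models the
     * per-element finishBundle behaviour described above.
     */
    static CountingSink run(int elements, int bundleSize) {
        CountingSink sink = new CountingSink();
        int inBundle = 0;
        for (int i = 0; i < elements; i++) {
            sink.send("element" + i);   // context.collect(...) chained to the sink
            inBundle++;
            if (inBundle == bundleSize) {
                sink.flush();           // finishBundle => flush()
                inBundle = 0;
            }
        }
        if (inBundle > 0) {
            sink.flush();               // flush the final partial bundle
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println("bundle size 1:   " + run(1000, 1).flushCount + " flushes");
        System.out.println("bundle size 100: " + run(1000, 100).flushCount + " flushes");
    }
}
```

With 1000 elements, a bundle size of 1 causes 1000 flushes, while a bundle
size of 100 causes 10. This only illustrates the cost pattern; it does not
change how the runner chooses bundle boundaries.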

Thanks,
Alexey Diomin


2016-11-19 11:59 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:

> @amir, what do you mean? Naming a ParDo "startBundle" is not the same thing
> as having a @StartBundle or startBundle() (for OldDoFn) method in your
> ParDo.
>
> On Sat, 19 Nov 2016 at 00:22 amir bahmanyari <amirto...@yahoo.com.invalid>
> wrote:
>
> > Interesting. I have been including "startBundle" in KafkaIO() thus far.
> > What could happen as far as Flink cluster performance in the following?
> > Thanks Aljoscha.
> > PCollection<KV<String, String>> kafkarecords = p
> > .apply(KafkaIO.read().withBootstrapServers("kafka01:9092").withTopics(topics)
> > .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
> > .apply("startBundle", ParDo.of( new DoFn<KV<byte[], String>, KV<String, String>>() {
> > Amir-
> >
> >       From: Aljoscha Krettek <aljos...@apache.org>
> >  To: amir bahmanyari <amirto...@yahoo.com>; Eugene Kirpichov <
> > kirpic...@google.com>; "dev@beam.incubator.apache.org" <
> > dev@beam.incubator.apache.org>
> >  Sent: Friday, November 18, 2016 2:54 PM
> >  Subject: Re: Flink runner. Wrapper for DoFn
> >
> > Regarding the Flink runner and how it calls startBundle()/finishBundle():
> > it's currently done like this because it is correct and because there is no
> > other "natural" point where it could be called. Flink continuously
> > processes elements and at some (user defined) interval performs checkpoints
> > to persist state. We could call startBundle()/finishBundle() when this
> > happens but I chose not to (for the time being) because this could lead to
> > problems if the user sets a rather large interval. Users can even disable
> > checkpointing, in which case we would never call these methods.
> >
> > --
> > Aljoscha
> >
> > On Fri, 18 Nov 2016 at 22:17 amir bahmanyari <amirto...@yahoo.com.invalid>
> > wrote:
> >
> > > Oops! sorry :-) Thanks Eugene ...
> > >
> > >      From: Eugene Kirpichov <kirpic...@google.com>
> > >  To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
> > >  Sent: Friday, November 18, 2016 1:09 PM
> > >  Subject: Re: Flink runner. Wrapper for DoFn
> > >
> > > Amir - @Setup is a regular Java annotation; class names in Java (including
> > > names of annotation classes), like all other names, are case-sensitive.
> > >
> > > On Fri, Nov 18, 2016 at 12:54 PM amir bahmanyari
> > > <amirto...@yahoo.com.invalid> wrote:
> > >
> > > Thanks Alexey. I just fired up the whole thing with @Setup. BTW, does it
> > > matter if it's lowercase @setup or @Setup? I hope not. :-)) Will update you
> > > when it's done and share my observations.
> > > Cheers + have a great weekend.
> > > Amir-
> > >
> > >      From: Alexey Demin <diomi...@gmail.com>
> > >  To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
> > >  Sent: Friday, November 18, 2016 12:38 PM
> > >  Subject: Re: Flink runner. Wrapper for DoFn
> > >
> > > In my case it's:
> > > 1) I don't rebuild the filter index every time, only once when processing
> > > starts
> > > 2) the connection to the remote db is not opened hundreds of times per second
> > >
> > > As a result, the whole pipeline works faster and more stably.
> > >
> > > 2016-11-19 0:06 GMT+04:00 amir bahmanyari <amirto...@yahoo.com.invalid>:
> > >
> > > > Hi Alexey,
> > > > What improvements do you expect by replacing @StartBundle with @Setup?
> > > > I am going to give it a try & see what diff it makes.
> > > > Interesting & thanks for bringing it up...
> > > > Cheers
> > > >
> > > >      From: Demin Alexey <diomi...@gmail.com>
> > > >  To: dev@beam.incubator.apache.org
> > > >  Sent: Friday, November 18, 2016 11:12 AM
> > > >  Subject: Re: Flink runner. Wrapper for DoFn
> > > >
> > > > Oh, this is my mistake.
> > > >
> > > > Yes, the correct way is to use @Setup.
> > > >
> > > > Thank you Eugene.
> > > >
> > > >
> > > > 2016-11-18 22:54 GMT+04:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
> > > >
> > > > > Hi Alexey,
> > > > >
> > > > > In general, things like establishing connections and initializing caches
> > > > > are better done in @Setup and @Teardown methods, rather than @StartBundle
> > > > > and @FinishBundle, because DoFns can be reused between bundles and this
> > > > > way you get more benefit from reuse.
> > > > >
> > > > > Bundles can be pretty small, especially in streaming pipelines. That said,
> > > > > they normally shouldn't be 1-element-small. Hopefully someone working on
> > > > > the Flink runner can comment.
> > > > >
> > > > > On Fri, Nov 18, 2016 at 10:47 AM amir bahmanyari
> > > > > <amirto...@yahoo.com.invalid> wrote:
> > > > >
> > > > > > Hmmm... Thanks... This could very well be my bottleneck, since I see tons
> > > > > > of threads go into WAIT state after some time & stay like that relatively
> > > > > > forever. I have 100 G worth of elements to process... Is there a
> > > > > > way to bypass this "startBundle" & get a fairly optimized behavior?
> > > > > > Anyone? Thanks + regards
> > > > > > Amir-
> > > > > >
> > > > > >      From: Demin Alexey <diomi...@gmail.com>
> > > > > >  To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
> > > > > >  Sent: Friday, November 18, 2016 10:40 AM
> > > > > >  Subject: Re: Flink runner. Wrapper for DoFn
> > > > > >
> > > > > > Very simple example:
> > > > > >
> > > > > > My DoFn loads filters from a remote db in startBundle and builds an
> > > > > > optimized index; in processElement it applies the filters to every
> > > > > > element to decide whether to pass the element to the next operation
> > > > > > or drop it.
> > > > > >
> > > > > > In the current implementation it's like matching a regexp against a
> > > > > > string. You have 2 ways:
> > > > > > 1) compile the regexp every time, for every element
> > > > > > 2) compile the regexp once and apply it to all elements
> > > > > >
> > > > > > Right now Flink works the 1st way, and that way is not optimal.
> > > > > >
> > > > > >
> > > > > > 2016-11-18 22:26 GMT+04:00 amir bahmanyari <amirto...@yahoo.com.invalid>:
> > > > > >
> > > > > > > Hi Alexey, "startBundle can be expensive"... Could you elaborate on
> > > > > > > "expensive" as per each element pls?
> > > > > > > Thanks
> > > > > > >
> > > > > > >      From: Demin Alexey <diomi...@gmail.com>
> > > > > > >  To: dev@beam.incubator.apache.org
> > > > > > >  Sent: Friday, November 18, 2016 7:40 AM
> > > > > > >  Subject: Flink runner. Wrapper for DoFn
> > > > > > >
> > > > > > > Hi
> > > > > > >
> > > > > > > In the Flink runner we have this code:
> > > > > > >
> > > > > > > https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L262
> > > > > > >
> > > > > > > but in most cases startBundle can be too expensive to run for
> > > > > > > every element (for example: connecting to a db, building a cache, etc.)
> > > > > > >
> > > > > > > Why is it so important to invoke startBundle/finishBundle on every
> > > > > > > incoming streamRecord?
> > > > > > >
> > > > > > > Thanks
> > > > > > > Alexey Diomin
