Hi Aljoscha,

I'm back with another question about the wrapper =)

My pipeline is: kafka1 -> transformer1 -> kafka2

It loads data from kafka1, splits each record into 10+ events and pushes the
results to kafka2. Processing uses PushbackSideInputDoFnRunner and chaining,
but this goes through the streaming wrapper DoFnOperator, which invokes
finishBundle on every element. As a result I get:

1) load from kafka
2) parse and invoke context.collect(element1)
3) chaining to kafka
4) finishBundle on kafka => kafka.flush()
5) context.collect(element2)
6) chaining to kafka
7) finishBundle on kafka => kafka.flush()
... etc.

Do you have any idea how I can prevent flush() on every element? Right now it
is the bottleneck for me.

Thanks,
Alexey Diomin

2016-11-19 11:59 GMT+04:00 Aljoscha Krettek <aljos...@apache.org>:

> @amir, what do you mean? Naming a ParDo "startBundle" is not the same thing
> as having a @StartBundle or startBundle() (for OldDoFn) method in your
> ParDo.
>
> On Sat, 19 Nov 2016 at 00:22 amir bahmanyari <amirto...@yahoo.com.invalid>
> wrote:
>
> > Interesting. I have been including "startBundle" in KafkaIO() thus
> > far. What could happen as far as Flink cluster performance in the
> > following? Thanks Aljoscha.
> > PCollection<KV<String, String>> kafkarecords = p
> > .apply(KafkaIO.read().withBootstrapServers("kafka01:9092").withTopics(topics)
> > .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
> > .apply("startBundle", ParDo.of( new DoFn<KV<byte[], String>, KV<String,
> > String>>() {
> > Amir-
> >
> > From: Aljoscha Krettek <aljos...@apache.org>
> > To: amir bahmanyari <amirto...@yahoo.com>; Eugene Kirpichov <
> > kirpic...@google.com>; "dev@beam.incubator.apache.org" <
> > dev@beam.incubator.apache.org>
> > Sent: Friday, November 18, 2016 2:54 PM
> > Subject: Re: Flink runner. Wrapper for DoFn
> >
> > Regarding the Flink runner and how it calls startBundle()/finishBundle():
> > it's currently done like this because it is correct and because there is no
> > other "natural" point where it could be called.
> > Flink continuously processes elements and at some (user defined) interval
> > performs checkpoints to persist state. We could call
> > startBundle()/finishBundle() when this happens but I chose not to (for the
> > time being) because this could lead to problems if the user sets a rather
> > large interval. Users can even disable checkpointing, in which case we
> > would never call these methods.
> >
> > --
> > Aljoscha
> >
> > On Fri, 18 Nov 2016 at 22:17 amir bahmanyari <amirto...@yahoo.com.invalid>
> > wrote:
> >
> > > Oops! sorry :-) Thanks Eugene ...
> > >
> > > From: Eugene Kirpichov <kirpic...@google.com>
> > > To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
> > > Sent: Friday, November 18, 2016 1:09 PM
> > > Subject: Re: Flink runner. Wrapper for DoFn
> > >
> > > Amir - @Setup is a regular Java annotation; class names in Java (including
> > > names of annotation classes), like all other names, are case-sensitive.
> > >
> > > On Fri, Nov 18, 2016 at 12:54 PM amir bahmanyari
> > > <amirto...@yahoo.com.invalid> wrote:
> > >
> > > Thanks Alexey. I just fired up the whole thing. With @Setup. BTW, does it
> > > matter if lowercase @setup or @Setup? I hope not. :-)) Will update you when
> > > it's done and share my observations. Cheers + have a great weekend. Amir-
> > >
> > > From: Alexey Demin <diomi...@gmail.com>
> > > To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
> > > Sent: Friday, November 18, 2016 12:38 PM
> > > Subject: Re: Flink runner. Wrapper for DoFn
> > >
> > > In my case it's:
> > > 1) I don't rebuild the filter index every time, only once when
> > > processing starts
> > > 2) the connection to the remote db is not opened hundreds of times per
> > > second
> > >
> > > As a result the whole pipeline works faster and more stably.
> > >
> > > 2016-11-19 0:06 GMT+04:00 amir bahmanyari <amirto...@yahoo.com.invalid>:
> > >
> > > > Hi Alexey, what improvements do you expect by replacing @StartBundle
> > > > with @Setup? I am going to give it a try & see what diff it
> > > > makes. Interesting & thanks for bringing it up...
> > > > Cheers
> > > >
> > > > From: Demin Alexey <diomi...@gmail.com>
> > > > To: dev@beam.incubator.apache.org
> > > > Sent: Friday, November 18, 2016 11:12 AM
> > > > Subject: Re: Flink runner. Wrapper for DoFn
> > > >
> > > > Oh, this is my mistake.
> > > >
> > > > Yes, the correct way is to use @Setup.
> > > >
> > > > Thank you Eugene.
> > > >
> > > >
> > > > 2016-11-18 22:54 GMT+04:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
> > > >
> > > > > Hi Alexey,
> > > > >
> > > > > In general, things like establishing connections and initializing
> > > > > caches are better done in @Setup and @Teardown methods, rather than
> > > > > @StartBundle and @FinishBundle, because DoFn's can be reused between
> > > > > bundles and this way you get more benefit from reuse.
> > > > >
> > > > > Bundles can be pretty small, especially in streaming pipelines. That
> > > > > said, they normally shouldn't be 1-element-small. Hopefully someone
> > > > > working on the Flink runner can comment.
> > > > >
> > > > > On Fri, Nov 18, 2016 at 10:47 AM amir bahmanyari
> > > > > <amirto...@yahoo.com.invalid> wrote:
> > > > >
> > > > > > Hmmm... Thanks... This could very well be my bottleneck since I see
> > > > > > tons of threads get into WAIT state after some time & stay like that
> > > > > > relatively forever. I have 100 G worth of elements to process... Is
> > > > > > there a way to bypass this "startBundle" & get a fairly optimized
> > > > > > behavior? Anyone? Thanks + regards, Amir-
> > > > > >
> > > > > > From: Demin Alexey <diomi...@gmail.com>
> > > > > > To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
> > > > > > Sent: Friday, November 18, 2016 10:40 AM
> > > > > > Subject: Re: Flink runner. Wrapper for DoFn
> > > > > >
> > > > > > Very simple example:
> > > > > >
> > > > > > My DoFn loads filters from a remote db in startBundle and builds an
> > > > > > optimized index; in processElement it applies the filters to every
> > > > > > element to decide whether to push the element to the next operation
> > > > > > or drop it.
> > > > > >
> > > > > > In the current implementation it's like matching a regexp on a
> > > > > > string. You have 2 ways:
> > > > > > 1) compile the regexp every time for every element
> > > > > > 2) compile the regexp one time and apply it to all elements
> > > > > >
> > > > > > Right now Flink works the first way, and this way is not optimal.
> > > > > >
> > > > > >
> > > > > > 2016-11-18 22:26 GMT+04:00 amir bahmanyari <amirto...@yahoo.com.invalid>:
> > > > > >
> > > > > > > Hi Alexey, "startBundle can be expensive"... Could you elaborate
> > > > > > > on "expensive" as per each element pls?
> > > > > > > Thanks
> > > > > > >
> > > > > > > From: Demin Alexey <diomi...@gmail.com>
> > > > > > > To: dev@beam.incubator.apache.org
> > > > > > > Sent: Friday, November 18, 2016 7:40 AM
> > > > > > > Subject: Flink runner. Wrapper for DoFn
> > > > > > >
> > > > > > > Hi
> > > > > > >
> > > > > > > In the Flink runner we have this code:
> > > > > > >
> > > > > > > https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L262
> > > > > > >
> > > > > > > but in most cases calling startBundle for every element can be
> > > > > > > expensive (for example, opening a db connection, building a
> > > > > > > cache, etc.)
> > > > > > >
> > > > > > > Why is it so important to invoke startBundle/finishBundle on every
> > > > > > > incoming streamRecord?
> > > > > > >
> > > > > > > Thanks
> > > > > > > Alexey Diomin
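
The cost discussed in this thread can be made concrete with a small simulation. What follows is only a minimal sketch in plain Java, not Beam or Flink code: CountingSinkFn, its hook methods and run() are hypothetical stand-ins for a DoFn with @Setup, @StartBundle and @FinishBundle, and the bundle sizes are arbitrary. It assumes, as described in the thread, that a flush-style call happens in finishBundle, and it simply counts how often each hook fires for a given bundle size.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of DoFn lifecycle hooks; this is NOT the Beam API. */
class BundleLifecycleSketch {

    /** Hypothetical sink-like function that counts how often each hook runs. */
    static class CountingSinkFn {
        int setupCalls;
        int startBundleCalls;
        int flushCalls;
        final List<String> buffer = new ArrayList<>();
        final List<String> written = new ArrayList<>();

        /** One-time work, e.g. opening a connection or building an index. */
        void setup() {
            setupCalls++;
        }

        /** Runs once per bundle. */
        void startBundle() {
            startBundleCalls++;
        }

        /** Buffers the element; nothing is written yet. */
        void processElement(String element) {
            buffer.add(element);
        }

        /** Runs once per bundle and stands in for an expensive flush(). */
        void finishBundle() {
            written.addAll(buffer);
            buffer.clear();
            flushCalls++;
        }
    }

    /** Feeds `elements` records through the function in bundles of `bundleSize`. */
    static CountingSinkFn run(int elements, int bundleSize) {
        if (bundleSize < 1) {
            throw new IllegalArgumentException("bundleSize must be >= 1");
        }
        CountingSinkFn fn = new CountingSinkFn();
        fn.setup();
        int inBundle = 0;
        for (int i = 0; i < elements; i++) {
            if (inBundle == 0) {
                fn.startBundle();
            }
            fn.processElement("event-" + i);
            inBundle++;
            if (inBundle == bundleSize) {
                fn.finishBundle();
                inBundle = 0;
            }
        }
        if (inBundle > 0) {
            fn.finishBundle(); // close the last, partially filled bundle
        }
        return fn;
    }

    public static void main(String[] args) {
        for (int bundleSize : new int[] {1, 100}) {
            CountingSinkFn fn = run(1000, bundleSize);
            System.out.println("bundleSize=" + bundleSize
                    + " setup=" + fn.setupCalls
                    + " startBundle=" + fn.startBundleCalls
                    + " flush=" + fn.flushCalls
                    + " written=" + fn.written.size());
        }
    }
}
```

With 1000 elements, a bundle size of 1 gives 1000 startBundle calls and 1000 flushes, while a bundle size of 100 gives 10 of each. setup runs once in both cases, which is why one-time work such as connections and caches fits there rather than in startBundle.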