Pretty sure, yes. The Iterable in a MapPartitionFunction should give you
all the values in a given partition.

I checked again for streaming execution. There we're currently doing the
opposite: every element is a bundle in itself, so startBundle()/finishBundle()
are called for every element, which seems a bit wasteful. The only other
option is to treat all elements as one bundle, because Flink does not bundle
or micro-batch elements in streaming execution.
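To make the two bundling strategies concrete, here is a minimal plain-Java
sketch. BundleFn, CountingFn and BundlingStrategies are hypothetical names,
not the Beam or Flink API; the sketch only assumes the lifecycle discussed in
this thread (startBundle(), then processElement() per element, then
finishBundle()). In batch, the whole partition handed to a
MapPartitionFunction can be one bundle; in streaming, as described above,
each element is currently its own bundle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-in for a DoFn lifecycle; NOT the Beam API.
interface BundleFn<I, O> {
    void startBundle();
    void processElement(I element, List<O> out);
    void finishBundle(List<O> out);
}

// Passes elements through and counts lifecycle calls so strategies can be compared.
class CountingFn implements BundleFn<String, String> {
    int starts = 0;
    int finishes = 0;

    public void startBundle() { starts++; }
    public void processElement(String element, List<String> out) { out.add(element); }
    public void finishBundle(List<String> out) { finishes++; }
}

final class BundlingStrategies {
    // Batch-style: the whole partition (e.g. the Iterable given to a
    // MapPartitionFunction) is treated as a single bundle.
    static <I, O> List<O> bundlePerPartition(Iterable<I> partition, BundleFn<I, O> fn) {
        List<O> out = new ArrayList<>();
        fn.startBundle();
        for (I element : partition) {
            fn.processElement(element, out);
        }
        fn.finishBundle(out);
        return out;
    }

    // Streaming-style, as currently done: every element is its own bundle.
    static <I, O> List<O> bundlePerElement(Iterable<I> elements, BundleFn<I, O> fn) {
        List<O> out = new ArrayList<>();
        for (I element : elements) {
            fn.startBundle();
            fn.processElement(element, out);
            fn.finishBundle(out);
        }
        return out;
    }
}
```

For three elements, bundlePerPartition calls startBundle()/finishBundle()
once each, while bundlePerElement calls each of them three times.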

On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid> wrote:

> Are you sure about that for Flink?  I thought the iterable finished when
> you had processed a maximum number of elements or the input queue was
> empty, so that it could return control to Akka for better sharing of the
> thread pool.
>
>
> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
> Also, the Javadoc for DoFn.Context explicitly states that you can
> emit from the finishBundle method.
>
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
> I thought I had seen some example of this being used for batching output
> to something downstream, like HDFS or Kafka, but I'm not sure on that.  If
> you can emit from finishBundle and a new instance of the DoFn will be
> created for each bundle, then I can see some people trying to do
> aggregations inside a DoFn and then emitting them at the end of the bundle,
> knowing that if a batch fails or is rolled back the system will handle it.
> If that is not allowed, we should really update the Javadoc around it to
> explain the pitfalls of doing this.
>  - Bobby
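A minimal sketch of the aggregate-in-finishBundle pattern described above, in
plain Java with hypothetical names (BufferingSumFn, BufferingDemo) rather than
Beam's DoFn API: elements are buffered in processElement() and a single sum is
emitted from finishBundle(). It also shows the pitfall: what gets emitted
depends on where the runner draws bundle boundaries, and a runner that never
calls finishBundle() would never emit anything.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffering function (plain Java, not Beam's DoFn): collects
// elements during a bundle and emits one aggregate in finishBundle().
final class BufferingSumFn {
    private final List<Integer> buffer = new ArrayList<>();

    void startBundle() {
        buffer.clear(); // start each bundle with an empty buffer
    }

    void processElement(int value) {
        buffer.add(value); // nothing is emitted per element
    }

    // Emits the sum of everything seen in this bundle, standing in for
    // e.g. one batched write to a downstream sink.
    void finishBundle(List<Integer> out) {
        if (!buffer.isEmpty()) {
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            out.add(sum);
        }
        buffer.clear();
    }
}

final class BufferingDemo {
    // Runs one instance over explicit bundles and returns what was emitted.
    static List<Integer> run(List<List<Integer>> bundles) {
        BufferingSumFn fn = new BufferingSumFn();
        List<Integer> out = new ArrayList<>();
        for (List<Integer> bundle : bundles) {
            fn.startBundle();
            for (int v : bundle) {
                fn.processElement(v);
            }
            fn.finishBundle(out);
        }
        return out;
    }
}
```

With bundles [[1, 2], [3]] the sketch emits [3, 3]; with the single bundle
[[1, 2, 3]] it emits [6].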
>
>     On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
> aljos...@apache.org> wrote:
>
>
>  Hi,
> a quick related question: In the Flink runner we basically see everything
> as one big bundle, i.e. we call startBundle() once at the beginning and
> then keep processing indefinitely, never calling finishBundle(). Is this
> also correct behavior?
>
> Best,
> Aljoscha
>
> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:
>
> > Hey everyone;
> >
> > I'm starting to work on BEAM-38 (
> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
> > optimization for runners with many small bundles. BEAM-38 allows runners to
> > reuse DoFn instances so long as that DoFn has not terminated abnormally.
> > This replaces the previous requirement that a DoFn be used for only a
> > single bundle if either startBundle or finishBundle has been
> > overridden.
> >
> > Per-bundle DoFn deserialization can be a significant performance
> > bottleneck when there are many small bundles, as is common in streaming
> > executions. It has also surfaced as the cause of much of the current
> > slowness in the new InProcessRunner.
> >
> > Existing runners do not require any changes; they may choose to take
> > advantage of the new optimization opportunity. However, user DoFns may
> > need to be revised to properly set up and tear down state in startBundle
> > and finishBundle, respectively, if they depended on only being used for a
> > single bundle.
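A minimal plain-Java sketch of the kind of user code that would need
revising. LifecycleFn, PositionFnBuggy, PositionFnFixed and ReuseDemo are
hypothetical names, not Beam classes. PositionFnBuggy initializes its
per-bundle counter only in the field declaration, so it is correct only if a
fresh instance is created for every bundle; PositionFnFixed resets the counter
in startBundle() and therefore also works when one instance is reused across
bundles.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical plain-Java lifecycle (not Beam's API) used to show why reuse matters.
interface LifecycleFn {
    void startBundle();
    String processElement(String element);
    void finishBundle();
}

// Relies on a fresh instance per bundle: the counter is set only at construction.
class PositionFnBuggy implements LifecycleFn {
    private int position = 0;

    public void startBundle() { }
    public String processElement(String element) { return element + "@" + position++; }
    public void finishBundle() { }
}

// Safe under reuse: per-bundle state is (re)initialized in startBundle().
class PositionFnFixed implements LifecycleFn {
    private int position;

    public void startBundle() { position = 0; }
    public String processElement(String element) { return element + "@" + position++; }
    public void finishBundle() { }
}

final class ReuseDemo {
    // Simulates a runner that reuses one instance for every bundle.
    static List<String> runReusing(LifecycleFn fn, List<List<String>> bundles) {
        List<String> out = new ArrayList<>();
        for (List<String> bundle : bundles) {
            fn.startBundle();
            for (String e : bundle) {
                out.add(fn.processElement(e));
            }
            fn.finishBundle();
        }
        return out;
    }

    // Simulates the previous policy: a fresh instance for each bundle.
    static List<String> runFreshPerBundle(Supplier<LifecycleFn> factory, List<List<String>> bundles) {
        List<String> out = new ArrayList<>();
        for (List<String> bundle : bundles) {
            out.addAll(runReusing(factory.get(), Collections.singletonList(bundle)));
        }
        return out;
    }
}
```

For bundles [[a, b], [c]], a fresh instance per bundle gives a@0, b@1, c@0;
reusing one PositionFnBuggy gives a@0, b@1, c@2, while reusing
PositionFnFixed gives a@0, b@1, c@0 again.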
> >
> > The first two updates are already in pull requests:
> >
> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
> > Javadoc to the new spec
> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
> > DirectRunner to reuse DoFns according to the new policy.
> >
> > Yours,
> >
> > Thomas
> >
>
>
>
