Pretty sure, yes. The Iterable in a MapPartitionFunction should give you all the values in a given partition.
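A minimal sketch of the batch-side behavior Aljoscha describes. It assumes the wrapper treats each partition's Iterable as exactly one bundle: startBundle() is called once, processElement() runs for every value, and finishBundle() is called once after the Iterable is exhausted. `PartitionAsBundle`, `SimpleDoFn` and `runPartition` are hypothetical names for illustration only. They are not the Beam DoFn API or Flink's MapPartitionFunction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class PartitionAsBundle {

    /** Hypothetical, simplified DoFn lifecycle (not the real Beam DoFn API). */
    interface SimpleDoFn<I, O> {
        default void startBundle() {}

        void processElement(I element, Consumer<O> out);

        default void finishBundle(Consumer<O> out) {}
    }

    /** Hypothetical mapPartition-style wrapper: one partition Iterable is one bundle. */
    static <I, O> void runPartition(SimpleDoFn<I, O> fn, Iterable<I> partition, Consumer<O> out) {
        fn.startBundle();                  // called once per partition
        for (I element : partition) {      // the Iterable yields every value in the partition
            fn.processElement(element, out);
        }
        fn.finishBundle(out);              // called once, after the Iterable is exhausted
    }

    public static void main(String[] args) {
        List<String> lifecycle = new ArrayList<>();
        SimpleDoFn<Integer, Integer> doubler = new SimpleDoFn<Integer, Integer>() {
            @Override
            public void startBundle() { lifecycle.add("start"); }

            @Override
            public void processElement(Integer x, Consumer<Integer> out) { out.accept(2 * x); }

            @Override
            public void finishBundle(Consumer<Integer> out) { lifecycle.add("finish"); }
        };
        List<Integer> output = new ArrayList<>();
        runPartition(doubler, Arrays.asList(1, 2, 3), output::add);
        System.out.println(output + " " + lifecycle); // [2, 4, 6] [start, finish]
    }
}
```

Under this model the lifecycle methods run once per partition, no matter how many elements the partition holds.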
I checked again for streaming execution. We're doing the opposite right now: every element is a bundle in itself, so startBundle()/finishBundle() are called for every element, which seems a bit wasteful. The only other option is to treat all elements as one bundle, because Flink does not bundle or micro-batch elements in streaming execution.

On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid> wrote:

> Are you sure about that for Flink? I thought the iterable finished when
> you processed a maximum number of elements or the input queue was empty,
> so that it could return control to Akka for better sharing of the
> thread pool.
>
> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
>
> Also, the Javadoc for DoFn.Context explicitly states that you can emit
> from the finishBundle method.
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
>
> I thought I had seen an example of this being used for batching output
> to something downstream, like HDFS or Kafka, but I'm not sure about that.
> If you can emit from finishBundle, and a new instance of the DoFn will be
> created around each bundle, then I can see some people trying to do
> aggregations inside a DoFn and then emitting them at the end of the
> bundle, knowing that if a batch fails or is rolled back the system will
> handle it. If that is not allowed, we should really update the Javadoc
> around it to explain the pitfalls of doing this.
>
> - Bobby
>
> On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> a quick related question: In the Flink runner we basically see everything
> as one big bundle, i.e. we call startBundle() once at the beginning and
> then keep processing indefinitely, never calling finishBundle().
> Is this also correct behavior?
>
> Best,
> Aljoscha
>
> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:
>
> > Hey everyone,
> >
> > I'm starting to work on BEAM-38
> > (https://issues.apache.org/jira/browse/BEAM-38), which enables an
> > optimization for runners with many small bundles. BEAM-38 allows runners
> > to reuse DoFn instances so long as that DoFn has not terminated
> > abnormally. This replaces the previous requirement that a DoFn be used
> > for only a single bundle if either startBundle or finishBundle has been
> > overridden.
> >
> > DoFn deserialization-per-bundle can be a significant performance
> > bottleneck when there are many small bundles, as is common in streaming
> > executions. It has also surfaced as the cause of much of the current
> > slowness in the new InProcessRunner.
> >
> > Existing runners do not require any changes; they may choose to take
> > advantage of the new optimization opportunity. However, user DoFns may
> > need to be revised to properly set up and tear down state in startBundle
> > and finishBundle, respectively, if they depended on only being used for
> > a single bundle.
> >
> > The first two updates are already in pull requests:
> >
> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
> > Javadoc to the new spec.
> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
> > DirectRunner to reuse DoFns according to the new policy.
> >
> > Yours,
> >
> > Thomas
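To make Thomas's point about reused DoFns concrete, here is a minimal sketch that assumes a simplified lifecycle rather than the real Beam DoFn API. A hypothetical `BatchingFn` buffers elements and emits one batch from finishBundle(), in the spirit of the batching use case Bobby mentions. Because a runner may now reuse one instance for many bundles, the buffer is created in startBundle() and released in finishBundle(), never in the constructor. `ReusedDoFnSketch`, `BatchingFn` and `runBundles` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ReusedDoFnSketch {

    /** Hypothetical DoFn-like class (not the Beam API) that emits one batch per bundle. */
    static class BatchingFn {
        // Per-bundle state: must be (re)created in startBundle, because the
        // same instance may be reused for the next bundle.
        private List<String> buffer;

        void startBundle() {
            buffer = new ArrayList<>();
        }

        void processElement(String element) {
            buffer.add(element);
        }

        void finishBundle(Consumer<List<String>> out) {
            if (!buffer.isEmpty()) {
                out.accept(buffer); // emit the whole batch at the end of the bundle
            }
            buffer = null; // drop per-bundle state; the instance itself lives on
        }
    }

    /** Hypothetical runner loop that reuses one BatchingFn instance across bundles. */
    static List<List<String>> runBundles(BatchingFn fn, List<List<String>> bundles) {
        List<List<String>> emitted = new ArrayList<>();
        for (List<String> bundle : bundles) {
            fn.startBundle();
            for (String element : bundle) {
                fn.processElement(element);
            }
            fn.finishBundle(emitted::add);
        }
        return emitted;
    }

    public static void main(String[] args) {
        BatchingFn fn = new BatchingFn(); // created once, reused for every bundle
        List<List<String>> bundles = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));
        System.out.println(runBundles(fn, bundles)); // [[a, b], [c]]
    }
}
```

With one bundle per element, as described for the Flink streaming runner earlier in the thread, the same code emits one single-element batch per element. With one bundle that never finishes, finishBundle() never runs, so the buffer is never flushed.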