I can submit a PR that hooks it up incorrectly(?), similar to how WindowReiterable works [1]. That will at least fix the performance issue, at the expense of making the estimated size wrong in the Dataflow UI. The correct implementation seems like it'd be more complicated, since you'd need to plumb the element coder through into the iterable and encode each element to get its size, but only if an observer is present to observe it [2]. The only place I can see where this is used correctly is the shuffle reader, which already has a byte[], so getting the size is cheap [3].
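To make the "correct" approach concrete, here is a rough sketch of the idea. The types below (SizeObserver, ObservedIterable) are hypothetical stand-ins written for illustration and are not Beam's actual classes; the size function stands in for "encode the element with the element coder". The point is only that the size is computed as each element is consumed, and only when an observer is attached.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.ToLongFunction;

// Hypothetical stand-ins for illustration; none of these are Beam classes.
final class LazySizeSketch {

  /** Receives per-element byte sizes as elements are consumed. */
  interface SizeObserver {
    void update(long bytes);
  }

  /** Iterable that reports element sizes lazily, and only if an observer is attached. */
  static final class ObservedIterable<T> implements Iterable<T> {
    private final Iterable<T> delegate;
    // Assumption: stands in for encoding an element with its coder to measure it.
    private final ToLongFunction<T> encodedSize;
    private SizeObserver observer; // null means nobody is sampling sizes

    ObservedIterable(Iterable<T> delegate, ToLongFunction<T> encodedSize) {
      this.delegate = delegate;
      this.encodedSize = encodedSize;
    }

    void setObserver(SizeObserver observer) {
      this.observer = observer;
    }

    @Override
    public Iterator<T> iterator() {
      final Iterator<T> it = delegate.iterator();
      final SizeObserver obs = observer; // captured when the iterator is created
      return new Iterator<T>() {
        @Override
        public boolean hasNext() {
          return it.hasNext();
        }

        @Override
        public T next() {
          T value = it.next();
          if (obs != null) {
            // Pay the encoding cost only when someone is observing.
            obs.update(encodedSize.applyAsLong(value));
          }
          return value;
        }
      };
    }
  }

  public static void main(String[] args) {
    ObservedIterable<String> values =
        new ObservedIterable<>(
            Arrays.asList("a", "bb", "ccc"),
            s -> s.getBytes(StandardCharsets.UTF_8).length);

    long[] total = {0};
    values.setObserver(bytes -> total[0] += bytes);

    // Consume only the first two elements; the third is never measured.
    Iterator<String> it = values.iterator();
    it.next();
    it.next();
    System.out.println(total[0]); // prints 3
  }
}
```

With no observer attached, iteration never calls the size function at all, and with one attached, elements that are never consumed are never measured, which is the lazy behavior the thread is after.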

[1] https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java#L156
[2] https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObservableIterator.java#L34
[3] https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java#L452

On Tue, Mar 22, 2022 at 4:18 PM Robert Bradshaw <[email protected]> wrote:

> If we're not using ElementByteSizeObservable for our CoGBK outputs, that's the right fix to make.
>
> On Tue, Mar 22, 2022 at 12:20 PM Steve Niemitz <[email protected]> wrote:
> >
> > Actually, I'm confused, in that example I linked, isn't it missing the part that hooks up the element observable to the returned iterator? ElementByteSizeObservableIterable does it in the implementation of iterator() [1], but WindowReiterable overrides iterator.
> >
> > [1] https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObservableIterable.java#L54
> >
> > On Tue, Mar 22, 2022 at 3:07 PM Steve Niemitz <[email protected]> wrote:
> >>
> >> Oh that's interesting I didn't know it even had that optimization. I wonder if implementing `ElementByteSizeObservableIterable` in TagIterable would be the solution then? You're right this does seem very brittle though.
> >>
> >> It seems like GroupByKey does the "right" thing, WindowReiterable extends that magic interface [1]
> >>
> >> [1] https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java#L156
> >>
> >> On Tue, Mar 22, 2022 at 2:41 PM Luke Cwik <[email protected]> wrote:
> >>>
> >>> The same issue exists for the iterable that is output by a GroupByKey.
> >>>
> >>> We avoid loading all the data by using a byte size observer[1] that is registered[2] which supports lazy iteration. The interaction pattern is brittle in that an incorrect implementation will cause the entire iterable to be walked when estimating and we will lose the lazy iteration benefit.
> >>>
> >>> 1: https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java#L29
> >>> 2: https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java#L192
> >>>
> >>> On Wed, Mar 16, 2022 at 4:09 PM Steve Niemitz <[email protected]> wrote:
> >>>>
> >>>> The thread a couple days ago about CoGroupByKey being possibly broken in beam 2.36.0 [1] had an interesting thing in it that had me thinking. The CoGbkResultCoder doesn't override getEncodedElementByteSize (nor does it know how to actually compute the size in any efficient way), so sampling a CoGbkResult element size will require iterating over the entire result. This can be incredibly expensive, given that CoGbkResult objects can have an arbitrarily large number of elements (even more than that could fit into memory) and will require paging them in from shuffle.
> >>>> These need to be all encoded, even if they never would have been to begin with (given that consumer of the CoGbkResult will fuse to the operation itself).
> >>>>
> >>>> Making CoGbkResultCoder.getEncodedElementByteSize simply return 0 provided a significant performance boost in a few of the jobs I tested it with, and in fact fixed some jobs that had been failing due to OOM issues. The downside is obviously that this estimate will be incorrect now in the dataflow UI, but given that things like GBK are also incorrect, do we particularly care?
> >>>>
> >>>> Curious what the community thinks here, imo the benefit far outweighs the negative.
> >>>>
> >>>> [1] https://lists.apache.org/thread/5y56kbgm3q0m1byzf7186rrkomrcfldm
