On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov < kirpic...@google.com.invalid> wrote:
> Hi, > > I agree with these concerns to an extent, however I think the advantage of > transparently letting any user code access side inputs, especially > including lambdas, is so great that we should find a way to address these > concerns within the constraints of the pattern I'm proposing. See more > below. > > On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers <bchamb...@google.com.invalid > > > wrote: > > > One possible issue with this is that updating a thread local is likely to > > be much more expensive than passing an additional argument. > > This is an implementation detail that can be fixed - Luke made a suggestion > on the PR to set up the side input context once per bundle rather than once > per element. > However remember that bundles might be small. Dataflow streaming runner creates small bundles by design. The Flink runner creates single-element bundles. > > > > Also, not all > > code called from within the DoFn will necessarily be in the same thread > > (eg., sometimes we create a pool of threads for doing work). > > I think we already require that c.output() can not be done from multiple > threads; and I don't think we document c.sideInput() to be thread-safe - it > may be reasonable to declare that it isn't and has to be accessed from the > same thread as the ProcessElement call. If we want to relax this, then > there might be ways to deal with that too, e.g. provide utilities for the > user to capture the "user code context" and restoring it inside a thread. > This would likely be valuable for other purposes, such as making those > extra threads visible to our profiling utilities. > This seems fair, but we should be be very careful about our documentation. And +1 to adding utilities to make multi-threaded work easier to manage. > > > > It may be > > *more* confusing for this to sometimes work magically and sometimes fail > > horribly. Also, requiring the PCollectionView to be passed to user code > > that accesses it is nice because it makes *very clear* that the side > input > > needs to be provided from the DoFn to that particular utility. If it is > > accessed via "spooky action at a distance" we lose that piece of "free" > > documentation, which may lead to extensive misuse of these utility > methods. > > > I'd like to understand this concern better - from this description it's not > clear to me. The pattern I'm proposing is that, when you're authoring a > PTransform that is configured by any user callbacks, then: > - you should provide a builder method .withSideInputs(...) > - you should propagate those side inputs to all your internal DoFn's that > invoke the user code > - in return the user callbacks will be allowed to access those particular > side inputs > This seems like a simple enough model to me to understand, both from a > user's perspective and from a transform author's perspective. Steps 1 and 2 > may eventually be automated by annotation analysis or other means (e.g. SDK > giving a way to provide given side inputs automatically to everything > inside a composite transform rather than to individual DoFn's). > > > > > > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov > > <kirpic...@google.com.invalid> wrote: > > > > > Hi, > > > > > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles <k...@google.com.invalid > > > > > wrote: > > > > > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov < > > > > kirpic...@google.com.invalid> wrote: > > > > > > > > > > > > > > The differences are: > > > > > - The proposal in the doc allows wiring different side inputs to > the > > > same > > > > > Supplier, but I'm not convinced that this is important - you can > just > > > as > > > > > easily call the constructor of your DoFn passing different > > > > > PCollectionView's for it to capture. > > > > > > > > > > > > > I disagree with this bit about it being "just as easy". Passing the > > > needed > > > > PCollectionViews to your constructor (or even having a constructor) > is > > a > > > > pain. Every time I have to do it, it adds a ton of boilerplate that > > feels > > > > like pure noise. To make a DoFn reusable it must be made into a named > > > class > > > > with a constructor, versus inlined with no constructor. > > > > > > Hm, why? You can have the DoFn be an anonymous class capturing the > > > PCollectionView into a @SideInput field as a closure. > > > > > > > > > > A generous analogy > > > > is is that it is "just" manual closure conversion/currying, changing > > > > f(side, main) to f(side)(main). But in practice in Beam the second > one > > > has > > > > much more boilerplate. > > > > > > > > Also, Beam is worse. We present the user with higher-order functions, > > > which > > > > is where the actual annoyance comes in. When you want to pardo(f) you > > > have > > > > to write pardo(f(side))(side, main). Your proposal is to support > > > > pardo(f(side))(main) and mine is to support pardo(f)(side, main). I > > still > > > > propose that we support both (as they get implemented). If you buy in > > to > > > my > > > > analogy, then there's decades of precedent and the burden of proof > > falls > > > > heavily on whoever doesn't want to support both. > > > > > > > I see your point. I think the proposal is compatible with what you're > > > suggesting too - in DoFn we could have @SideInput *parameters* of type > > > PCollectionView, with the same semantics as a field. > > > > > > > > > > > > > > - My proposal allows getting rid of .withSideInputs() entirely, > because > > > the > > > > > DoFn captures the PCollectionView so you don't need to specify it > > > > > explicitly for wiring. > > > > > > > > > > > > > I've decided to change to full +1 (whatever that means compared to > 0.75 > > > :-) > > > > to adding support for @SideInput fields, because the benefits > outweigh > > > this > > > > failure mode: > > > > > > > > new DoFn { > > > > // forgot the annotation > > > > private final PCollectionView whatever; > > > > > > > > @ProcessElement public void process(...) { > > > > whatever.get(); // crash during execution > > > > } > > > > } > > > > > > > > But ideas to mitigate that would be cool. > > > > > > Hm, can't think of anything less hacky than "prohibit having fields of > > type > > > PCollectionView that are not public, final, and annotated with > > @SideInput" > > > - not sure we'd want to go down this road. I suppose a good error > message > > > in .get() would be sufficient, saying "Did you forget to specify a > > > requirement for this side input via .withSideInputs() or by annotating > > the > > > field as @SideInput" or something like that. > > > > > > > > > > > > > > > > > Kenn > > > > > > > > > > > > > > > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik > <lc...@google.com.invalid > > > > > > > > wrote: > > > > > > > > > > > My concern with the proposal is not the specifics of how it will > > work > > > > and > > > > > > more about it being yet another way on how our API is to be used > > even > > > > > > though we have a proposal [1] of an API style we were working > > towards > > > > in > > > > > > Java and Python. I would rather re-open that discussion now about > > > what > > > > we > > > > > > want that API to look like for our major features and work > towards > > > > > > consistency (or not if there is a strong argument as to why some > > > > feature > > > > > > should have a different style). > > > > > > > > > > > > 1: https://s.apache.org/a-new-dofn > > > > > > > > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles > > > > <k...@google.com.invalid > > > > > > > > > > > > wrote: > > > > > > > > > > > > > +0.75 because I'd like to bring up invalid pipelines. > > > > > > > > > > > > > > I had proposed side inputs as parameters to DoFn in > > > > > > > https://s.apache.org/a-new-dofn (specifically at [1]) so the > > only > > > > > place > > > > > > > they are specified is in the graph construction, making the > DoFn > > > more > > > > > > > reusable and errors impossible. I've actually been noodling my > > way > > > > > > towards > > > > > > > this in a branch :-) > > > > > > > > > > > > > > Eugene's proposal is a sort of converse, where the side inputs > > are > > > > > values > > > > > > > captured in the closure and not parameters, yet the only place > > they > > > > are > > > > > > > specified is in the DoFn. > > > > > > > > > > > > > > I see no conflict between these two. It is very natural to have > > > both > > > > > the > > > > > > > capability to accept parameters and the ability to capture > > > variables > > > > in > > > > > > the > > > > > > > closure. Supporting both is totally standard in up-to-date > > > > programming > > > > > > > languages. > > > > > > > > > > > > > > Today we have the worse of both worlds: PCollectionView behaves > > as > > > > > > > something captured in the closure/constructor, but must still > be > > > > > > explicitly > > > > > > > wired up. > > > > > > > > > > > > > > But if I use PCollectionView.get() and have not wired it up in > > any > > > > way, > > > > > > > what happens? Just like today, you can try to .sideInput(...) a > > > thing > > > > > > that > > > > > > > is not available. With side inputs as parameters, this is not > > > > possible. > > > > > > If > > > > > > > you want to treat them as captured in a closure, while avoiding > > > > errors, > > > > > > it > > > > > > > seems like you might need to do some low-level magic, like the > > > > > > > serialization-based detection that Luke has suggested before > > (there > > > > are > > > > > > > known downsides that we haven't explored, like overcapture). > > > > > > > > > > > > > > Kenn > > > > > > > > > > > > > > [1] > > > > > > > > > https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8j > > > > > > > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov < > > > > > > > kirpic...@google.com.invalid> wrote: > > > > > > > > > > > > > > > Hm, I guess you're right - for outputs it could be indeed > quite > > > > > > valuable > > > > > > > to > > > > > > > > output to them without plumbing (e.g. outputting errors). > Could > > > be > > > > > done > > > > > > > > perhaps via TupleTag.output()? (assuming the same TupleTag > can > > > not > > > > be > > > > > > > > reused to tag multiple PCollection's) > > > > > > > > > > > > > > > > For now I sent a PR for side input support > > > > > > > > https://github.com/apache/beam/pull/3814 . > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik > > > > <lc...@google.com.invalid > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > I disagree, state may not care where it is used as well > > since a > > > > > > person > > > > > > > > may > > > > > > > > > call a function which needs to store/retrieve state and > > instead > > > > of > > > > > > > having > > > > > > > > > the DoFn declare the StateSpec and then pass in the state > > > > > > > implementation > > > > > > > > > down into the function everywhere. Similarly for outputs, > the > > > > > > internal > > > > > > > > > functions could take the TupleTag and request an output > > manager > > > > or > > > > > > take > > > > > > > > an > > > > > > > > > "output" reference which give functions the ability to > > produce > > > > > output > > > > > > > > > directly without needing to pass everything that is needed > to > > > be > > > > > > output > > > > > > > > > back to the caller. > > > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov < > > > > > > > > > kirpic...@google.com.invalid> wrote: > > > > > > > > > > > > > > > > > > > Hm, I think of these things (state, side outputs etc.), > > only > > > > side > > > > > > > > inputs > > > > > > > > > > make sense to access in arbitrary user callbacks without > > > > explicit > > > > > > > > > knowledge > > > > > > > > > > of the surrounding transform - so only side inputs can be > > > > > implicit > > > > > > > like > > > > > > > > > > this. > > > > > > > > > > > > > > > > > > > > Ultimately we'll probably end up removing ProcessContext, > > and > > > > > > keeping > > > > > > > > > only > > > > > > > > > > annotations (on fields / methods / parameters). In that > > > world, > > > > a > > > > > > > field > > > > > > > > > > annotation could be used (like per my previous email) to > > > > > statically > > > > > > > > > specify > > > > > > > > > > which side inputs will be needed - while the value could > > > still > > > > be > > > > > > > > > accessed > > > > > > > > > > via .get(), just like state cells are accessed via > .read() > > > and > > > > > > > > .write(): > > > > > > > > > > i.e., #get() is not a new method of access. > > > > > > > > > > > > > > > > > > > > Overall, it seems like I should proceed with the idea. I > > > filed > > > > > > > > > > https://issues.apache.org/jira/browse/BEAM-2844. > > > > > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik > > > > > > <lc...@google.com.invalid > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > For API consistency reasons, it would be good if we did > > > this > > > > > > > > > holistically > > > > > > > > > > > and expanded this approach to state, side outputs, ... > so > > > > that > > > > > a > > > > > > > > person > > > > > > > > > > can > > > > > > > > > > > always call Something.get() to return something that > they > > > can > > > > > > > access > > > > > > > > > > > implementation wise. It will be confusing for our users > > to > > > > have > > > > > > > many > > > > > > > > > > > variations in our style of how all these concepts are > > used > > > > > > > > > > (ProcessContext > > > > > > > > > > > / Annotations / #get()) > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov < > > > > > > > > > > > kirpic...@google.com.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > Also, I think my approach is compatible with > > annotations > > > > and > > > > > > > future > > > > > > > > > > > removal > > > > > > > > > > > > of .withSideInputs if we annotate a field: > > > > > > > > > > > > final PCollectionView<Foo> foo = ...; > > > > > > > > > > > > > > > > > > > > > > > > class MyDoFn { > > > > > > > > > > > > @SideInput > > > > > > > > > > > > PCollectionView<Foo> foo = foo; > > > > > > > > > > > > > > > > > > > > > > > > ...foo.get()... > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > We can extract the accessed views from the DoFn > > instance > > > > > using > > > > > > > > > > > reflection. > > > > > > > > > > > > Still not compatible with lambdas, but compatible > > > > > automatically > > > > > > > > with > > > > > > > > > > all > > > > > > > > > > > > anonymous classes. > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov < > > > > > > > > kirpic...@google.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi Luke, > > > > > > > > > > > > > > > > > > > > > > > > > > I know this (annotations) is the pattern we were > > > > > considering > > > > > > > for > > > > > > > > > side > > > > > > > > > > > > > inputs, but I no longer think it is the best way to > > > > access > > > > > > > them. > > > > > > > > > > > > > Annotations help getting rid of the > .withSideInputs() > > > > call, > > > > > > but > > > > > > > > > this > > > > > > > > > > is > > > > > > > > > > > > > where their advantage ends. > > > > > > > > > > > > > > > > > > > > > > > > > > The advantages of the proposed approach are that it > > > > > > > automatically > > > > > > > > > > works > > > > > > > > > > > > > with all existing callback or lambda code. No need > to > > > > > further > > > > > > > > > develop > > > > > > > > > > > the > > > > > > > > > > > > > reflection machinery to support side input > > annotations > > > - > > > > > and > > > > > > > > > > especially > > > > > > > > > > > > to > > > > > > > > > > > > > support arbitrary user interfaces, no need to > change > > > > > existing > > > > > > > > > > > transforms, > > > > > > > > > > > > > no need for transform authors to even know that the > > > > > machinery > > > > > > > > > exists > > > > > > > > > > to > > > > > > > > > > > > > make side inputs usable in their transforms (and no > > > need > > > > > for > > > > > > > > > authors > > > > > > > > > > to > > > > > > > > > > > > > think about whether or not they should support side > > > > > inputs). > > > > > > > > > > > > > > > > > > > > > > > > > > Moreover, like Reuven says, annotations don't work > > with > > > > > > lambdas > > > > > > > > at > > > > > > > > > > all: > > > > > > > > > > > > > creating a lambda with a flexible set of annotation > > > > > arguments > > > > > > > > > appears > > > > > > > > > > > to > > > > > > > > > > > > be > > > > > > > > > > > > > currently impossible, and even capturing the > > > annotations > > > > on > > > > > > > > > arguments > > > > > > > > > > > of > > > > > > > > > > > > a > > > > > > > > > > > > > lambda is I believe also impossible because the > Java > > > > > compiler > > > > > > > > drops > > > > > > > > > > > them > > > > > > > > > > > > in > > > > > > > > > > > > > the generated class or method handle. > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik > > > > > > > > > <lc...@google.com.invalid > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > >> I believe we should follow the pattern that state > > uses > > > > and > > > > > > > add a > > > > > > > > > > type > > > > > > > > > > > > >> annotation to link the side input definition to > its > > > > usage > > > > > > > > > directly. > > > > > > > > > > > This > > > > > > > > > > > > >> would allow us to know that the side input was > > > > definitely > > > > > > > being > > > > > > > > > > > accessed > > > > > > > > > > > > >> and perform validation during graph construction > for > > > any > > > > > > used > > > > > > > > but > > > > > > > > > > > > >> unspecified side inputs. > > > > > > > > > > > > >> > > > > > > > > > > > > >> Code snippet: > > > > > > > > > > > > >> final PCollectionView<String> foo = > > > > > > pipeline.apply("fooName", > > > > > > > > > > > > >> Create.of("foo")).apply(View.< > String>asSingleton()); > > > > > > > > > > > > >> PCollection<String> output = pipeline > > > > > > > > > > > > >> .apply(Create.of(1, 2, 3)) > > > > > > > > > > > > >> .apply(MapElements.via( > > > > > > > > > > > > >> new SimpleFunction<Integer, String>() { > > > > > > > > > > > > >> @Override > > > > > > > > > > > > >> public String apply(Integer input, > > > > > > > > @SideInput("fooName") > > > > > > > > > > > > String > > > > > > > > > > > > >> fooValue) { > > > > > > > > > > > > >> return fooValue + " " + input; > > > > > > > > > > > > >> } > > > > > > > > > > > > >> }).withSideInputs(foo));* > > > > > > > > > > > > >> > > > > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov < > > > > > > > > > > > > >> kirpic...@google.com.invalid> wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > >> > Sure, here's how a modified (passing) > MapElements > > > unit > > > > > > test > > > > > > > > > looks > > > > > > > > > > > > like, > > > > > > > > > > > > >> > with usage of side inputs: > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > @Test > > > > > > > > > > > > >> > @Category(NeedsRunner.class) > > > > > > > > > > > > >> > public void testMapBasicWithSideInput() throws > > > > > > Exception { > > > > > > > > > > > > >> > * final PCollectionView<String> foo =* > > > > > > > > > > > > >> > * pipeline.apply("foo", > > > > > > > > > > > > >> > > > > Create.of("foo")).apply(View.<String>asSingleton());* > > > > > > > > > > > > >> > PCollection<String> output = pipeline > > > > > > > > > > > > >> > .apply(Create.of(1, 2, 3)) > > > > > > > > > > > > >> > .apply(MapElements.via( > > > > > > > > > > > > >> > new SimpleFunction<Integer, > String>() > > { > > > > > > > > > > > > >> > @Override > > > > > > > > > > > > >> > public String apply(Integer > input) { > > > > > > > > > > > > >> > return* foo.get() *+ " " + > input; > > > > > > > > > > > > >> > } > > > > > > > > > > > > >> > }) > > > > > > > > > > > > >> > *.withSideInputs(foo));* > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > PAssert.that(output). > containsInAnyOrder("foo > > 1", > > > > > "foo > > > > > > > 2", > > > > > > > > > > "foo > > > > > > > > > > > > 3"); > > > > > > > > > > > > >> > pipeline.run(); > > > > > > > > > > > > >> > } > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax > > > > > > > > > > <re...@google.com.invalid > > > > > > > > > > > > > > > > > > > > > > > > >> > wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > Can you provide a code snippet showing how > this > > > > would > > > > > > > look? > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene > > Kirpichov < > > > > > > > > > > > > >> > > kirpic...@google.com.invalid> wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > TL;DR Introduce method > PCollectionView.get(), > > > > > > > implemented > > > > > > > > > as: > > > > > > > > > > > get > > > > > > > > > > > > >> > > > thread-local ProcessContext and do > > > > > c.sideInput(this). > > > > > > > As a > > > > > > > > > > > result, > > > > > > > > > > > > >> any > > > > > > > > > > > > >> > > user > > > > > > > > > > > > >> > > > lambdas such as MapElements can use side > > inputs. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > Quite a few transforms have user-code > > callbacks > > > or > > > > > > > > lambdas: > > > > > > > > > > > ParDo > > > > > > > > > > > > >> > (DoFn), > > > > > > > > > > > > >> > > > Map/FlatMapElements, the DynamicDestinations > > > > classes > > > > > > in > > > > > > > > > > various > > > > > > > > > > > > IOs, > > > > > > > > > > > > >> > > > combine fns, the PollFn callback in Watch, > > etc. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have > > built-in > > > > > > support > > > > > > > > for > > > > > > > > > > side > > > > > > > > > > > > >> > inputs; > > > > > > > > > > > > >> > > > for DynamicDestinations it is plumbed > > > explicitly; > > > > > > others > > > > > > > > > don't > > > > > > > > > > > > have > > > > > > > > > > > > >> > > access > > > > > > > > > > > > >> > > > (e.g. you can't access side inputs from > > > > > > > > Map/FlatMapElements > > > > > > > > > > > > because > > > > > > > > > > > > >> > they > > > > > > > > > > > > >> > > > don't have a ProcessContext or any context > for > > > > that > > > > > > > > matter). > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > I think it's important to solve this, > > especially > > > > as > > > > > > > Java 8 > > > > > > > > > > > becomes > > > > > > > > > > > > >> > > people's > > > > > > > > > > > > >> > > > default choice: users will want to use side > > > inputs > > > > > in > > > > > > > > > > > > >> > > Map/FlatMapElements. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > It also appears to be quite easy to > implement: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > Runner part: > > > > > > > > > > > > >> > > > - introduce a SideInputAccessor interface > > > > > > > > > > > > >> > > > - make .get() on a PCollectionView get it > > from a > > > > > > > > > thread-local > > > > > > > > > > > > >> > > > SideInputAccessor > > > > > > > > > > > > >> > > > - make runners set the thread-local > > > > > SideInputAccessor > > > > > > > any > > > > > > > > > time > > > > > > > > > > > the > > > > > > > > > > > > >> > runner > > > > > > > > > > > > >> > > > is evaluating something in a context where > > side > > > > > inputs > > > > > > > are > > > > > > > > > > > > >> available, > > > > > > > > > > > > >> > > e.g. > > > > > > > > > > > > >> > > > a ProcessElement method, or applying a > > > CombineFn - > > > > > the > > > > > > > set > > > > > > > > > of > > > > > > > > > > > such > > > > > > > > > > > > >> > places > > > > > > > > > > > > >> > > > will be quite small. I believe only runners > > (but > > > > not > > > > > > > > > > transforms) > > > > > > > > > > > > >> will > > > > > > > > > > > > >> > > ever > > > > > > > > > > > > >> > > > need to set this thread-local > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > Transform part: > > > > > > > > > > > > >> > > > - Transforms that take user-code lambdas and > > > want > > > > to > > > > > > let > > > > > > > > > them > > > > > > > > > > > > access > > > > > > > > > > > > >> > side > > > > > > > > > > > > >> > > > inputs still will need to be configurable > > with a > > > > > > method > > > > > > > > like > > > > > > > > > > > > >> > > > .withSideInputs(view1, view2...) and will > need > > > to > > > > > > plumb > > > > > > > > > those > > > > > > > > > > > down > > > > > > > > > > > > >> to > > > > > > > > > > > > >> > the > > > > > > > > > > > > >> > > > primitive DoFn's or CombineFn's - > information > > on > > > > > > *which* > > > > > > > > > side > > > > > > > > > > > > inputs > > > > > > > > > > > > >> > will > > > > > > > > > > > > >> > > > be accessed, of course, still needs to be in > > the > > > > > > graph. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > I quickly prototyped this in direct runner > and > > > > > > > > MapElements, > > > > > > > > > > and > > > > > > > > > > > it > > > > > > > > > > > > >> > worked > > > > > > > > > > > > >> > > > with no issues - I'm wondering if there's > > > > something > > > > > > > subtle > > > > > > > > > > that > > > > > > > > > > > > I'm > > > > > > > > > > > > >> > > > missing, or is it actually a good idea? > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >