We still need to generate the bytecode; using reflection to invoke the methods was very slow. Serializability with dynamic bytecode generation is tricky to deal with. Another approach is to generate source code instead of bytecode (or to ship actual .class files), but that's not the approach we took. BTW, Beam differs from Flink in that we need to generate a custom invoker class for each DoFn, since users can mark any method they want as their element-processing method (e.g. with @ProcessElement).
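
To make the caching question from the thread below concrete, here is a toy sketch in plain Java. It is not the ByteBuddyDoFnInvokerFactory code and uses no Beam API: Invoker, generate, FilterFn, Element1 and Element2 are all hypothetical stand-ins, and generate simply captures a Class where the real factory would emit bytecode. Assuming the generated invoker bakes in a cast to one element type, a cache keyed only on the DoFn class hands the second use an invoker with the wrong cast, while a key that also includes the cast target does not.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InvokerCacheSketch {
  /** Hypothetical stand-in for a generated invoker class. */
  interface Invoker {
    void invoke(Object element);
  }

  // Hypothetical stand-ins for one shared DoFn class and two element types.
  static class FilterFn {}
  static class Element1 {}
  static class Element2 {}

  /** Stand-in for the expensive generation step: bakes in a cast to elementType. */
  static Invoker generate(Class<?> elementType) {
    // Class.cast throws ClassCastException if the element has the wrong type.
    return element -> elementType.cast(element);
  }

  // Cache keyed on the DoFn class only.
  static final Map<Class<?>, Invoker> BY_CLASS = new ConcurrentHashMap<>();

  static Invoker byClassOnly(Class<?> fnClass, Class<?> elementType) {
    return BY_CLASS.computeIfAbsent(fnClass, k -> generate(elementType));
  }

  // Cache keyed on the pair (DoFn class, cast target).
  static final Map<List<Class<?>>, Invoker> BY_CLASS_AND_TYPE = new ConcurrentHashMap<>();

  static Invoker byClassAndType(Class<?> fnClass, Class<?> elementType) {
    List<Class<?>> key = List.of(fnClass, elementType);
    return BY_CLASS_AND_TYPE.computeIfAbsent(key, k -> generate(elementType));
  }

  /** Returns true if invoking with this element fails with a ClassCastException. */
  static boolean failsWithClassCast(Invoker invoker, Object element) {
    try {
      invoker.invoke(element);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    // The first use sees Element1; the second use of the same fn class sees Element2.
    byClassOnly(FilterFn.class, Element1.class);
    Invoker reused = byClassOnly(FilterFn.class, Element2.class); // cache hit, old cast
    System.out.println(failsWithClassCast(reused, new Element2())); // true

    byClassAndType(FilterFn.class, Element1.class);
    Invoker fresh = byClassAndType(FilterFn.class, Element2.class); // separate entry
    System.out.println(failsWithClassCast(fresh, new Element2())); // false
  }
}
```

The wider key costs one generated class per (DoFn class, element type) pair instead of one per DoFn class. Whether that is the right fix for the real factory is a separate question.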

That being said, I think this is simply a bug here.

On Fri, Jan 9, 2026 at 1:07 AM Jan Lukavský <[email protected]> wrote:

> Is it possible to generate DoFnInvoker only during pipeline construction
> time? Flink generally uses approach that all user code is Serializable, the
> serialized bytes are distributed to workers and instances of UDFs are
> created only by deserialization. This would require DoFnInvoker(s) (and all
> related classes, like OnTimerInvoker) to be Serializable, but then this
> should work or is there anything special that I'm missing?
>
> On 1/7/26 02:17, Reuven Lax via dev wrote:
>
> How would we do this? Caching a map of DoFn -> Invoker is effectively
> doing it once per DoFn, no?
>
> On Tue, Jan 6, 2026 at 4:57 PM Byron Ellis <[email protected]> wrote:
>
>> I was thinking more in the sense of doing once per dofn rather than
>> using a globally memoized cache
>>
>> On Tue, Jan 6, 2026 at 3:42 PM Reuven Lax <[email protected]> wrote:
>>
>>> Given how expensive it was in the past (multiple seconds to generate the
>>> bytecode!), I suspect we still don't want to do this on every
>>> element processing.
>>>
>>> On Tue, Jan 6, 2026 at 8:41 AM Byron Ellis <[email protected]> wrote:
>>>
>>>> I suspect the cache also needs to include the cast target (or perhaps a
>>>> re-examination of "how expensive is this really in the year 2025?").
>>>> Fortunately there's a pretty easy workaround.
>>>>
>>>> On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Generating the DoFnInvoker class takes enough time that it is
>>>>> important to memoize them. The cache is keyed on the DoFn class. See
>>>>> https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis <[email protected]> wrote:
>>>>>
>>>>>> I’ve replicated this on the non-Portable FlinkRunner and
>>>>>> DirectRunner.
>>>>>>
>>>>>> On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev <[email protected]> wrote:
>>>>>>
>>>>>> Which runner are you using?
>>>>>>
>>>>>> On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory)
>>>>>>> does generate casts to make sure that the elements match. I'm not
>>>>>>> entirely sure offhand why the same DoFnInvoker is being used - seems
>>>>>>> like something might be going wrong with DoFn caching.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I ran into sort of an interesting issue last night. Consider the
>>>>>>>> code below. If you try to run it what will happen is you'll get a
>>>>>>>> ClassCastException on the second Filter.by. What appears to be
>>>>>>>> happening is that the Filter.by DoFnInvoker is being reused... which
>>>>>>>> should be fine since that should be working with Object... but what I
>>>>>>>> can't find is where the casting is happening because it seems like
>>>>>>>> a) the cast isn't actually needed? and b) it's doing the wrong cast.
>>>>>>>> Any clues?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> B
>>>>>>>>
>>>>>>>> @Test
>>>>>>>> public void testReusedLambda() {
>>>>>>>>   p.apply(Create.of(new SimpleElement1()))
>>>>>>>>       .apply("First", Filter.by(Objects::nonNull))
>>>>>>>>       .apply(ParDo.of(new VerySimpleDoFn<>()))
>>>>>>>>       .apply("Second", Filter.by(Objects::nonNull));
>>>>>>>>   p.run().waitUntilFinish();
>>>>>>>> }
>>>>>>>>
>>>>>>>> static class SimpleElement1 implements Serializable {}
>>>>>>>> static class SimpleElement2 implements Serializable {}
>>>>>>>>
>>>>>>>> static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(ProcessContext c) {
>>>>>>>>     c.output(new SimpleElement2());
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>
>>>> --
>>>> Byron Ellis ([email protected])
>>>> "Oook" -- The Librarian
>>>>
>>
>> --
>> Byron Ellis ([email protected])
>> "Oook" -- The Librarian
>>
