I suspect the cache key also needs to include the cast target (or perhaps we need to re-examine "how expensive is this really in the year 2025?"). Fortunately, there's a pretty easy workaround.
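A minimal sketch of the cache-key idea, in plain Java with no Beam dependency. Everything in it is hypothetical: `CompositeKeyInvokerCache`, `Invoker`, `InvokerKey`, `FilterFn` and `invokerFor` are illustrative names, not Beam's `ByteBuddyDoFnInvokerFactory` API. It also assumes that a generated invoker behaves like an object that casts each element to a `Class` fixed when the invoker is created. It uses a `record`, so it needs Java 16 or later.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Beam code: Invoker stands in for a generated DoFnInvoker,
// and InvokerKey for a memoization key that includes the cast target.
final class CompositeKeyInvokerCache {
  // Stand-in for a generated invoker whose element cast is fixed when it is created.
  static final class Invoker {
    final Class<?> castTarget;

    Invoker(Class<?> castTarget) {
      this.castTarget = castTarget;
    }

    boolean invokeFilter(Object element) {
      // Cast (as generated code would), then apply an Objects::nonNull-style check.
      return castTarget.cast(element) != null;
    }
  }

  // Cache key: the DoFn class plus the type the generated code casts to.
  record InvokerKey(Class<?> fnClass, Class<?> castTarget) {}

  static final Map<InvokerKey, Invoker> CACHE = new ConcurrentHashMap<>();

  // Still memoized, but one invoker per (DoFn class, cast target) pair.
  static Invoker invokerFor(Class<?> fnClass, Class<?> castTarget) {
    return CACHE.computeIfAbsent(
        new InvokerKey(fnClass, castTarget), key -> new Invoker(key.castTarget()));
  }

  static class FilterFn {} // models the one DoFn class shared by both Filter.by steps
  static class SimpleElement1 {}
  static class SimpleElement2 {}

  public static void main(String[] args) {
    Invoker first = invokerFor(FilterFn.class, SimpleElement1.class);
    Invoker second = invokerFor(FilterFn.class, SimpleElement2.class);
    System.out.println(first.invokeFilter(new SimpleElement1()));  // true
    System.out.println(second.invokeFilter(new SimpleElement2())); // true
    // Same key again: the memoized invoker is returned.
    System.out.println(first == invokerFor(FilterFn.class, SimpleElement1.class)); // true
  }
}
```

The trade-off is that generation cost now scales with the number of distinct cast targets per DoFn class rather than with the number of DoFn classes alone. In Beam the cast target would presumably come from the resolved element type rather than a raw `Class`, which this sketch does not model.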
On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles wrote:

> Generating the DoFnInvoker class takes enough time that it is important to
> memoize them. The cache is keyed on the DoFn class. See
> https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
>
> Kenn
>
> On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis wrote:
>
>> I've replicated this on the non-Portable FlinkRunner and DirectRunner.
>>
>> On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev wrote:
>>
>> Which runner are you using?
>>
>> On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax wrote:
>>
>>> The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory) does
>>> generate casts to make sure that the elements match. I'm not entirely sure
>>> offhand why the same DoFnInvoker is being used; it seems like something
>>> might be going wrong with DoFn caching.
>>>
>>> Reuven
>>>
>>> On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis wrote:
>>>
>>>> Hi all,
>>>>
>>>> I ran into sort of an interesting issue last night. Consider the code
>>>> below. If you try to run it, you'll get a ClassCastException on the
>>>> second Filter.by. What appears to be happening is that the Filter.by
>>>> DoFnInvoker is being reused... which should be fine, since it should be
>>>> working with Object... but what I can't find is where the cast is
>>>> happening, because it seems like a) the cast isn't actually needed, and
>>>> b) it's doing the wrong cast. Any clues?
>>>>
>>>> Best,
>>>> B
>>>>
>>>> @Test
>>>> public void testReusedLambda() {
>>>>   p.apply(Create.of(new SimpleElement1()))
>>>>       .apply("First", Filter.by(Objects::nonNull))
>>>>       .apply(ParDo.of(new VerySimpleDoFn<>()))
>>>>       .apply("Second", Filter.by(Objects::nonNull));
>>>>   p.run().waitUntilFinish();
>>>> }
>>>>
>>>> static class SimpleElement1 implements Serializable {}
>>>>
>>>> static class SimpleElement2 implements Serializable {}
>>>>
>>>> static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
>>>>   @ProcessElement
>>>>   public void processElement(ProcessContext c) {
>>>>     c.output(new SimpleElement2());
>>>>   }
>>>> }

--
Byron Ellis
"Oook" -- The Librarian
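To make the suspected failure mode concrete, here is a plain-Java model of an invoker cache keyed only on the DoFn class. It is a hypothetical illustration, not Beam code: `ClassKeyedCacheRepro`, `GeneratedInvoker`, `FilterFn` and `invokerFor` are made-up names. It assumes, as the thread suggests, that the generated invoker casts each element to a type fixed when the invoker is first generated, and that both `Filter.by` steps share one DoFn class. The second lookup then reuses the invoker built for `SimpleElement1`, so its cast fails on a `SimpleElement2`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model, not Beam code: GeneratedInvoker stands in for a generated
// DoFnInvoker, and CACHE for a memoization cache keyed only on the DoFn class.
final class ClassKeyedCacheRepro {
  // Stand-in for generated code: the element is cast to a type fixed at generation time.
  static final class GeneratedInvoker {
    private final Class<?> castTarget;

    GeneratedInvoker(Class<?> castTarget) {
      this.castTarget = castTarget;
    }

    boolean invokeFilter(Object element) {
      Object typed = castTarget.cast(element); // throws if castTarget is stale
      return typed != null;                     // mimics Filter.by(Objects::nonNull)
    }
  }

  static final Map<Class<?>, GeneratedInvoker> CACHE = new ConcurrentHashMap<>();
  static int generations = 0;

  // Memoized on the DoFn class alone: the first element type seen wins.
  static GeneratedInvoker invokerFor(Class<?> fnClass, Class<?> elementType) {
    return CACHE.computeIfAbsent(fnClass, c -> {
      generations++;
      return new GeneratedInvoker(elementType);
    });
  }

  static class FilterFn {} // models the one DoFn class shared by both Filter.by steps
  static class SimpleElement1 {}
  static class SimpleElement2 {}

  public static void main(String[] args) {
    // "First" step: generates and caches an invoker that casts to SimpleElement1.
    System.out.println(
        invokerFor(FilterFn.class, SimpleElement1.class).invokeFilter(new SimpleElement1()));
    try {
      // "Second" step: the cached SimpleElement1 invoker is reused for SimpleElement2.
      invokerFor(FilterFn.class, SimpleElement2.class).invokeFilter(new SimpleElement2());
    } catch (ClassCastException e) {
      System.out.println("second Filter step: ClassCastException");
    }
  }
}
```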
