Generating DoFnInvoker classes takes enough time that it is important to memoize them. The cache is keyed on the DoFn class. See https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
Kenn

On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis <[email protected]> wrote:

> I’ve replicated this on the non-Portable FlinkRunner and DirectRunner.
>
> On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev <[email protected]> wrote:
>
>> Which runner are you using?
>>
>> On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax <[email protected]> wrote:
>>
>>> The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory) does
>>> generate casts to make sure that the elements match. I'm not entirely sure
>>> offhand why the same DoFnInvoker is being used; it seems like something
>>> might be going wrong with DoFn caching.
>>>
>>> Reuven
>>>
>>> On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis <[email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I ran into sort of an interesting issue last night. Consider the code
>>>> below. If you try to run it, you'll get a ClassCastException on the
>>>> second Filter.by. What appears to be happening is that the Filter.by
>>>> DoFnInvoker is being reused... which should be fine, since that should
>>>> be working with Object... but what I can't find is where the casting is
>>>> happening, because it seems like a) the cast isn't actually needed, and
>>>> b) it's doing the wrong cast. Any clues?
>>>>
>>>> Best,
>>>> B
>>>>
>>>>   @Test
>>>>   public void testReusedLambda() {
>>>>     p.apply(Create.of(new SimpleElement1()))
>>>>         .apply("First", Filter.by(Objects::nonNull))
>>>>         .apply(ParDo.of(new VerySimpleDoFn<>()))
>>>>         .apply("Second", Filter.by(Objects::nonNull));
>>>>     p.run().waitUntilFinish();
>>>>   }
>>>>
>>>>   static class SimpleElement1 implements Serializable {}
>>>>
>>>>   static class SimpleElement2 implements Serializable {}
>>>>
>>>>   static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
>>>>     @ProcessElement
>>>>     public void processElement(ProcessContext c) {
>>>>       c.output(new SimpleElement2());
>>>>     }
>>>>   }
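Kenn's point that the invoker cache is keyed on the DoFn class has a consequence worth making concrete: because Java erases generic type arguments at runtime, every instance of one generic DoFn class maps to the same cache entry, whatever element types it was constructed with. The sketch below is a minimal, self-contained illustration of class-keyed memoization, not Beam's implementation. `ClassKeyedInvokerCache`, `Invoker`, `generate`, and `GenericFn` are hypothetical names, and the "generation" step is a trivial lambda standing in for ByteBuddy bytecode generation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ClassKeyedInvokerCache {
  // Hypothetical stand-in for a generated DoFnInvoker.
  interface Invoker {
    Class<?> targetClass();
  }

  // Counts how many times the (supposedly expensive) generation step ran.
  static final AtomicInteger generations = new AtomicInteger();

  // Memo table keyed only on the runtime class of the function object.
  private static final Map<Class<?>, Invoker> CACHE = new ConcurrentHashMap<>();

  // Hypothetical expensive step, standing in for bytecode generation.
  private static Invoker generate(Class<?> fnClass) {
    generations.incrementAndGet();
    return () -> fnClass;
  }

  // Memoized lookup: the key is fn.getClass(), so neither the instance
  // identity nor its (erased) generic type arguments affect the result.
  static Invoker invokerFor(Object fn) {
    return CACHE.computeIfAbsent(fn.getClass(), ClassKeyedInvokerCache::generate);
  }

  // Hypothetical generic function class, analogous to VerySimpleDoFn<I>.
  static class GenericFn<I> {}

  public static void main(String[] args) {
    Invoker a = invokerFor(new GenericFn<String>());
    Invoker b = invokerFor(new GenericFn<Integer>());
    // Both lookups hit the same entry: GenericFn<String> and
    // GenericFn<Integer> share one Class object after erasure.
    System.out.println(a == b);
    // The generation step ran once, on the first lookup only.
    System.out.println(generations.get());
  }
}
```

Running `main` prints `true` and then `1`. Keying on the class is what makes the memoization cheap, but it also means that whatever was generated for the first instance of a class is reused for every later instance of that class, regardless of the element types each instance was built for.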
