Generating DoFnInvoker classes takes enough time that it is important to
memoize them. The cache is keyed on the DoFn class. See
https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
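Below is a minimal Java sketch of class-keyed memoization. It assumes the expensive artifact (the generated invoker "class") is cached per DoFn class and then bound to each DoFn instance. All names here (InvokerCacheSketch, GeneratedInvokerClass, InvokerFactory, Invoker, MyFn) are hypothetical stand-ins, not Beam APIs. The real ByteBuddyDoFnInvokerFactory code linked above may differ in detail.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of memoizing generated invoker classes keyed on the DoFn class.
public class InvokerCacheSketch {

  /** Hypothetical stand-in for an expensively generated invoker class for one DoFn class. */
  static final class GeneratedInvokerClass {
    final Class<?> fnClass;

    GeneratedInvokerClass(Class<?> fnClass) {
      this.fnClass = fnClass;
    }

    /** Binds the shared generated class to one specific DoFn instance. */
    Invoker newInstance(Object fn) {
      return new Invoker(this, fn);
    }
  }

  /** Hypothetical per-instance invoker that shares its generated class with other instances. */
  static final class Invoker {
    final GeneratedInvokerClass generatedClass;
    final Object fn;

    Invoker(GeneratedInvokerClass generatedClass, Object fn) {
      this.generatedClass = generatedClass;
      this.fn = fn;
    }
  }

  /** Caches generated classes keyed only on fn.getClass(). */
  static final class InvokerFactory {
    private final Map<Class<?>, GeneratedInvokerClass> cache = new ConcurrentHashMap<>();
    // Counts how many times the costly generation step actually ran.
    final AtomicInteger generations = new AtomicInteger();

    Invoker invokerFor(Object fn) {
      GeneratedInvokerClass generated =
          cache.computeIfAbsent(
              fn.getClass(),
              cls -> {
                generations.incrementAndGet(); // stands in for bytecode generation
                return new GeneratedInvokerClass(cls);
              });
      return generated.newInstance(fn);
    }
  }

  /** Hypothetical DoFn-like class used only for the demo. */
  static class MyFn {
    final String label;

    MyFn(String label) {
      this.label = label;
    }
  }

  public static void main(String[] args) {
    InvokerFactory factory = new InvokerFactory();
    Invoker a = factory.invokerFor(new MyFn("first"));
    Invoker b = factory.invokerFor(new MyFn("second"));
    System.out.println(a.generatedClass == b.generatedClass); // true: same class, one cache entry
    System.out.println(a.fn != b.fn); // true: each invoker wraps its own instance
    System.out.println(factory.generations.get()); // 1: generation ran only once
  }
}
```

Because the key is the class alone, and generic type arguments are erased at run time, two uses of the same DoFn class with different element types resolve to the same cache entry.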

Kenn

On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis <[email protected]> wrote:

> I’ve replicated this on the non-Portable FlinkRunner and DirectRunner.
>
> On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev <[email protected]>
> wrote:
>
> Which runner are you using?
>
> On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax <[email protected]> wrote:
>
>> The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory) does
>> generate casts to make sure that the elements match. I'm not entirely sure
>> offhand why the same DoFnInvoker is being used; it seems like something
>> might be going wrong with DoFn caching.
>>
>> Reuven
>>
>> On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I ran into an interesting issue last night. Consider the code below. If
>>> you run it, you get a ClassCastException on the second Filter.by. What
>>> appears to be happening is that the Filter.by DoFnInvoker is being reused,
>>> which should be fine since it should be working with Object. What I can't
>>> find is where the cast is happening, because it seems that a) the cast
>>> isn't actually needed, and b) it's casting to the wrong type. Any clues?
>>>
>>> Best,
>>> B
>>>
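>>> import java.io.Serializable;
>>> import java.util.Objects;
>>> import org.apache.beam.sdk.testing.TestPipeline;
>>> import org.apache.beam.sdk.transforms.Create;
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.apache.beam.sdk.transforms.Filter;
>>> import org.apache.beam.sdk.transforms.ParDo;
>>> import org.junit.Rule;
>>> import org.junit.Test;
>>>
>>> // Enclosing test class; the name is illustrative.
>>> public class ReusedLambdaTest {
>>>   @Rule public final transient TestPipeline p = TestPipeline.create();
>>>
>>>   @Test
>>>   public void testReusedLambda() {
>>>     // Both steps pass the same method reference, but the element type is
>>>     // SimpleElement1 before the DoFn and SimpleElement2 after it.
>>>     p.apply(Create.of(new SimpleElement1()))
>>>         .apply("First", Filter.by(Objects::nonNull))
>>>         .apply(ParDo.of(new VerySimpleDoFn<>()))
>>>         .apply("Second", Filter.by(Objects::nonNull)); // ClassCastException here
>>>     p.run().waitUntilFinish();
>>>   }
>>>
>>>   static class SimpleElement1 implements Serializable {}
>>>
>>>   static class SimpleElement2 implements Serializable {}
>>>
>>>   // Ignores its input and emits a new SimpleElement2 for every element.
>>>   static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
>>>     @ProcessElement
>>>     public void processElement(ProcessContext c) {
>>>       c.output(new SimpleElement2());
>>>     }
>>>   }
>>> }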
