This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 808d23877bd Fix DoFnInvoker cache collision for generic types (#37355)
808d23877bd is described below

commit 808d23877bd82261e1c2f82b770b6dcd84645a6a
Author: Elia LIU <[email protected]>
AuthorDate: Thu Feb 19 04:30:06 2026 +1100

    Fix DoFnInvoker cache collision for generic types (#37355)
    
    This fixes a bug where ByteBuddyDoFnInvokerFactory would return the same
    cached invoker for different generic instantiations of the same DoFn class.
    
    Changes:
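    1. Introduced InvokerCacheKey, which keys the invoker cache on the DoFn class
       together with its input/output TypeDescriptors, so each generic
       instantiation gets its own cache entry.
    2. Updated generateInvokerClass to append a type-based hash suffix to the
       generated invoker class name.
    3. Added a regression test (testCacheKeyCollisionProof).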
    
    This PR fixes a critical issue where ByteBuddyDoFnInvokerFactory failed to
    distinguish between different generic instantiations of the same DoFn class
    (e.g., MyFn<String> vs MyFn<Integer>).
    
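    1. Cache key strategy: introduced InvokerCacheKey to include the input/output
       TypeDescriptors in the cache lookup.
    2. Class naming: updated generateInvokerClass to append a suffix derived from
       a 32-bit hash of the input/output type strings, so different generic
       instantiations normally receive different class names. Because the hash
       is only 32 bits, two type pairs of the same DoFn class can still collide
       and be given the same invoker class name.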
    3. Robustness fix: added defensive try-catch blocks around the
       TypeDescriptor lookups.
       - Some internal transforms (like MapElements) throw
         IllegalStateException if getOutputTypeDescriptor() is called after
         serialization.
       - In these cases, the factory falls back to Object.class (the legacy
         behavior), preserving backward compatibility for transforms that do
         not retain type information at runtime. Different generic
         instantiations of such a DoFn class then share a single invoker, as
         they did before this change.
---
 .github/trigger_files/beam_PostCommit_Java.json    |   2 +-
 ...m_PostCommit_Java_ValidatesRunner_Dataflow.json |   1 +
 ...eam_PostCommit_Java_ValidatesRunner_Direct.json |   1 +
 ...beam_PostCommit_Java_ValidatesRunner_Flink.json |   1 +
 .github/trigger_files/beam_PostCommit_Python.json  |   4 +-
 .../reflect/ByteBuddyDoFnInvokerFactory.java       | 141 ++++++++++++++++++---
 .../sdk/transforms/reflect/DoFnInvokersTest.java   |  53 +++++++-
 7 files changed, 181 insertions(+), 22 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java.json 
b/.github/trigger_files/beam_PostCommit_Java.json
index 1bd74515152..756b765e59e 100644
--- a/.github/trigger_files/beam_PostCommit_Java.json
+++ b/.github/trigger_files/beam_PostCommit_Java.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
   "modification": 4
-}
\ No newline at end of file
+} 
diff --git 
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json 
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
index 39523ea7c0f..a89f7adb4ce 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
@@ -2,3 +2,4 @@
   "comment": "Modify this file in a trivial way to cause this test suite to 
run!",
   "modification": 3,
 }
+ 
diff --git 
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json 
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json
index 7e7462c0b05..31caa31981e 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json
@@ -6,3 +6,4 @@
   "https://github.com/apache/beam/pull/31761": "noting that PR #31761 should 
run this test",
   "https://github.com/apache/beam/pull/35159": "moving WindowedValue and 
making an interface"
 }
+ 
diff --git 
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json 
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json
index afda4087adf..55a37245900 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json
@@ -7,3 +7,4 @@
   "runFor": "#33606",
   "https://github.com/apache/beam/pull/35159": "moving WindowedValue and 
making an interface"
 }
+ 
diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python.json
index 7a434069980..5d0598c952f 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
   "pr": "36271",
-  "modification": 38
-}
+  "modification": 39
+} 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 3afd8aeb5e9..6a2b7fe5f1f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.description.field.FieldDescription;
@@ -106,6 +107,7 @@ import 
org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Primitives;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -166,15 +168,66 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
   private static final String FN_DELEGATE_FIELD_NAME = "delegate";
 
   /**
-   * A cache of constructors of generated {@link DoFnInvoker} classes, keyed 
by {@link DoFn} class.
-   * Needed because generating an invoker class is expensive, and to avoid 
generating an excessive
-   * number of classes consuming PermGen memory.
+   * Cache key for DoFnInvoker constructors that includes both the DoFn class 
and its generic type
+   * parameters to prevent collisions when the same DoFn class is used with 
different generic types.
+   */
+  private static final class InvokerCacheKey {
+    private final Class<? extends DoFn<?, ?>> fnClass;
+    private final TypeDescriptor<?> inputType;
+    private final TypeDescriptor<?> outputType;
+
+    InvokerCacheKey(
+        Class<? extends DoFn<?, ?>> fnClass,
+        TypeDescriptor<?> inputType,
+        TypeDescriptor<?> outputType) {
+      this.fnClass = fnClass;
+      this.inputType = inputType;
+      this.outputType = outputType;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof InvokerCacheKey)) {
+        return false;
+      }
+      InvokerCacheKey that = (InvokerCacheKey) o;
+      return Objects.equals(fnClass, that.fnClass)
+          && Objects.equals(inputType, that.inputType)
+          && Objects.equals(outputType, that.outputType);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(fnClass, inputType, outputType);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("fnClass", fnClass.getName())
+          .add("inputType", inputType)
+          .add("outputType", outputType)
+          .toString();
+    }
+  }
+
+  /**
+   * A cache of constructors of generated {@link DoFnInvoker} classes, keyed 
by {@link DoFn} class
+   * and its generic type parameters. Needed because generating an invoker 
class is expensive, and
+   * to avoid generating an excessive number of classes consuming PermGen 
memory.
+   *
+   * <p>The cache key includes generic type information to prevent collisions 
when the same DoFn
+   * class is used with different generic types (e.g., MyDoFn&lt;String&gt; vs
+   * MyDoFn&lt;Integer&gt;).
    *
    * <p>Note that special care must be taken to enumerate this object as 
concurrent hash maps are <a
    * 
href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly>weakly
    * consistent</a>.
    */
-  private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache 
=
+  private final Map<InvokerCacheKey, Constructor<?>> 
byteBuddyInvokerConstructorCache =
       new ConcurrentHashMap<>();
 
   private ByteBuddyDoFnInvokerFactory() {}
@@ -265,11 +318,39 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
         signature.fnClass(),
         fn.getClass());
 
+    // Extract input and output type descriptors to distinguish generic 
instantiations.
+    // Fall back to Object.class if unavailable. When type info is lost, 
different generic
+    // instantiations share an invoker, which is acceptable since the DoFn 
class in the cache
+    // key prevents collisions between different DoFn classes.
+    TypeDescriptor<InputT> inputType;
+    try {
+      inputType = fn.getInputTypeDescriptor();
+    } catch (Exception e) {
+      // Some DoFns (like MapElements) throw IllegalStateException if queried 
after
+      // serialization.
+      // In this case, we fall back to the raw class behavior (Object).
+      inputType = null;
+    }
+    if (inputType == null) {
+      inputType = (TypeDescriptor<InputT>) TypeDescriptor.of(Object.class);
+    }
+
+    TypeDescriptor<OutputT> outputType;
+    try {
+      outputType = fn.getOutputTypeDescriptor();
+    } catch (Exception e) {
+      // Same as above: fall back to Object if type info is unavailable.
+      outputType = null;
+    }
+    if (outputType == null) {
+      outputType = (TypeDescriptor<OutputT>) TypeDescriptor.of(Object.class);
+    }
+
     try {
       @SuppressWarnings("unchecked")
       DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
           (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
-              getByteBuddyInvokerConstructor(signature).newInstance(fn);
+              getByteBuddyInvokerConstructor(signature, inputType, 
outputType).newInstance(fn);
 
       if (signature.onTimerMethods() != null) {
         for (OnTimerMethod onTimerMethod : 
signature.onTimerMethods().values()) {
@@ -297,19 +378,24 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
   }
 
   /**
-   * Returns a generated constructor for a {@link DoFnInvoker} for the given 
{@link DoFn} class.
+   * Returns a generated constructor for a {@link DoFnInvoker} for the given 
{@link DoFnSignature}
+   * and specific generic types.
    *
    * <p>These are cached such that at most one {@link DoFnInvoker} class 
exists for a given {@link
-   * DoFn} class.
+   * DoFn} class with specific generic type parameters. Different generic 
instantiations of the same
+   * DoFn class will have separate cached invoker classes.
    */
-  private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature 
signature) {
+  private Constructor<?> getByteBuddyInvokerConstructor(
+      DoFnSignature signature, TypeDescriptor<?> inputType, TypeDescriptor<?> 
outputType) {
     Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
+    InvokerCacheKey cacheKey = new InvokerCacheKey(fnClass, inputType, 
outputType);
     return byteBuddyInvokerConstructorCache.computeIfAbsent(
-        fnClass,
-        clazz -> {
-          Class<? extends DoFnInvoker<?, ?>> invokerClass = 
generateInvokerClass(signature);
+        cacheKey,
+        key -> {
+          Class<? extends DoFnInvoker<?, ?>> invokerClass =
+              generateInvokerClass(signature, inputType, outputType);
           try {
-            return invokerClass.getConstructor(clazz);
+            return invokerClass.getConstructor(fnClass);
           } catch (IllegalArgumentException | NoSuchMethodException | 
SecurityException e) {
             throw new RuntimeException(e);
           }
@@ -456,19 +542,42 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
     }
   }
 
+  /**
+   * Generates a type suffix string for use in invoker class names.
+   *
+   * <p>This creates a unique suffix based on the input and output type 
descriptors to avoid class
+   * name collisions when the same DoFn class is used with different generic 
types.
+   *
+   * <p>The format is: {@code DoFnInvoker$<8-digit hex hash>}
+   *
+   * @param inputType the input type descriptor
+   * @param outputType the output type descriptor
+   * @return a string suffix for the invoker class name
+   */
+  public static String generateTypeSuffix(
+      TypeDescriptor<?> inputType, TypeDescriptor<?> outputType) {
+    return String.format(
+        "%s$%08x",
+        DoFnInvoker.class.getSimpleName(),
+        (inputType.toString() + "|" + outputType.toString()).hashCode());
+  }
+
   /** Generates a {@link DoFnInvoker} class for the given {@link 
DoFnSignature}. */
-  private static Class<? extends DoFnInvoker<?, ?>> 
generateInvokerClass(DoFnSignature signature) {
+  private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(
+      DoFnSignature signature, TypeDescriptor<?> inputType, TypeDescriptor<?> 
outputType) {
     Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
 
+    // Create a unique suffix based on the type descriptors to avoid class 
name collisions
+    // when the same DoFn class is used with different generic types.
+    String typeSuffix = generateTypeSuffix(inputType, outputType);
+
     final TypeDescription clazzDescription = new 
TypeDescription.ForLoadedType(fnClass);
 
     DynamicType.Builder<?> builder =
         new ByteBuddy()
             // Create subclasses inside the target class, to have access to
             // private and package-private bits
-            .with(
-                StableInvokerNamingStrategy.forDoFnClass(fnClass)
-                    .withSuffix(DoFnInvoker.class.getSimpleName()))
+            
.with(StableInvokerNamingStrategy.forDoFnClass(fnClass).withSuffix(typeSuffix))
 
             // class <invoker class> extends DoFnInvokerBase {
             .subclass(DoFnInvokerBase.class, 
ConstructorStrategy.Default.NO_CONSTRUCTORS)
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 299c5d5c590..186d58e3318 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThrows;
@@ -77,6 +78,8 @@ import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.OutputBuilder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -1382,11 +1385,14 @@ public class DoFnInvokersTest {
   @Test
   public void testStableName() {
     DoFnInvoker<Void, Void> invoker = DoFnInvokers.invokerFor(new 
StableNameTestDoFn());
+    // The invoker class name includes a hash of the type descriptors to 
support
+    // different generic instantiations of the same DoFn class.
+    // Format: <DoFn class name>$<DoFnInvoker>$<type hash>
+    TypeDescriptor<Void> voidType = new 
StableNameTestDoFn().getInputTypeDescriptor();
+    String expectedTypeSuffix = 
ByteBuddyDoFnInvokerFactory.generateTypeSuffix(voidType, voidType);
     assertThat(
         invoker.getClass().getName(),
-        equalTo(
-            String.format(
-                "%s$%s", StableNameTestDoFn.class.getName(), 
DoFnInvoker.class.getSimpleName())));
+        equalTo(String.format("%s$%s", StableNameTestDoFn.class.getName(), 
expectedTypeSuffix)));
   }
 
   @Test
@@ -1406,4 +1412,45 @@ public class DoFnInvokersTest {
 
     
verify(mockBundleFinalizer).afterBundleCommit(eq(Instant.ofEpochSecond(42L)), 
eq(null));
   }
+
+  @Test
+  public void testCacheKeyCollisionProof() throws Exception {
+    class DynamicTypeDoFn<T> extends DoFn<T, T> {
+      private final TypeDescriptor<T> typeDescriptor;
+
+      DynamicTypeDoFn(TypeDescriptor<T> typeDescriptor) {
+        this.typeDescriptor = typeDescriptor;
+      }
+
+      @ProcessElement
+      public void processElement(@Element T element, OutputReceiver<T> out) {
+        out.output(element);
+      }
+
+      // Key point: force returning our specified type instead of relying on 
class signature
+      @Override
+      public TypeDescriptor<T> getInputTypeDescriptor() {
+        return typeDescriptor;
+      }
+
+      @Override
+      public TypeDescriptor<T> getOutputTypeDescriptor() {
+        return typeDescriptor;
+      }
+    }
+
+    DoFn<String, String> stringFn = new 
DynamicTypeDoFn<>(TypeDescriptors.strings());
+    DoFn<Integer, Integer> intFn = new 
DynamicTypeDoFn<>(TypeDescriptors.integers());
+
+    DoFnInvoker<String, String> stringInvoker = 
DoFnInvokers.invokerFor(stringFn);
+    DoFnInvoker<Integer, Integer> intInvoker = DoFnInvokers.invokerFor(intFn);
+
+    System.out.println("String Invoker: " + 
stringInvoker.getClass().getName());
+    System.out.println("Integer Invoker: " + intInvoker.getClass().getName());
+
+    assertNotSame(
+        "Critical bug: Beam returned the same cached class for different 
generic types.",
+        stringInvoker.getClass(),
+        intInvoker.getClass());
+  }
 }

Reply via email to