This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 808d23877bd Fix DoFnInvoker cache collision for generic types (#37355)
808d23877bd is described below
commit 808d23877bd82261e1c2f82b770b6dcd84645a6a
Author: Elia LIU <[email protected]>
AuthorDate: Thu Feb 19 04:30:06 2026 +1100
Fix DoFnInvoker cache collision for generic types (#37355)
This fixes a bug where ByteBuddyDoFnInvokerFactory would return the same
cached invoker for different generic instantiations of the same DoFn class.
Changes:
1. Introduced InvokerCacheKey, which includes the DoFn's input/output
TypeDescriptors, so each generic instantiation gets its own cache entry.
2. Updated generateInvokerClass to append a type-based hash suffix to the
generated invoker class name.
3. Added regression test (testCacheKeyCollisionProof).
This PR fixes a critical issue where ByteBuddyDoFnInvokerFactory failed to
distinguish between different generic instantiations of the same DoFn class
(e.g., MyFn<String> vs MyFn<Integer>).
1. Cache Key Strategy: Introduced InvokerCacheKey to include input/output
TypeDescriptors in the cache lookup.
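2. Class Naming: Updated generateInvokerClass to append a type-based hash
suffix so different generic instantiations get distinct class names (the
suffix is a 32-bit hash, so distinctness is highly likely but not guaranteed).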
3. Robustness: Wrapped the getInputTypeDescriptor() and
getOutputTypeDescriptor() calls in defensive try-catch blocks.
- Some internal transforms (such as MapElements) throw
IllegalStateException if getOutputTypeDescriptor() is called after
serialization.
- In these cases (and whenever a descriptor is null), the factory falls back
to TypeDescriptor.of(Object.class). Such DoFns therefore share a single cache
entry per DoFn class, as before this change. This keeps backward compatibility
for transforms that do not retain type information at runtime.
---
.github/trigger_files/beam_PostCommit_Java.json | 2 +-
...m_PostCommit_Java_ValidatesRunner_Dataflow.json | 1 +
...eam_PostCommit_Java_ValidatesRunner_Direct.json | 1 +
...beam_PostCommit_Java_ValidatesRunner_Flink.json | 1 +
.github/trigger_files/beam_PostCommit_Python.json | 4 +-
.../reflect/ByteBuddyDoFnInvokerFactory.java | 141 ++++++++++++++++++---
.../sdk/transforms/reflect/DoFnInvokersTest.java | 53 +++++++-
7 files changed, 181 insertions(+), 22 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Java.json
b/.github/trigger_files/beam_PostCommit_Java.json
index 1bd74515152..756b765e59e 100644
--- a/.github/trigger_files/beam_PostCommit_Java.json
+++ b/.github/trigger_files/beam_PostCommit_Java.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
"modification": 4
-}
\ No newline at end of file
+}
diff --git
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
index 39523ea7c0f..a89f7adb4ce 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
@@ -2,3 +2,4 @@
"comment": "Modify this file in a trivial way to cause this test suite to
run!",
"modification": 3,
}
+
diff --git
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json
index 7e7462c0b05..31caa31981e 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json
@@ -6,3 +6,4 @@
"https://github.com/apache/beam/pull/31761": "noting that PR #31761 should
run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and
making an interface"
}
+
diff --git
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json
index afda4087adf..55a37245900 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json
@@ -7,3 +7,4 @@
"runFor": "#33606",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and
making an interface"
}
+
diff --git a/.github/trigger_files/beam_PostCommit_Python.json
b/.github/trigger_files/beam_PostCommit_Python.json
index 7a434069980..5d0598c952f 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
"pr": "36271",
- "modification": 38
-}
+ "modification": 39
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 3afd8aeb5e9..6a2b7fe5f1f 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.field.FieldDescription;
@@ -106,6 +107,7 @@ import
org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Primitives;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -166,15 +168,66 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
private static final String FN_DELEGATE_FIELD_NAME = "delegate";
/**
- * A cache of constructors of generated {@link DoFnInvoker} classes, keyed
by {@link DoFn} class.
- * Needed because generating an invoker class is expensive, and to avoid
generating an excessive
- * number of classes consuming PermGen memory.
+ * Cache key for DoFnInvoker constructors that includes both the DoFn class
and its generic type
+ * parameters to prevent collisions when the same DoFn class is used with
different generic types.
+ */
+ private static final class InvokerCacheKey {
+ private final Class<? extends DoFn<?, ?>> fnClass;
+ private final TypeDescriptor<?> inputType;
+ private final TypeDescriptor<?> outputType;
+
+ InvokerCacheKey(
+ Class<? extends DoFn<?, ?>> fnClass,
+ TypeDescriptor<?> inputType,
+ TypeDescriptor<?> outputType) {
+ this.fnClass = fnClass;
+ this.inputType = inputType;
+ this.outputType = outputType;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof InvokerCacheKey)) {
+ return false;
+ }
+ InvokerCacheKey that = (InvokerCacheKey) o;
+ return Objects.equals(fnClass, that.fnClass)
+ && Objects.equals(inputType, that.inputType)
+ && Objects.equals(outputType, that.outputType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fnClass, inputType, outputType);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("fnClass", fnClass.getName())
+ .add("inputType", inputType)
+ .add("outputType", outputType)
+ .toString();
+ }
+ }
+
+ /**
+ * A cache of constructors of generated {@link DoFnInvoker} classes, keyed
by {@link DoFn} class
+ * and its generic type parameters. Needed because generating an invoker
class is expensive, and
+ * to avoid generating an excessive number of classes consuming PermGen
memory.
+ *
+ * <p>The cache key includes generic type information to prevent collisions
when the same DoFn
+ * class is used with different generic types (e.g., MyDoFn<String> vs
+ * MyDoFn<Integer>).
*
* <p>Note that special care must be taken to enumerate this object as
concurrent hash maps are <a
*
href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly>weakly
* consistent</a>.
*/
- private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache
=
+ private final Map<InvokerCacheKey, Constructor<?>>
byteBuddyInvokerConstructorCache =
new ConcurrentHashMap<>();
private ByteBuddyDoFnInvokerFactory() {}
@@ -265,11 +318,39 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
signature.fnClass(),
fn.getClass());
+ // Extract input and output type descriptors to distinguish generic
instantiations.
+ // Fall back to Object.class if unavailable. When type info is lost,
different generic
+ // instantiations share an invoker, which is acceptable since the DoFn
class in the cache
+ // key prevents collisions between different DoFn classes.
+ TypeDescriptor<InputT> inputType;
+ try {
+ inputType = fn.getInputTypeDescriptor();
+ } catch (Exception e) {
+ // Some DoFns (like MapElements) throw IllegalStateException if queried
after
+ // serialization.
+ // In this case, we fall back to the raw class behavior (Object).
+ inputType = null;
+ }
+ if (inputType == null) {
+ inputType = (TypeDescriptor<InputT>) TypeDescriptor.of(Object.class);
+ }
+
+ TypeDescriptor<OutputT> outputType;
+ try {
+ outputType = fn.getOutputTypeDescriptor();
+ } catch (Exception e) {
+ // Same as above: fall back to Object if type info is unavailable.
+ outputType = null;
+ }
+ if (outputType == null) {
+ outputType = (TypeDescriptor<OutputT>) TypeDescriptor.of(Object.class);
+ }
+
try {
@SuppressWarnings("unchecked")
DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
(DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
- getByteBuddyInvokerConstructor(signature).newInstance(fn);
+ getByteBuddyInvokerConstructor(signature, inputType,
outputType).newInstance(fn);
if (signature.onTimerMethods() != null) {
for (OnTimerMethod onTimerMethod :
signature.onTimerMethods().values()) {
@@ -297,19 +378,24 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
}
/**
- * Returns a generated constructor for a {@link DoFnInvoker} for the given
{@link DoFn} class.
+ * Returns a generated constructor for a {@link DoFnInvoker} for the given
{@link DoFnSignature}
+ * and specific generic types.
*
* <p>These are cached such that at most one {@link DoFnInvoker} class
exists for a given {@link
- * DoFn} class.
+ * DoFn} class with specific generic type parameters. Different generic
instantiations of the same
+ * DoFn class will have separate cached invoker classes.
*/
- private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature
signature) {
+ private Constructor<?> getByteBuddyInvokerConstructor(
+ DoFnSignature signature, TypeDescriptor<?> inputType, TypeDescriptor<?>
outputType) {
Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
+ InvokerCacheKey cacheKey = new InvokerCacheKey(fnClass, inputType,
outputType);
return byteBuddyInvokerConstructorCache.computeIfAbsent(
- fnClass,
- clazz -> {
- Class<? extends DoFnInvoker<?, ?>> invokerClass =
generateInvokerClass(signature);
+ cacheKey,
+ key -> {
+ Class<? extends DoFnInvoker<?, ?>> invokerClass =
+ generateInvokerClass(signature, inputType, outputType);
try {
- return invokerClass.getConstructor(clazz);
+ return invokerClass.getConstructor(fnClass);
} catch (IllegalArgumentException | NoSuchMethodException |
SecurityException e) {
throw new RuntimeException(e);
}
@@ -456,19 +542,42 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
}
}
+ /**
+ * Generates a type suffix string for use in invoker class names.
+ *
+ * <p>This creates a unique suffix based on the input and output type
descriptors to avoid class
+ * name collisions when the same DoFn class is used with different generic
types.
+ *
+ * <p>The format is: {@code DoFnInvoker$<8-digit hex hash>}
+ *
+ * @param inputType the input type descriptor
+ * @param outputType the output type descriptor
+ * @return a string suffix for the invoker class name
+ */
+ public static String generateTypeSuffix(
+ TypeDescriptor<?> inputType, TypeDescriptor<?> outputType) {
+ return String.format(
+ "%s$%08x",
+ DoFnInvoker.class.getSimpleName(),
+ (inputType.toString() + "|" + outputType.toString()).hashCode());
+ }
+
/** Generates a {@link DoFnInvoker} class for the given {@link
DoFnSignature}. */
- private static Class<? extends DoFnInvoker<?, ?>>
generateInvokerClass(DoFnSignature signature) {
+ private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(
+ DoFnSignature signature, TypeDescriptor<?> inputType, TypeDescriptor<?>
outputType) {
Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
+ // Create a unique suffix based on the type descriptors to avoid class
name collisions
+ // when the same DoFn class is used with different generic types.
+ String typeSuffix = generateTypeSuffix(inputType, outputType);
+
final TypeDescription clazzDescription = new
TypeDescription.ForLoadedType(fnClass);
DynamicType.Builder<?> builder =
new ByteBuddy()
// Create subclasses inside the target class, to have access to
// private and package-private bits
- .with(
- StableInvokerNamingStrategy.forDoFnClass(fnClass)
- .withSuffix(DoFnInvoker.class.getSimpleName()))
+
.with(StableInvokerNamingStrategy.forDoFnClass(fnClass).withSuffix(typeSuffix))
// class <invoker class> extends DoFnInvokerBase {
.subclass(DoFnInvokerBase.class,
ConstructorStrategy.Default.NO_CONSTRUCTORS)
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 299c5d5c590..186d58e3318 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
@@ -77,6 +78,8 @@ import
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.OutputBuilder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowedValues;
import org.joda.time.Instant;
import org.junit.Before;
@@ -1382,11 +1385,14 @@ public class DoFnInvokersTest {
@Test
public void testStableName() {
DoFnInvoker<Void, Void> invoker = DoFnInvokers.invokerFor(new
StableNameTestDoFn());
+ // The invoker class name includes a hash of the type descriptors to
support
+ // different generic instantiations of the same DoFn class.
+ // Format: <DoFn class name>$<DoFnInvoker>$<type hash>
+ TypeDescriptor<Void> voidType = new
StableNameTestDoFn().getInputTypeDescriptor();
+ String expectedTypeSuffix =
ByteBuddyDoFnInvokerFactory.generateTypeSuffix(voidType, voidType);
assertThat(
invoker.getClass().getName(),
- equalTo(
- String.format(
- "%s$%s", StableNameTestDoFn.class.getName(),
DoFnInvoker.class.getSimpleName())));
+ equalTo(String.format("%s$%s", StableNameTestDoFn.class.getName(),
expectedTypeSuffix)));
}
@Test
@@ -1406,4 +1412,45 @@ public class DoFnInvokersTest {
verify(mockBundleFinalizer).afterBundleCommit(eq(Instant.ofEpochSecond(42L)),
eq(null));
}
+
+ @Test
+ public void testCacheKeyCollisionProof() throws Exception {
+ class DynamicTypeDoFn<T> extends DoFn<T, T> {
+ private final TypeDescriptor<T> typeDescriptor;
+
+ DynamicTypeDoFn(TypeDescriptor<T> typeDescriptor) {
+ this.typeDescriptor = typeDescriptor;
+ }
+
+ @ProcessElement
+ public void processElement(@Element T element, OutputReceiver<T> out) {
+ out.output(element);
+ }
+
+ // Key point: force returning our specified type instead of relying on
class signature
+ @Override
+ public TypeDescriptor<T> getInputTypeDescriptor() {
+ return typeDescriptor;
+ }
+
+ @Override
+ public TypeDescriptor<T> getOutputTypeDescriptor() {
+ return typeDescriptor;
+ }
+ }
+
+ DoFn<String, String> stringFn = new
DynamicTypeDoFn<>(TypeDescriptors.strings());
+ DoFn<Integer, Integer> intFn = new
DynamicTypeDoFn<>(TypeDescriptors.integers());
+
+ DoFnInvoker<String, String> stringInvoker =
DoFnInvokers.invokerFor(stringFn);
+ DoFnInvoker<Integer, Integer> intInvoker = DoFnInvokers.invokerFor(intFn);
+
+ System.out.println("String Invoker: " +
stringInvoker.getClass().getName());
+ System.out.println("Integer Invoker: " + intInvoker.getClass().getName());
+
+ assertNotSame(
+ "Critical bug: Beam returned the same cached class for different
generic types.",
+ stringInvoker.getClass(),
+ intInvoker.getClass());
+ }
}