This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5593f405cbe Fix race conditions in apache.beam.sdks.schemas (#35752)
5593f405cbe is described below

commit 5593f405cbef0fbad506c2a8726548628d53e120
Author: Jack McCluskey <[email protected]>
AuthorDate: Tue Aug 5 09:55:38 2025 -0400

    Fix race conditions in apache.beam.sdks.schemas (#35752)
    
    * Fix race conditions in apache.beam.sdks.schemas
    
    * fix mismatched null checks
    
    * spotless checks
    
    * restore old initFieldConverters def
    
    * swap to MonotonicNonNull
    
    * spotless
---
 .../java/org/apache/beam/sdk/schemas/CachingFactory.java | 16 ++++++++++++----
 .../org/apache/beam/sdk/schemas/FromRowUsingCreator.java |  2 +-
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
index 6e244fefb26..d2d7a1c78d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.checkerframework.checker.initialization.qual.NotOnlyInitialized;
 import org.checkerframework.checker.initialization.qual.UnknownInitialization;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -36,7 +37,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * inner factory, so the schema comparison only need happen on the first lookup.
  */
 public class CachingFactory<CreatedT extends @NonNull Object> implements Factory<CreatedT> {
-  private transient @Nullable ConcurrentHashMap<TypeDescriptor<?>, CreatedT> cache = null;
+  private transient volatile @MonotonicNonNull ConcurrentHashMap<TypeDescriptor<?>, CreatedT>
+      cache = null;
 
   private final @NotOnlyInitialized Factory<CreatedT> innerFactory;
 
@@ -45,10 +47,16 @@ public class CachingFactory<CreatedT extends @NonNull Object> implements Factory
   }
 
   private ConcurrentHashMap<TypeDescriptor<?>, CreatedT> getCache() {
-    if (cache == null) {
-      cache = new ConcurrentHashMap<>();
+    ConcurrentHashMap<TypeDescriptor<?>, CreatedT> value = cache;
+    if (value == null) {
+      synchronized (this) {
+        value = cache;
+        if (value == null) {
+          cache = value = new ConcurrentHashMap<>();
+        }
+      }
     }
-    return cache;
+    return value;
   }
 
   @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index b839a19a817..69ae81bcd07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -53,7 +53,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T>, Function<R
   private final Factory<SchemaUserTypeCreator> schemaTypeCreatorFactory;
 
   @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
-  private transient @MonotonicNonNull Function[] fieldConverters;
+  private transient volatile @MonotonicNonNull Function[] fieldConverters;
 
   public FromRowUsingCreator(
       TypeDescriptor<T> typeDescriptor, GetterBasedSchemaProvider schemaProvider) {

Reply via email to