This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5593f405cbe Fix race conditions in apache.beam.sdks.schemas (#35752)
5593f405cbe is described below
commit 5593f405cbef0fbad506c2a8726548628d53e120
Author: Jack McCluskey <[email protected]>
AuthorDate: Tue Aug 5 09:55:38 2025 -0400
Fix race conditions in apache.beam.sdks.schemas (#35752)
* Fix race conditions in apache.beam.sdks.schemas
* fix mismatched null checks
* spotless checks
* restore old initFieldConverters def
* swap to MonotonicNonNull
* spotless
---
.../java/org/apache/beam/sdk/schemas/CachingFactory.java | 16 ++++++++++++----
.../org/apache/beam/sdk/schemas/FromRowUsingCreator.java | 2 +-
2 files changed, 13 insertions(+), 5 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
index 6e244fefb26..d2d7a1c78d2 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.checkerframework.checker.initialization.qual.NotOnlyInitialized;
import org.checkerframework.checker.initialization.qual.UnknownInitialization;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -36,7 +37,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* inner factory, so the schema comparison only need happen on the first
lookup.
*/
public class CachingFactory<CreatedT extends @NonNull Object> implements
Factory<CreatedT> {
- private transient @Nullable ConcurrentHashMap<TypeDescriptor<?>, CreatedT>
cache = null;
+ private transient volatile @MonotonicNonNull
ConcurrentHashMap<TypeDescriptor<?>, CreatedT>
+ cache = null;
private final @NotOnlyInitialized Factory<CreatedT> innerFactory;
@@ -45,10 +47,16 @@ public class CachingFactory<CreatedT extends @NonNull
Object> implements Factory
}
private ConcurrentHashMap<TypeDescriptor<?>, CreatedT> getCache() {
- if (cache == null) {
- cache = new ConcurrentHashMap<>();
+ ConcurrentHashMap<TypeDescriptor<?>, CreatedT> value = cache;
+ if (value == null) {
+ synchronized (this) {
+ value = cache;
+ if (value == null) {
+ cache = value = new ConcurrentHashMap<>();
+ }
+ }
}
- return cache;
+ return value;
}
@Override
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index b839a19a817..69ae81bcd07 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -53,7 +53,7 @@ class FromRowUsingCreator<T> implements
SerializableFunction<Row, T>, Function<R
private final Factory<SchemaUserTypeCreator> schemaTypeCreatorFactory;
@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
- private transient @MonotonicNonNull Function[] fieldConverters;
+ private transient volatile @MonotonicNonNull Function[] fieldConverters;
public FromRowUsingCreator(
TypeDescriptor<T> typeDescriptor, GetterBasedSchemaProvider
schemaProvider) {