This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0c513139ebbf9c41f885090754cb29f749c2504d
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Nov 27 04:08:41 2025 +0200

    [fix][client] Fix thread-safety of AutoProduceBytesSchema (#25014)
    
    (cherry picked from commit d3d5963bdcfaa7b03bef4d14bebba4924e2b5cf7)
---
 .../client/impl/schema/AutoProduceBytesSchema.java       | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 88809748740..927ae94e027 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -33,11 +33,12 @@ import org.apache.pulsar.common.schema.SchemaType;
 public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
     @Setter
-    private boolean requireSchemaValidation = true;
-    private Schema<T> schema;
-    private boolean userProvidedSchema;
+    private volatile boolean requireSchemaValidation = true;
+    private volatile Schema<T> schema;
+    private final boolean userProvidedSchema;
 
     public AutoProduceBytesSchema() {
+        this.userProvidedSchema = false;
     }
 
     public AutoProduceBytesSchema(Schema<T> schema) {
@@ -87,11 +88,12 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
         if (requireSchemaValidation) {
             // verify if the message can be decoded by the underlying schema
-            if (schema instanceof KeyValueSchema
-                    && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
-                ((KeyValueSchema) schema).getValueSchema().validate(message);
+            Schema<T> localSchema = schema;
+            if (localSchema instanceof KeyValueSchema && ((KeyValueSchema) localSchema).getKeyValueEncodingType()
+                    .equals(KeyValueEncodingType.SEPARATED)) {
+                ((KeyValueSchema) localSchema).getValueSchema().validate(message);
             } else {
-                schema.validate(message);
+                localSchema.validate(message);
             }
         }
 

Reply via email to