This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 469df5c6f08 [fix][client] Fix thread-safety of AutoProduceBytesSchema 
(#25014)
469df5c6f08 is described below

commit 469df5c6f08a56cff5067c5709b842d50bae9c5b
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Nov 27 04:08:41 2025 +0200

    [fix][client] Fix thread-safety of AutoProduceBytesSchema (#25014)
---
 .../client/impl/schema/AutoProduceBytesSchema.java       | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index cc134b57b21..9abbf81576d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -33,11 +33,12 @@ import org.apache.pulsar.common.schema.SchemaType;
 public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
     @Setter
-    private boolean requireSchemaValidation = true;
-    private Schema<T> schema;
-    private boolean userProvidedSchema;
+    private volatile boolean requireSchemaValidation = true;
+    private volatile Schema<T> schema;
+    private final boolean userProvidedSchema;
 
     public AutoProduceBytesSchema() {
+        this.userProvidedSchema = false;
     }
 
     public AutoProduceBytesSchema(Schema<T> schema) {
@@ -81,11 +82,12 @@ public class AutoProduceBytesSchema<T> implements 
Schema<byte[]> {
 
         if (requireSchemaValidation) {
             // verify if the message can be decoded by the underlying schema
-            if (schema instanceof KeyValueSchema
-                    && ((KeyValueSchema) 
schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
-                ((KeyValueSchema) schema).getValueSchema().validate(message);
+            Schema<T> localSchema = schema;
+            if (localSchema instanceof KeyValueSchema && ((KeyValueSchema) 
localSchema).getKeyValueEncodingType()
+                    .equals(KeyValueEncodingType.SEPARATED)) {
+                ((KeyValueSchema) 
localSchema).getValueSchema().validate(message);
             } else {
-                schema.validate(message);
+                localSchema.validate(message);
             }
         }
 

Reply via email to