This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0c513139ebbf9c41f885090754cb29f749c2504d
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Nov 27 04:08:41 2025 +0200

    [fix][client] Fix thread-safety of AutoProduceBytesSchema (#25014)

    (cherry picked from commit d3d5963bdcfaa7b03bef4d14bebba4924e2b5cf7)
---
 .../client/impl/schema/AutoProduceBytesSchema.java | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 88809748740..927ae94e027 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -33,11 +33,12 @@ import org.apache.pulsar.common.schema.SchemaType;
 public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
     @Setter
-    private boolean requireSchemaValidation = true;
-    private Schema<T> schema;
-    private boolean userProvidedSchema;
+    private volatile boolean requireSchemaValidation = true;
+    private volatile Schema<T> schema;
+    private final boolean userProvidedSchema;
 
     public AutoProduceBytesSchema() {
+        this.userProvidedSchema = false;
     }
 
     public AutoProduceBytesSchema(Schema<T> schema) {
@@ -87,11 +88,12 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
         if (requireSchemaValidation) {
             // verify if the message can be decoded by the underlying schema
-            if (schema instanceof KeyValueSchema
-                    && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
-                ((KeyValueSchema) schema).getValueSchema().validate(message);
+            Schema<T> localSchema = schema;
+            if (localSchema instanceof KeyValueSchema && ((KeyValueSchema) localSchema).getKeyValueEncodingType()
+                    .equals(KeyValueEncodingType.SEPARATED)) {
+                ((KeyValueSchema) localSchema).getValueSchema().validate(message);
             } else {
-                schema.validate(message);
+                localSchema.validate(message);
             }
         }
 
