This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cd3893669c [fix][broker] Use compatible Avro name validator to allow '$' in schema record names (#25193)
3cd3893669c is described below

commit 3cd3893669cee4e41b11f9bbe2ca51813467e657
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Feb 25 00:35:50 2026 +0800

    [fix][broker] Use compatible Avro name validator to allow '$' in schema record names (#25193)
---
 .../schema/AvroSchemaBasedCompatibilityCheck.java  |   6 +-
 .../service/schema/SchemaRegistryServiceImpl.java  |   6 +-
 .../validator/StructSchemaDataValidator.java       |  32 +++++-
 .../schema/validator/SchemaDataValidatorTest.java  | 120 +++++++++++++++++++++
 pulsar-broker/src/test/proto/DataRecord.proto      |  35 ++++++
 5 files changed, 193 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
index e5fc7800c51..56012a3ddd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
@@ -29,6 +29,7 @@ import org.apache.avro.SchemaValidationException;
 import org.apache.avro.SchemaValidator;
 import org.apache.avro.SchemaValidatorBuilder;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import 
org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 
@@ -51,11 +52,12 @@ abstract class AvroSchemaBasedCompatibilityCheck implements 
SchemaCompatibilityC
         checkArgument(from != null, "check compatibility list is null");
         try {
             for (SchemaData schemaData : from) {
-                Schema.Parser parser = new Schema.Parser();
+                Schema.Parser parser =
+                        new 
Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
                 parser.setValidateDefaults(false);
                 fromList.addFirst(parser.parse(new 
String(schemaData.getData(), UTF_8)));
             }
-            Schema.Parser parser = new Schema.Parser();
+            Schema.Parser parser = new 
Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
             parser.setValidateDefaults(false);
             Schema toSchema = parser.parse(new String(to.getData(), UTF_8));
             SchemaValidator schemaValidator = createSchemaValidator(strategy);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index dcf7edee1a2..5c5ed992d24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -50,6 +50,7 @@ import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
 import 
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
+import 
org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaHash;
@@ -414,12 +415,13 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
         final CompletableFuture<SchemaVersion> completableFuture = new 
CompletableFuture<>();
         SchemaVersion schemaVersion;
         if (isUsingAvroSchemaParser(schemaData.getType())) {
-            Schema.Parser parser = new Schema.Parser();
+            Schema.Parser parser = new 
Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
             Schema newSchema = parser.parse(new String(schemaData.getData(), 
UTF_8));
 
             for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
                 if 
(isUsingAvroSchemaParser(schemaAndMetadata.schema.getType())) {
-                    Schema.Parser existParser = new Schema.Parser();
+                    Schema.Parser existParser =
+                            new 
Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
                     Schema existSchema = existParser.parse(new 
String(schemaAndMetadata.schema.getData(), UTF_8));
                     if (newSchema.equals(existSchema) && 
schemaAndMetadata.schema.getType() == schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
index 7f3c4e5e46b..8202c5720c2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import java.io.IOException;
+import org.apache.avro.NameValidator;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
@@ -32,13 +33,14 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 /**
  * Validate if the struct schema is in expected form.
  */
-class StructSchemaDataValidator implements SchemaDataValidator {
+public class StructSchemaDataValidator implements SchemaDataValidator {
 
     public static StructSchemaDataValidator of() {
         return INSTANCE;
     }
 
     private static final StructSchemaDataValidator INSTANCE = new 
StructSchemaDataValidator();
+    public static final NameValidator COMPATIBLE_NAME_VALIDATOR = new 
CompatibleNameValidator();
 
     private StructSchemaDataValidator() {}
 
@@ -49,7 +51,7 @@ class StructSchemaDataValidator implements 
SchemaDataValidator {
         byte[] data = schemaData.getData();
 
         try {
-            Schema.Parser avroSchemaParser = new Schema.Parser();
+            Schema.Parser avroSchemaParser = new 
Schema.Parser(COMPATIBLE_NAME_VALIDATOR);
             avroSchemaParser.setValidateDefaults(false);
             Schema schema = avroSchemaParser.parse(new String(data, UTF_8));
             if (SchemaType.AVRO.equals(schemaData.getType())) {
@@ -97,4 +99,30 @@ class StructSchemaDataValidator implements 
SchemaDataValidator {
         throw new InvalidSchemaDataException("Invalid schema definition data 
for "
             + schemaData.getType() + " schema", cause);
     }
+
+    static class CompatibleNameValidator implements NameValidator {
+
+        @Override
+        public Result validate(String name) {
+            if (name == null) {
+                return new Result("Null name");
+            }
+            final int length = name.length();
+            if (length == 0) {
+                return new Result("Empty name");
+            }
+            final char first = name.charAt(0);
+            if (!(Character.isLetter(first) || first == '_' || first == '$')) {
+                return new Result("Illegal initial character: " + name);
+            }
+            for (int i = 1; i < length; i++) {
+                final char c = name.charAt(i);
+                // we need to allow $ for the special case
+                if (!(Character.isLetterOrDigit(c) || c == '_' || c == '$')) {
+                    return new Result("Illegal character in: " + name);
+                }
+            }
+            return OK;
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
index 302e5879d28..a69bb649e7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -23,11 +23,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import org.apache.avro.NameValidator;
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.broker.service.schema.proto.DataRecordOuterClass;
+import 
org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator.CompatibleNameValidator;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -148,4 +153,119 @@ public class SchemaDataValidatorTest {
         }
     }
 
+    @Test
+    public void testCompatibleNameValidatorValidNames() {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        String[] validNames = {
+            "validName",
+            "ValidName",
+            "valid_name",
+            "valid$name",
+            "_validName",
+            "$validName",
+            "name123",
+            "Name_123$",
+            "a",
+            "A",
+            "_",
+            "$",
+            "validNameWithMultiple$ymbols_and_numbers123"
+        };
+
+        for (String name : validNames) {
+            NameValidator.Result result = validator.validate(name);
+            Assert.assertTrue(result.isOK(),
+                "Expected validation to pass for name: '" + name + "', but got 
error: " + result.getErrors());
+        }
+    }
+
+    @Test
+    public void testCompatibleNameValidatorInvalidNames() {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        String[] invalidNames = {
+            null,
+            "",
+            "123name",
+            "1name",
+            "name-with-dash",
+            "name with space",
+            "name.with.dot",
+            "name@symbol",
+            "name#hash",
+            "name%percent",
+            "name&ampersand",
+            "name*asterisk",
+            "name(parentheses)",
+            "name+plus",
+            "name=equals",
+            "name[brackets]",
+            "name{braces}",
+            "name|pipe",
+            "name\\backslash",
+            "name:colon",
+            "name;semicolon",
+            "name\"quote",
+            "name'apostrophe",
+            "name<greater>",
+            "name,comma",
+            "name?question",
+            "name!exclamation",
+            "name`backtick",
+            "name~tilde",
+            "name^caret"
+        };
+
+        for (String name : invalidNames) {
+            NameValidator.Result result = validator.validate(name);
+            Assert.assertFalse(result.isOK(), "Expected validation to fail for 
name: '" + name + "'");
+        }
+    }
+
+    @Test
+    public void testCompatibleNameValidatorSpecificErrorMessages() throws 
Exception {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        NameValidator.Result nullResult = validator.validate(null);
+        Assert.assertFalse(nullResult.isOK());
+        Assert.assertEquals(nullResult.getErrors(), "Null name");
+
+        NameValidator.Result emptyResult = validator.validate("");
+        Assert.assertFalse(emptyResult.isOK());
+        Assert.assertEquals(emptyResult.getErrors(), "Empty name");
+
+        NameValidator.Result invalidFirstCharResult = 
validator.validate("123name");
+        Assert.assertFalse(invalidFirstCharResult.isOK());
+        Assert.assertTrue(invalidFirstCharResult.getErrors().contains("Illegal 
initial character"));
+
+        NameValidator.Result invalidCharResult = 
validator.validate("name-with-dash");
+        Assert.assertFalse(invalidCharResult.isOK());
+        Assert.assertTrue(invalidCharResult.getErrors().contains("Illegal 
character in"));
+    }
+
+    @Test
+    public void testCompatibleNameValidatorEdgeCases() throws Exception {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        Assert.assertTrue(validator.validate("a").isOK());
+        Assert.assertTrue(validator.validate("A").isOK());
+        Assert.assertTrue(validator.validate("_").isOK());
+        Assert.assertTrue(validator.validate("$").isOK());
+
+        NameValidator.Result longNameResult = 
validator.validate("a".repeat(1000));
+        Assert.assertTrue(longNameResult.isOK());
+
+        NameValidator.Result nameWithOnlyDigits = validator.validate("123");
+        Assert.assertFalse(nameWithOnlyDigits.isOK());
+        Assert.assertTrue(nameWithOnlyDigits.getErrors().contains("Illegal 
initial character"));
+    }
+
+    @Test
+    public void testAvroCompatible() throws InvalidSchemaDataException {
+        final ProtobufSchema<DataRecordOuterClass.DataRecord> protobufSchema =
+                ProtobufSchema.of(DataRecordOuterClass.DataRecord.class);
+        
StructSchemaDataValidator.of().validate(SchemaData.fromSchemaInfo(protobufSchema.getSchemaInfo()));
+    }
+
 }
diff --git a/pulsar-broker/src/test/proto/DataRecord.proto b/pulsar-broker/src/test/proto/DataRecord.proto
new file mode 100644
index 00000000000..18d0ad4d70f
--- /dev/null
+++ b/pulsar-broker/src/test/proto/DataRecord.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+
+package pulsar.schema;
+option java_package = "org.apache.pulsar.broker.service.schema.proto";
+
+
+message DataRecord {
+  string field1 = 1;
+  int64 field2 = 2;
+  NestedDataRecord field3 = 3;
+  repeated NestedDataRecord fields4 = 4;
+
+  message NestedDataRecord {
+    string field1 = 1;
+    int64 field2 = 2;
+  }
+}
\ No newline at end of file

Reply via email to