This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e5e72fde9e9 [fix][broker] Use compatible Avro name validator to allow
'$' in schema record names (#25193)
e5e72fde9e9 is described below
commit e5e72fde9e939e53fb2fbed2f944705c483cdf97
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Feb 25 00:35:50 2026 +0800
[fix][broker] Use compatible Avro name validator to allow '$' in schema
record names (#25193)
(cherry picked from commit 3cd3893669cee4e41b11f9bbe2ca51813467e657)
---
.../schema/AvroSchemaBasedCompatibilityCheck.java | 6 +-
.../service/schema/SchemaRegistryServiceImpl.java | 6 +-
.../validator/StructSchemaDataValidator.java | 32 +++++-
.../schema/validator/SchemaDataValidatorTest.java | 120 +++++++++++++++++++++
pulsar-broker/src/test/proto/DataRecord.proto | 35 ++++++
5 files changed, 193 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
index e5fc7800c51..56012a3ddd7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
@@ -29,6 +29,7 @@ import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -51,11 +52,12 @@ abstract class AvroSchemaBasedCompatibilityCheck implements
SchemaCompatibilityC
checkArgument(from != null, "check compatibility list is null");
try {
for (SchemaData schemaData : from) {
- Schema.Parser parser = new Schema.Parser();
+ Schema.Parser parser =
+ new
Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
parser.setValidateDefaults(false);
fromList.addFirst(parser.parse(new
String(schemaData.getData(), UTF_8)));
}
- Schema.Parser parser = new Schema.Parser();
+ Schema.Parser parser = new
Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
parser.setValidateDefaults(false);
Schema toSchema = parser.parse(new String(to.getData(), UTF_8));
SchemaValidator schemaValidator = createSchemaValidator(strategy);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index dcf7edee1a2..5c5ed992d24 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -50,6 +50,7 @@ import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
+import
org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
@@ -414,12 +415,13 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
final CompletableFuture<SchemaVersion> completableFuture = new
CompletableFuture<>();
SchemaVersion schemaVersion;
if (isUsingAvroSchemaParser(schemaData.getType())) {
- Schema.Parser parser = new Schema.Parser();
+ Schema.Parser parser = new
Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
Schema newSchema = parser.parse(new String(schemaData.getData(),
UTF_8));
for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
if
(isUsingAvroSchemaParser(schemaAndMetadata.schema.getType())) {
- Schema.Parser existParser = new Schema.Parser();
+ Schema.Parser existParser =
+ new
Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
Schema existSchema = existParser.parse(new
String(schemaAndMetadata.schema.getData(), UTF_8));
if (newSchema.equals(existSchema) &&
schemaAndMetadata.schema.getType() == schemaData.getType()) {
schemaVersion = schemaAndMetadata.version;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
index 7f3c4e5e46b..8202c5720c2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import java.io.IOException;
+import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
@@ -32,13 +33,14 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
* Validate if the struct schema is in expected form.
*/
-class StructSchemaDataValidator implements SchemaDataValidator {
+public class StructSchemaDataValidator implements SchemaDataValidator {
public static StructSchemaDataValidator of() {
return INSTANCE;
}
private static final StructSchemaDataValidator INSTANCE = new
StructSchemaDataValidator();
+ public static final NameValidator COMPATIBLE_NAME_VALIDATOR = new
CompatibleNameValidator();
private StructSchemaDataValidator() {}
@@ -49,7 +51,7 @@ class StructSchemaDataValidator implements
SchemaDataValidator {
byte[] data = schemaData.getData();
try {
- Schema.Parser avroSchemaParser = new Schema.Parser();
+ Schema.Parser avroSchemaParser = new
Schema.Parser(COMPATIBLE_NAME_VALIDATOR);
avroSchemaParser.setValidateDefaults(false);
Schema schema = avroSchemaParser.parse(new String(data, UTF_8));
if (SchemaType.AVRO.equals(schemaData.getType())) {
@@ -97,4 +99,30 @@ class StructSchemaDataValidator implements
SchemaDataValidator {
throw new InvalidSchemaDataException("Invalid schema definition data
for "
+ schemaData.getType() + " schema", cause);
}
+
+ static class CompatibleNameValidator implements NameValidator {
+
+ @Override
+ public Result validate(String name) {
+ if (name == null) {
+ return new Result("Null name");
+ }
+ final int length = name.length();
+ if (length == 0) {
+ return new Result("Empty name");
+ }
+ final char first = name.charAt(0);
+ if (!(Character.isLetter(first) || first == '_' || first == '$')) {
+ return new Result("Illegal initial character: " + name);
+ }
+ for (int i = 1; i < length; i++) {
+ final char c = name.charAt(i);
+ // we need to allow $ for the special case
+ if (!(Character.isLetterOrDigit(c) || c == '_' || c == '$')) {
+ return new Result("Illegal character in: " + name);
+ }
+ }
+ return OK;
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
index 302e5879d28..a69bb649e7c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -23,11 +23,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import org.apache.avro.NameValidator;
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.broker.service.schema.proto.DataRecordOuterClass;
+import
org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator.CompatibleNameValidator;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -148,4 +153,119 @@ public class SchemaDataValidatorTest {
}
}
+ @Test
+ public void testCompatibleNameValidatorValidNames() {
+ CompatibleNameValidator validator = new CompatibleNameValidator();
+
+ String[] validNames = {
+ "validName",
+ "ValidName",
+ "valid_name",
+ "valid$name",
+ "_validName",
+ "$validName",
+ "name123",
+ "Name_123$",
+ "a",
+ "A",
+ "_",
+ "$",
+ "validNameWithMultiple$ymbols_and_numbers123"
+ };
+
+ for (String name : validNames) {
+ NameValidator.Result result = validator.validate(name);
+ Assert.assertTrue(result.isOK(),
+ "Expected validation to pass for name: '" + name + "', but got
error: " + result.getErrors());
+ }
+ }
+
+ @Test
+ public void testCompatibleNameValidatorInvalidNames() {
+ CompatibleNameValidator validator = new CompatibleNameValidator();
+
+ String[] invalidNames = {
+ null,
+ "",
+ "123name",
+ "1name",
+ "name-with-dash",
+ "name with space",
+ "name.with.dot",
+ "name@symbol",
+ "name#hash",
+ "name%percent",
+ "name&ersand",
+ "name*asterisk",
+ "name(parentheses)",
+ "name+plus",
+ "name=equals",
+ "name[brackets]",
+ "name{braces}",
+ "name|pipe",
+ "name\\backslash",
+ "name:colon",
+ "name;semicolon",
+ "name\"quote",
+ "name'apostrophe",
+ "name<greater>",
+ "name,comma",
+ "name?question",
+ "name!exclamation",
+ "name`backtick",
+ "name~tilde",
+ "name^caret"
+ };
+
+ for (String name : invalidNames) {
+ NameValidator.Result result = validator.validate(name);
+ Assert.assertFalse(result.isOK(), "Expected validation to fail for
name: '" + name + "'");
+ }
+ }
+
+ @Test
+ public void testCompatibleNameValidatorSpecificErrorMessages() throws
Exception {
+ CompatibleNameValidator validator = new CompatibleNameValidator();
+
+ NameValidator.Result nullResult = validator.validate(null);
+ Assert.assertFalse(nullResult.isOK());
+ Assert.assertEquals(nullResult.getErrors(), "Null name");
+
+ NameValidator.Result emptyResult = validator.validate("");
+ Assert.assertFalse(emptyResult.isOK());
+ Assert.assertEquals(emptyResult.getErrors(), "Empty name");
+
+ NameValidator.Result invalidFirstCharResult =
validator.validate("123name");
+ Assert.assertFalse(invalidFirstCharResult.isOK());
+ Assert.assertTrue(invalidFirstCharResult.getErrors().contains("Illegal
initial character"));
+
+ NameValidator.Result invalidCharResult =
validator.validate("name-with-dash");
+ Assert.assertFalse(invalidCharResult.isOK());
+ Assert.assertTrue(invalidCharResult.getErrors().contains("Illegal
character in"));
+ }
+
+ @Test
+ public void testCompatibleNameValidatorEdgeCases() throws Exception {
+ CompatibleNameValidator validator = new CompatibleNameValidator();
+
+ Assert.assertTrue(validator.validate("a").isOK());
+ Assert.assertTrue(validator.validate("A").isOK());
+ Assert.assertTrue(validator.validate("_").isOK());
+ Assert.assertTrue(validator.validate("$").isOK());
+
+ NameValidator.Result longNameResult =
validator.validate("a".repeat(1000));
+ Assert.assertTrue(longNameResult.isOK());
+
+ NameValidator.Result nameWithOnlyDigits = validator.validate("123");
+ Assert.assertFalse(nameWithOnlyDigits.isOK());
+ Assert.assertTrue(nameWithOnlyDigits.getErrors().contains("Illegal
initial character"));
+ }
+
+ @Test
+ public void testAvroCompatible() throws InvalidSchemaDataException {
+ final ProtobufSchema<DataRecordOuterClass.DataRecord> protobufSchema =
+ ProtobufSchema.of(DataRecordOuterClass.DataRecord.class);
+
StructSchemaDataValidator.of().validate(SchemaData.fromSchemaInfo(protobufSchema.getSchemaInfo()));
+ }
+
}
diff --git a/pulsar-broker/src/test/proto/DataRecord.proto
b/pulsar-broker/src/test/proto/DataRecord.proto
new file mode 100644
index 00000000000..18d0ad4d70f
--- /dev/null
+++ b/pulsar-broker/src/test/proto/DataRecord.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+
+package pulsar.schema;
+option java_package = "org.apache.pulsar.broker.service.schema.proto";
+
+
+message DataRecord {
+ string field1 = 1;
+ int64 field2 = 2;
+ NestedDataRecord field3 = 3;
+ repeated NestedDataRecord fields4 = 4;
+
+ message NestedDataRecord {
+ string field1 = 1;
+ int64 field2 = 2;
+ }
+}
\ No newline at end of file