This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0e4ca8085924f757717ccae441cfd650f9d5f917
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Fri Apr 18 02:48:08 2025 +0800

    [fix][schema] Reject unsupported Avro schema types during schema registration (#24103)

    (cherry picked from commit 3bdc6617f79d9217c9d503f45aaaf274deaec040)
---
 .../schema/validator/SchemaDataValidator.java     |  4 ++
 .../validator/StructSchemaDataValidator.java      | 24 +++++++++-
 .../java/org/apache/pulsar/schema/SchemaTest.java | 31 ++++++++++++-
 .../SchemaCompatibilityCheckTest.java             | 52 ++++++++++++++++++++--
 .../impl/schema/generic/GenericSchemaImpl.java    | 15 +++++--
 .../impl/schema/generic/GenericSchemaTest.java    | 27 +++++++++++
 6 files changed, 144 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
index f402a45bc23..85b73f53f81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -22,12 +22,16 @@ import org.apache.pulsar.broker.service.schema.KeyValueSchemaCompatibilityCheck;
 import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A validator to validate the schema data is well formed.
  */
 public interface SchemaDataValidator {
 
+    Logger LOGGER = LoggerFactory.getLogger(SchemaDataValidator.class);
+
     /**
      * Validate if the schema data is well formed.
* diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java index 4f52d2cf69c..7f3c4e5e46b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java @@ -51,7 +51,10 @@ class StructSchemaDataValidator implements SchemaDataValidator { try { Schema.Parser avroSchemaParser = new Schema.Parser(); avroSchemaParser.setValidateDefaults(false); - avroSchemaParser.parse(new String(data, UTF_8)); + Schema schema = avroSchemaParser.parse(new String(data, UTF_8)); + if (SchemaType.AVRO.equals(schemaData.getType())) { + checkAvroSchemaTypeSupported(schema); + } } catch (SchemaParseException e) { if (schemaData.getType() == SchemaType.JSON) { // we used JsonSchema for storing the definition of a JSON schema @@ -65,11 +68,30 @@ class StructSchemaDataValidator implements SchemaDataValidator { } else { throwInvalidSchemaDataException(schemaData, e); } + } catch (InvalidSchemaDataException invalidSchemaDataException) { + throw invalidSchemaDataException; } catch (Exception e) { throwInvalidSchemaDataException(schemaData, e); } } + static void checkAvroSchemaTypeSupported(Schema schema) throws InvalidSchemaDataException { + switch (schema.getType()) { + case RECORD: { + break; + } + case UNION: { + throw new InvalidSchemaDataException( + "Avro schema typed [UNION] is not supported"); + } + default: { + // INT, LONG, FLOAT, DOUBLE, BOOLEAN, STRING, BYTES. + // ARRAY, MAP, FIXED, NULL. 
+ LOGGER.info("Registering a special avro schema typed [{}]", schema.getType()); + } + } + } + private static void throwInvalidSchemaDataException(SchemaData schemaData, Throwable cause) throws InvalidSchemaDataException { throw new InvalidSchemaDataException("Invalid schema definition data for " diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index fb70e8f4ff5..21999989603 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -54,6 +54,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaRegistry; @@ -412,7 +413,35 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { // JSON schema with primitive class can consume assertEquals(consumer.receive().getValue().getNativeObject(), producerJsonIntegerValue); assertArrayEquals((byte[]) consumer.receive().getValue().getNativeObject(), producerJsonBytesValue); -} + } + + @Test + public void testAvroIntSchema() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + PUBLIC_TENANT + "/my-ns/tp"); + + Producer<Integer> producer = pulsarClient.newProducer(Schema.AVRO(Integer.class)).topic(topicName).create(); + Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.AVRO(Integer.class)).topic(topicName) + .subscriptionName("sub").subscribe(); + + producer.send(1); + producer.send(2); + producer.send(3); + + Message<Integer> msg1 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg1); + 
assertEquals(msg1.getValue(), 1); + Message<Integer> msg2 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg2); + assertEquals(msg2.getValue(), 2); + Message<Integer> msg3 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg3); + assertEquals(msg3.getValue(), 3); + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topicName, false); + } @Test public void testJSONSchemaDeserialize() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 49517a424b9..a94fe7786c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -19,15 +19,19 @@ package org.apache.pulsar.schema.compatibility; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; @@ -36,6 +40,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import 
org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -68,6 +73,8 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { .allowedClusters(Collections.singleton(CLUSTER_NAME)) .build(); admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo); + String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE; + admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME)); } @AfterMethod(alwaysRun = true) @@ -483,9 +490,8 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { @Test public void testSchemaLedgerAutoRelease() throws Exception { - String namespaceName = PUBLIC_TENANT + "/default"; - String topicName = "persistent://" + namespaceName + "/tp"; - admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME)); + String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE; + String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp"); admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); // Update schema 100 times. for (int i = 0; i < 100; i++){ @@ -516,6 +522,46 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { admin.topics().delete(topicName, true); } + @Test + public void testAddUnionAvroSchema() throws Exception { + String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE; + String topicName = BrokerTestUtil.newUniqueName(namespaceName + "/tp"); + admin.topics().createNonPartitionedTopic(topicName); + + // Create a union type schema. 
+ SchemaInfoImpl schemaInfo = new SchemaInfoImpl(); + schemaInfo.setType(SchemaType.AVRO); + schemaInfo.setSchema( + """ + [{ + "namespace": "org.apache.pulsar.schema.compatibility.TestA", + "type": "enum", + "name": "EventSource", + "symbols": ["AUTO_EVENTING", "HOODLUM", "OPTA", "ISD", "LIVE_STATS", "NGSS", "UNIFIED"] + }, { + "namespace": "org.apache.pulsar.schema.compatibility.TestB", + "type": "enum", + "name": "PeriodType", + "symbols": ["REGULAR", "EXTRA_TIME"] + }] + """.getBytes(UTF_8)); + schemaInfo.setName(topicName); + schemaInfo.setTimestamp(System.currentTimeMillis()); + try { + admin.schemas().createSchema(topicName, schemaInfo); + fail("avro-union schema is not supported"); + } catch (PulsarAdminException e) { + assertTrue(e.getMessage().contains("Avro schema typed [UNION] is not supported")); + } + + // Create a producer with auto_produce schema. + Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); + + // Cleanup. + producer.close(); + admin.topics().delete(topicName, false); + } + @Test public void testAutoProduceSchemaAlwaysCompatible() throws Exception { final String tenant = PUBLIC_TENANT; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java index a80675bfcc5..db4ad91f6a0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema.generic; import java.util.List; import java.util.stream.Collectors; +import org.apache.avro.AvroRuntimeException; import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; @@ -40,10 +41,16 @@ public 
abstract class GenericSchemaImpl extends AvroBaseStructSchema<GenericReco protected GenericSchemaImpl(SchemaInfo schemaInfo) { super(schemaInfo); - this.fields = schema.getFields() - .stream() - .map(f -> new Field(f.name(), f.pos())) - .collect(Collectors.toList()); + try { + this.fields = schema.getFields() + .stream() + .map(f -> new Field(f.name(), f.pos())) + .collect(Collectors.toList()); + } catch (AvroRuntimeException avroRuntimeException) { + // Rewrite error log. + throw new AvroRuntimeException("Schema typed [" + schema.getClass().getName() + "], simple-type:[" + + schema.getType() + "] is not supported. schema-content: " + schema); + } } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java index 0cd06ee3a47..0d521b99294 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java @@ -20,17 +20,20 @@ package org.apache.pulsar.client.impl.schema.generic; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.avro.AvroRuntimeException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar; import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import 
org.apache.pulsar.common.schema.LongSchemaVersion; +import org.apache.pulsar.common.schema.SchemaType; import org.testng.annotations.Test; import java.util.List; @@ -42,6 +45,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; /** * Unit testing generic schemas. @@ -64,6 +68,29 @@ public class GenericSchemaTest { testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); } + @Test + public void testUnionSchema() { + SchemaInfoImpl schemaInfo = new SchemaInfoImpl(); + schemaInfo.setType(SchemaType.AVRO); + schemaInfo.setSchema(("[{\n" + + "\"namespace\": \"org.apache.pulsar.schema.compatibility.TestA\",\n" + + "\"type\": \"enum\",\n" + + "\"name\": \"EventSource\",\n" + + "\"symbols\": [\"AUTO_EVENTING\", \"HOODLUM\", \"OPTA\", \"ISD\", \"LIVE_STATS\", \"NGSS\", \"UNIFIED\"]\n" + + "}, {\n" + + "\"namespace\": \"org.apache.pulsar.schema.compatibility.TestB\",\n" + + "\"type\": \"enum\",\n" + + "\"name\": \"PeriodType\",\n" + + "\"symbols\": [\"REGULAR\", \"EXTRA_TIME\"]\n" + + "}]").getBytes(UTF_8)); + try { + GenericJsonSchema.of(schemaInfo); + fail("expected an not-supported exception"); + } catch (AvroRuntimeException e) { + assertTrue(e.getMessage().contains("simple-type:[UNION] is not supported")); + } + } + @Test public void testAutoAvroSchema() { // configure encode schema