This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0e4ca8085924f757717ccae441cfd650f9d5f917
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Fri Apr 18 02:48:08 2025 +0800

    [fix][schema] Reject unsupported Avro schema types during schema registration (#24103)
    
    (cherry picked from commit 3bdc6617f79d9217c9d503f45aaaf274deaec040)
---
 .../schema/validator/SchemaDataValidator.java      |  4 ++
 .../validator/StructSchemaDataValidator.java       | 24 +++++++++-
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 31 ++++++++++++-
 .../SchemaCompatibilityCheckTest.java              | 52 ++++++++++++++++++++--
 .../impl/schema/generic/GenericSchemaImpl.java     | 15 +++++--
 .../impl/schema/generic/GenericSchemaTest.java     | 27 +++++++++++
 6 files changed, 144 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
index f402a45bc23..85b73f53f81 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -22,12 +22,16 @@ import 
org.apache.pulsar.broker.service.schema.KeyValueSchemaCompatibilityCheck;
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A validator to validate the schema data is well formed.
  */
 public interface SchemaDataValidator {
 
+    Logger LOGGER = LoggerFactory.getLogger(SchemaDataValidator.class);
+
     /**
      * Validate if the schema data is well formed.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
index 4f52d2cf69c..7f3c4e5e46b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
@@ -51,7 +51,10 @@ class StructSchemaDataValidator implements 
SchemaDataValidator {
         try {
             Schema.Parser avroSchemaParser = new Schema.Parser();
             avroSchemaParser.setValidateDefaults(false);
-            avroSchemaParser.parse(new String(data, UTF_8));
+            Schema schema = avroSchemaParser.parse(new String(data, UTF_8));
+            if (SchemaType.AVRO.equals(schemaData.getType())) {
+                checkAvroSchemaTypeSupported(schema);
+            }
         } catch (SchemaParseException e) {
             if (schemaData.getType() == SchemaType.JSON) {
                 // we used JsonSchema for storing the definition of a JSON 
schema
@@ -65,11 +68,30 @@ class StructSchemaDataValidator implements 
SchemaDataValidator {
             } else {
                 throwInvalidSchemaDataException(schemaData, e);
             }
+        } catch (InvalidSchemaDataException invalidSchemaDataException) {
+            throw invalidSchemaDataException;
         } catch (Exception e) {
             throwInvalidSchemaDataException(schemaData, e);
         }
     }
 
+    /**
+     * Checks whether the top-level type of a parsed AVRO schema can be registered.
+     *
+     * <p>{@code RECORD} is accepted silently and {@code UNION} is rejected. Every other top-level
+     * type (primitives, {@code ENUM}, {@code ARRAY}, {@code MAP}, {@code FIXED}, {@code NULL}) is
+     * accepted, but an INFO line is logged recording that a "special" schema was registered.
+     *
+     * @param schema the parsed Avro schema definition
+     * @throws InvalidSchemaDataException if the top-level type is {@code UNION}
+     */
+    static void checkAvroSchemaTypeSupported(Schema schema) throws InvalidSchemaDataException {
+        Schema.Type avroType = schema.getType();
+        if (avroType == Schema.Type.UNION) {
+            throw new InvalidSchemaDataException(
+                    "Avro schema typed [UNION] is not supported");
+        }
+        if (avroType != Schema.Type.RECORD) {
+            // INT, LONG, FLOAT, DOUBLE, BOOLEAN, STRING, BYTES, NULL, ENUM, ARRAY, MAP and FIXED are
+            // still allowed; only leave a trace that such a schema was registered.
+            LOGGER.info("Registering a special avro schema typed [{}]", avroType);
+        }
+    }
+
     private static void throwInvalidSchemaDataException(SchemaData schemaData,
                                                         Throwable cause) 
throws InvalidSchemaDataException {
         throw new InvalidSchemaDataException("Invalid schema definition data 
for "
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index fb70e8f4ff5..21999989603 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -54,6 +54,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
@@ -412,7 +413,35 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         // JSON schema with primitive class can consume
         assertEquals(consumer.receive().getValue().getNativeObject(), 
producerJsonIntegerValue);
         assertArrayEquals((byte[])  
consumer.receive().getValue().getNativeObject(), producerJsonBytesValue);
-}
+    }
+
+    @Test
+    public void testAvroIntSchema() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ PUBLIC_TENANT + "/my-ns/tp");
+
+        Producer<Integer> producer = 
pulsarClient.newProducer(Schema.AVRO(Integer.class)).topic(topicName).create();
+        Consumer<Integer> consumer = 
pulsarClient.newConsumer(Schema.AVRO(Integer.class)).topic(topicName)
+                .subscriptionName("sub").subscribe();
+
+        producer.send(1);
+        producer.send(2);
+        producer.send(3);
+
+        Message<Integer> msg1 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg1);
+        assertEquals(msg1.getValue(), 1);
+        Message<Integer> msg2 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg2);
+        assertEquals(msg2.getValue(), 2);
+        Message<Integer> msg3 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg3);
+        assertEquals(msg3.getValue(), 3);
+
+        // cleanup.
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
 
     @Test
     public void testJSONSchemaDeserialize() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 49517a424b9..a94fe7786c7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -19,15 +19,19 @@
 package org.apache.pulsar.schema.compatibility;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
@@ -36,6 +40,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -68,6 +73,8 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
                 .allowedClusters(Collections.singleton(CLUSTER_NAME))
                 .build();
         admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
+        admin.namespaces().createNamespace(namespaceName, 
Sets.newHashSet(CLUSTER_NAME));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -483,9 +490,8 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testSchemaLedgerAutoRelease() throws Exception {
-        String namespaceName = PUBLIC_TENANT + "/default";
-        String topicName = "persistent://" + namespaceName + "/tp";
-        admin.namespaces().createNamespace(namespaceName, 
Sets.newHashSet(CLUSTER_NAME));
+        String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
+        String topicName = BrokerTestUtil.newUniqueName("persistent://" + 
namespaceName + "/tp");
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
         // Update schema 100 times.
         for (int i = 0; i < 100; i++){
@@ -516,6 +522,46 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().delete(topicName, true);
     }
 
+    @Test
+    public void testAddUnionAvroSchema() throws Exception {
+        String namespaceName = PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE;
+        String topicName = BrokerTestUtil.newUniqueName(namespaceName + "/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Create a union type schema.
+        SchemaInfoImpl schemaInfo = new SchemaInfoImpl();
+        schemaInfo.setType(SchemaType.AVRO);
+        schemaInfo.setSchema(
+            """
+            [{
+                "namespace": "org.apache.pulsar.schema.compatibility.TestA",
+                "type": "enum",
+                "name": "EventSource",
+                "symbols": ["AUTO_EVENTING", "HOODLUM", "OPTA", "ISD", 
"LIVE_STATS", "NGSS", "UNIFIED"]
+             }, {
+                "namespace": "org.apache.pulsar.schema.compatibility.TestB",
+                "type": "enum",
+                "name": "PeriodType",
+                "symbols": ["REGULAR", "EXTRA_TIME"]
+             }]
+            """.getBytes(UTF_8));
+        schemaInfo.setName(topicName);
+        schemaInfo.setTimestamp(System.currentTimeMillis());
+        try {
+            admin.schemas().createSchema(topicName, schemaInfo);
+            fail("avro-union schema is not supported");
+        } catch (PulsarAdminException e) {
+            assertTrue(e.getMessage().contains("Avro schema typed [UNION] is 
not supported"));
+        }
+
+        // Create a producer with auto_produce schema.
+        Producer producer = 
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
+
+        // Cleanup.
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
     @Test
     public void testAutoProduceSchemaAlwaysCompatible() throws Exception {
         final String tenant = PUBLIC_TENANT;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index a80675bfcc5..db4ad91f6a0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema.generic;
 
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -40,10 +41,16 @@ public abstract class GenericSchemaImpl extends 
AvroBaseStructSchema<GenericReco
     protected GenericSchemaImpl(SchemaInfo schemaInfo) {
         super(schemaInfo);
 
-        this.fields = schema.getFields()
-                .stream()
-                .map(f -> new Field(f.name(), f.pos()))
-                .collect(Collectors.toList());
+        try {
+            this.fields = schema.getFields()
+                    .stream()
+                    .map(f -> new Field(f.name(), f.pos()))
+                    .collect(Collectors.toList());
+        } catch (AvroRuntimeException avroRuntimeException) {
+            // Rethrow with a message naming the actual Avro type; keep the original exception as cause.
+            throw new AvroRuntimeException("Schema typed [" + schema.getClass().getName() + "], simple-type:["
+                    + schema.getType() + "] is not supported. schema-content: " + schema, avroRuntimeException);
+        }
     }
 
     @Override
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
index 0cd06ee3a47..0d521b99294 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -20,17 +20,20 @@ package org.apache.pulsar.client.impl.schema.generic;
 
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.Test;
 
 import java.util.List;
@@ -42,6 +45,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 /**
  * Unit testing generic schemas.
@@ -64,6 +68,29 @@ public class GenericSchemaTest {
         testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
 
+    /**
+     * Verifies that building a generic schema from an AVRO schema whose top-level type is a UNION
+     * fails with an {@link AvroRuntimeException} that names the unsupported type.
+     */
+    @Test
+    public void testUnionSchema() {
+        SchemaInfoImpl schemaInfo = new SchemaInfoImpl();
+        schemaInfo.setType(SchemaType.AVRO);
+        // A top-level JSON array of two enum schemas, i.e. an Avro UNION.
+        schemaInfo.setSchema(("[{\n"
+                + "\"namespace\": \"org.apache.pulsar.schema.compatibility.TestA\",\n"
+                + "\"type\": \"enum\",\n"
+                + "\"name\": \"EventSource\",\n"
+                + "\"symbols\": [\"AUTO_EVENTING\", \"HOODLUM\", \"OPTA\", \"ISD\", \"LIVE_STATS\", \"NGSS\", \"UNIFIED\"]\n"
+                + "}, {\n"
+                + "\"namespace\": \"org.apache.pulsar.schema.compatibility.TestB\",\n"
+                + "\"type\": \"enum\",\n"
+                + "\"name\": \"PeriodType\",\n"
+                + "\"symbols\": [\"REGULAR\", \"EXTRA_TIME\"]\n"
+                + "}]").getBytes(UTF_8));
+        try {
+            // The schema type is AVRO, so invoke the factory via GenericSchemaImpl rather than via
+            // GenericJsonSchema, which would suggest the JSON path is what is being exercised.
+            GenericSchemaImpl.of(schemaInfo);
+            fail("expected a not-supported exception");
+        } catch (AvroRuntimeException e) {
+            assertTrue(e.getMessage().contains("simple-type:[UNION] is not supported"), e.getMessage());
+        }
+    }
+
     @Test
     public void testAutoAvroSchema() {
         // configure encode schema

Reply via email to