This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c84788340b [improve][broker] Improve exception for topic does not have schema to check (#22974)
4c84788340b is described below

commit 4c84788340b4a3df975bf4a919c7223b31835976
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jul 1 21:41:43 2024 +0800

    [improve][broker] Improve exception for topic does not have schema to check (#22974)
---
 .../service/nonpersistent/NonPersistentTopic.java  | 13 ++++++-
 .../broker/service/persistent/PersistentTopic.java | 13 ++++++-
 .../service/schema/SchemaRegistryServiceImpl.java  |  3 +-
 .../schema/exceptions/NotExistSchemaException.java | 43 ++++++++++++++++++++++
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 16 +++++---
 5 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0c6ebdfefa0..3801ac7f3ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -66,6 +66,8 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicAttributes;
 import org.apache.pulsar.broker.service.TopicPolicyListener;
 import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -1239,7 +1241,16 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                     || (!producers.isEmpty())
                     || (numActiveConsumersWithoutAutoSchema != 0)
                     || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
-                return checkSchemaCompatibleForConsumer(schema);
+                return checkSchemaCompatibleForConsumer(schema)
+                        .exceptionally(ex -> {
+                            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                            if (realCause instanceof NotExistSchemaException) {
+                                throw FutureUtil.wrapToCompletionException(
+                                        new IncompatibleSchemaException("Failed to add schema to an active topic"
+                                                + " with empty(BYTES) schema: new schema type " + schema.getType()));
+                            }
+                            throw FutureUtil.wrapToCompletionException(realCause);
+                        });
             } else {
                 return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3d620d31898..07deb116807 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -132,6 +132,8 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -4048,7 +4050,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     || (userCreatedProducerCount > 0)
                     || (numActiveConsumersWithoutAutoSchema != 0)
                     || (ledger.getTotalSize() != 0)) {
-                return checkSchemaCompatibleForConsumer(schema);
+                return checkSchemaCompatibleForConsumer(schema)
+                        .exceptionally(ex -> {
+                            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                            if (realCause instanceof NotExistSchemaException) {
+                                throw FutureUtil.wrapToCompletionException(
+                                        new IncompatibleSchemaException("Failed to add schema to an active topic"
+                                                + " with empty(BYTES) schema: new schema type " + schema.getType()));
+                            }
+                            throw FutureUtil.wrapToCompletionException(realCause);
+                        });
             } else {
                 return addSchema(schema).thenCompose(schemaVersion ->
                         CompletableFuture.completedFuture(null));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 3e9e13b14fe..c1a394dcfbb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -398,7 +399,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                     return checkCompatibilityWithAll(schemaId, schemaData, strategy);
                 }
             } else {
-                return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check"));
+                return FutureUtil.failedFuture(new NotExistSchemaException("Topic does not have schema to check"));
             }
         });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java
new file mode 100644
index 00000000000..2fe0a092375
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.exceptions;
+
+/**
+ * Exception thrown when a schema does not exist.
+ */
+public class NotExistSchemaException extends SchemaException {
+
+    private static final long serialVersionUID = -8342983749283749283L;
+
+    public NotExistSchemaException() {
+        super("The schema does not exist");
+    }
+
+    public NotExistSchemaException(String message) {
+        super(message);
+    }
+
+    public NotExistSchemaException(String message, Throwable e) {
+        super(message, e);
+    }
+
+    public NotExistSchemaException(Throwable e) {
+        super(e);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d21e853ba09..ae9ea6d5ae6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -125,6 +126,11 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @DataProvider(name = "topicDomain")
+    public static Object[] topicDomain() {
+        return new Object[] { "persistent://", "non-persistent://" };
+    }
+
     @Test
     public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
         final String tenant = PUBLIC_TENANT;
@@ -1336,19 +1342,19 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
      *       the new consumer to register new schema. But before we can solve this problem, we need to modify
      *       "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES".
      */
-    @Test
-    public void testAutoProduceAndSpecifiedConsumer() throws Exception {
+    @Test(dataProvider = "topicDomain")
+    public void testAutoProduceAndSpecifiedConsumer(String domain) throws Exception {
         final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16);
         admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
-        final String topicName = "persistent://" + namespace + "/tp_" + randomName(16);
+        final String topicName = domain + namespace + "/tp_" + randomName(16);
         admin.topics().createNonPartitionedTopic(topicName);
 
         Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
         try {
             pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe();
-            fail("Should throw ex: Topic does not have schema to check");
+            fail("Should throw ex: Failed to add schema to an active topic with empty(BYTES) schema");
         } catch (Exception ex){
-            assertTrue(ex.getMessage().contains("Topic does not have schema to check"));
+            assertTrue(ex.getMessage().contains("Failed to add schema to an active topic with empty(BYTES) schema"));
         }
 
         // Cleanup.

Reply via email to