This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4c84788340b [improve][broker] Improve exception for topic does not
have schema to check (#22974)
4c84788340b is described below
commit 4c84788340b4a3df975bf4a919c7223b31835976
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jul 1 21:41:43 2024 +0800
[improve][broker] Improve exception for topic does not have schema to check
(#22974)
---
.../service/nonpersistent/NonPersistentTopic.java | 13 ++++++-
.../broker/service/persistent/PersistentTopic.java | 13 ++++++-
.../service/schema/SchemaRegistryServiceImpl.java | 3 +-
.../schema/exceptions/NotExistSchemaException.java | 43 ++++++++++++++++++++++
.../java/org/apache/pulsar/schema/SchemaTest.java | 16 +++++---
5 files changed, 80 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0c6ebdfefa0..3801ac7f3ee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -66,6 +66,8 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicAttributes;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.TransportCnx;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
@@ -1239,7 +1241,16 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
|| (!producers.isEmpty())
|| (numActiveConsumersWithoutAutoSchema != 0)
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
- return checkSchemaCompatibleForConsumer(schema);
+ return checkSchemaCompatibleForConsumer(schema)
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof NotExistSchemaException) {
+ throw FutureUtil.wrapToCompletionException(
+ new
IncompatibleSchemaException("Failed to add schema to an active topic"
+ + " with empty(BYTES) schema:
new schema type " + schema.getType()));
+ }
+ throw
FutureUtil.wrapToCompletionException(realCause);
+ });
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3d620d31898..07deb116807 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -132,6 +132,8 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -4048,7 +4050,16 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
|| (userCreatedProducerCount > 0)
|| (numActiveConsumersWithoutAutoSchema != 0)
|| (ledger.getTotalSize() != 0)) {
- return checkSchemaCompatibleForConsumer(schema);
+ return checkSchemaCompatibleForConsumer(schema)
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof NotExistSchemaException) {
+ throw FutureUtil.wrapToCompletionException(
+ new
IncompatibleSchemaException("Failed to add schema to an active topic"
+ + " with empty(BYTES) schema:
new schema type " + schema.getType()));
+ }
+ throw
FutureUtil.wrapToCompletionException(realCause);
+ });
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 3e9e13b14fe..c1a394dcfbb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -398,7 +399,7 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
return checkCompatibilityWithAll(schemaId, schemaData,
strategy);
}
} else {
- return FutureUtil.failedFuture(new
IncompatibleSchemaException("Topic does not have schema to check"));
+ return FutureUtil.failedFuture(new
NotExistSchemaException("Topic does not have schema to check"));
}
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java
new file mode 100644
index 00000000000..2fe0a092375
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.exceptions;
+
+/**
+ * Thrown when a schema does not exist, e.g. when a topic has no schema to check compatibility against.
+ */
+public class NotExistSchemaException extends SchemaException {
+
+ private static final long serialVersionUID = -8342983749283749283L;
+
+ public NotExistSchemaException() {
+ super("The schema does not exist");
+ }
+
+ public NotExistSchemaException(String message) {
+ super(message);
+ }
+
+ public NotExistSchemaException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ public NotExistSchemaException(Throwable e) {
+ super(e);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d21e853ba09..ae9ea6d5ae6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.metadata.api.Stat;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -125,6 +126,11 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
super.internalCleanup();
}
+ @DataProvider(name = "topicDomain")
+ public static Object[] topicDomain() {
+ return new Object[] { "persistent://", "non-persistent://" }; // topic-name prefixes, one per test run
+ }
+
@Test
public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws
Exception{
final String tenant = PUBLIC_TENANT;
@@ -1336,19 +1342,19 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
* the new consumer to register new schema. But before we can solve
this problem, we need to modify
* "CmdProducer" to let the Broker know that the Producer uses a
schema of type "AUTO_PRODUCE_BYTES".
*/
- @Test
- public void testAutoProduceAndSpecifiedConsumer() throws Exception {
+ @Test(dataProvider = "topicDomain")
+ public void testAutoProduceAndSpecifiedConsumer(String domain) throws
Exception {
final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16);
admin.namespaces().createNamespace(namespace,
Sets.newHashSet(CLUSTER_NAME));
- final String topicName = "persistent://" + namespace + "/tp_" +
randomName(16);
+ final String topicName = domain + namespace + "/tp_" + randomName(16);
admin.topics().createNonPartitionedTopic(topicName);
Producer producer =
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
try {
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe();
- fail("Should throw ex: Topic does not have schema to check");
+ fail("Should throw ex: Failed to add schema to an active topic
with empty(BYTES) schema");
} catch (Exception ex){
- assertTrue(ex.getMessage().contains("Topic does not have schema to
check"));
+ assertTrue(ex.getMessage().contains("Failed to add schema to an
active topic with empty(BYTES) schema"));
}
// Cleanup.