This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new b6a8c03128a [improve][broker] Improve exception for topic does not
have schema to check (#22974)
b6a8c03128a is described below
commit b6a8c03128a5d8abf44a9aa7086329e32aa12113
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jul 1 21:41:43 2024 +0800
[improve][broker] Improve exception for topic does not have schema to check
(#22974)
(cherry picked from commit 4c84788340b4a3df975bf4a919c7223b31835976)
---
.../service/nonpersistent/NonPersistentTopic.java | 13 ++++++-
.../broker/service/persistent/PersistentTopic.java | 13 ++++++-
.../service/schema/SchemaRegistryServiceImpl.java | 3 +-
.../schema/exceptions/NotExistSchemaException.java | 43 ++++++++++++++++++++++
.../java/org/apache/pulsar/schema/SchemaTest.java | 16 +++++---
5 files changed, 80 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 22ee11d8eaf..220aa817955 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -63,6 +63,8 @@ import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.TransportCnx;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
@@ -1220,7 +1222,16 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
|| (!producers.isEmpty())
|| (numActiveConsumersWithoutAutoSchema != 0)
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
- return checkSchemaCompatibleForConsumer(schema);
+ return checkSchemaCompatibleForConsumer(schema)
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof NotExistSchemaException) {
+ throw FutureUtil.wrapToCompletionException(
+ new
IncompatibleSchemaException("Failed to add schema to an active topic"
+ + " with empty(BYTES) schema:
new schema type " + schema.getType()));
+ }
+ throw
FutureUtil.wrapToCompletionException(realCause);
+ });
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bd53191a261..bbfb5498e26 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -124,6 +124,8 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -3784,7 +3786,16 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
|| (userCreatedProducerCount > 0)
|| (numActiveConsumersWithoutAutoSchema != 0)
|| (ledger.getTotalSize() != 0)) {
- return checkSchemaCompatibleForConsumer(schema);
+ return checkSchemaCompatibleForConsumer(schema)
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof NotExistSchemaException) {
+ throw FutureUtil.wrapToCompletionException(
+ new
IncompatibleSchemaException("Failed to add schema to an active topic"
+ + " with empty(BYTES) schema:
new schema type " + schema.getType()));
+ }
+ throw
FutureUtil.wrapToCompletionException(realCause);
+ });
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index ae56df248d8..2bbe2e366b6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -393,7 +394,7 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
return checkCompatibilityWithAll(schemaId, schemaData,
strategy);
}
} else {
- return FutureUtil.failedFuture(new
IncompatibleSchemaException("Topic does not have schema to check"));
+ return FutureUtil.failedFuture(new
NotExistSchemaException("Topic does not have schema to check"));
}
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java
new file mode 100644
index 00000000000..2fe0a092375
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.exceptions;
+
+/**
+ * Exception thrown when a schema does not exist, for example when a
+ * compatibility check is requested for a topic that has no schema to check.
+ */
+public class NotExistSchemaException extends SchemaException {
+
+ private static final long serialVersionUID = -8342983749283749283L;
+
+ public NotExistSchemaException() {
+ super("The schema does not exist");
+ }
+
+ public NotExistSchemaException(String message) {
+ super(message);
+ }
+
+ public NotExistSchemaException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ public NotExistSchemaException(Throwable e) {
+ super(e);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index aa47c378fc3..7bb1b4714dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -119,6 +120,11 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
super.internalCleanup();
}
+ @DataProvider(name = "topicDomain")
+ public static Object[] topicDomain() {
+ return new Object[] { "persistent://", "non-persistent://" };
+ }
+
@Test
public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws
Exception{
final String tenant = PUBLIC_TENANT;
@@ -1330,19 +1336,19 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
* the new consumer to register new schema. But before we can solve
this problem, we need to modify
* "CmdProducer" to let the Broker know that the Producer uses a
schema of type "AUTO_PRODUCE_BYTES".
*/
- @Test
- public void testAutoProduceAndSpecifiedConsumer() throws Exception {
+ @Test(dataProvider = "topicDomain")
+ public void testAutoProduceAndSpecifiedConsumer(String domain) throws
Exception {
final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16);
admin.namespaces().createNamespace(namespace,
Sets.newHashSet(CLUSTER_NAME));
- final String topicName = "persistent://" + namespace + "/tp_" +
randomName(16);
+ final String topicName = domain + namespace + "/tp_" + randomName(16);
admin.topics().createNonPartitionedTopic(topicName);
Producer producer =
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
try {
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe();
- fail("Should throw ex: Topic does not have schema to check");
+ fail("Should throw ex: Failed to add schema to an active topic
with empty(BYTES) schema");
} catch (Exception ex){
- assertTrue(ex.getMessage().contains("Topic does not have schema to
check"));
+ assertTrue(ex.getMessage().contains("Failed to add schema to an
active topic with empty(BYTES) schema"));
}
// Cleanup.