This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a3dfb2a fix get partition metadata problem for a non-existed topic
(#8818)
a3dfb2a is described below
commit a3dfb2a40979a2b9087b078e78044b91be558a76
Author: Aloys <[email protected]>
AuthorDate: Thu Jan 7 09:14:06 2021 +0800
fix get partition metadata problem for a non-existed topic (#8818)
Fixes #8813
### Motivation
Currently, getting the partition metadata for a non-existed topic, it
returns 0 instead of throwing an exception.
This pr fix this by throwing an exception.
### Modifications
If no metadata found in global zk, then will check whether the topic is
exist, if yes, will return 0, otherwise will throw an exception.
---
.../apache/pulsar/broker/admin/AdminResource.java | 21 +++++++++++++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 1 +
.../org/apache/pulsar/broker/web/RestException.java | 11 +++++++++++
.../apache/pulsar/broker/admin/AdminApiTest.java | 1 -
.../pulsar/broker/admin/PersistentTopicsTest.java | 8 ++++++++
.../service/BrokerServiceAutoTopicCreationTest.java | 1 +
.../pulsar/client/api/PartitionCreationTest.java | 2 +-
.../pulsar/client/api/PulsarClientException.java | 2 ++
8 files changed, 45 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 19b6d85..67ec247 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -47,6 +47,7 @@ import
org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -376,6 +377,26 @@ public abstract class AdminResource extends
PulsarWebResource {
}
}
+ protected void validateTopicExistedAndCheckAllowAutoCreation(String
tenant, String namespace,
+ String
encodedTopic, boolean checkAllowAutoCreation) {
+ try {
+ PartitionedTopicMetadata partitionedTopicMetadata =
+
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
+ if (partitionedTopicMetadata.partitions < 1) {
+ if
(!pulsar().getNamespaceService().checkTopicExists(topicName).get()
+ && checkAllowAutoCreation
+ &&
!pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)) {
+ throw new RestException(Status.NOT_FOUND,
+ new PulsarClientException.NotFoundException("Topic
not exist"));
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Failed to validate topic existed {}://{}/{}/{}",
+ domain(), tenant, namespace, topicName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic
partition meta failed.");
+ }
+ }
+
@Deprecated
protected void validateTopicName(String property, String cluster, String
namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3d4c1e0..888a478 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -763,6 +763,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Is check configuration required to
automatically create topic")
@QueryParam("checkAllowAutoCreation") @DefaultValue("false")
boolean checkAllowAutoCreation) {
validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicExistedAndCheckAllowAutoCreation(tenant, namespace,
encodedTopic, checkAllowAutoCreation);
return internalGetPartitionedMetadata(authoritative,
checkAllowAutoCreation);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
index 835a985..0cec819 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.ErrorData;
*/
@SuppressWarnings("serial")
public class RestException extends WebApplicationException {
+ private Throwable cause = null;
static String getExceptionData(Throwable t) {
StringWriter writer = new StringWriter();
writer.append("\n --- An unexpected error occurred in the server
---\n\n");
@@ -56,6 +57,16 @@ public class RestException extends WebApplicationException {
super(getResponse(t));
}
+ public RestException(Response.Status status, Throwable t) {
+ this(status.getStatusCode(), t.getMessage());
+ this.cause = t;
+ }
+
+ @Override
+ public Throwable getCause() {
+ return cause;
+ }
+
public RestException(PulsarAdminException cae) {
this(cae.getStatusCode(), cae.getHttpError());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index fc8e3ea..de6c1a2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -874,7 +874,6 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
0);
-
// check the getPartitionedStats for PartitionedTopic returns only
partitions metadata, and no partitions info
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
admin.topics().getPartitionedStats(partitionedTopicName,false).metadata.partitions);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 2ebbbda..480f947 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -266,6 +266,10 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(persistentTopics
.getPartitionedMetadata(testTenant, testNamespace,
nonPartitionTopic, true, false).partitions,
0);
+
+ Assert.assertEquals(persistentTopics
+ .getPartitionedMetadata(testTenant, testNamespace,
nonPartitionTopic, true, true).partitions,
+ 0);
}
@Test
@@ -275,6 +279,10 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
PartitionedTopicMetadata pMetadata =
persistentTopics.getPartitionedMetadata(
testTenant, testNamespace, topicName, true, false);
Assert.assertEquals(pMetadata.partitions, 0);
+
+ PartitionedTopicMetadata metadata =
persistentTopics.getPartitionedMetadata(
+ testTenant, testNamespace, topicName, true, true);
+ Assert.assertEquals(metadata.partitions, 0);
}
@Test(expectedExceptions = RestException.class)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 16ec901..8639928 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index dfdb25f..f223272 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -83,7 +83,7 @@ public class PartitionCreationTest extends
ProducerConsumerBase {
// passed non persistent topic here since we can not avoid
auto creation on non persistent topic now.
Assert.assertNotNull(consumer);
}
- } catch (PulsarClientException.TopicDoesNotExistException e) {
+ } catch (PulsarClientException.TopicDoesNotExistException |
PulsarClientException.NotFoundException e) {
//ok
}
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 1c71e01..8654203 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -1021,6 +1021,8 @@ public class PulsarClientException extends IOException {
return new ProducerFencedException(msg);
} else if (cause instanceof MemoryBufferIsFullError) {
return new MemoryBufferIsFullError(msg);
+ } else if (cause instanceof NotFoundException) {
+ return new NotFoundException(msg);
} else {
return new PulsarClientException(t);
}