This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a927ef43935798d573c23910af58f5f1dd1e916a Author: fengyubiao <[email protected]> AuthorDate: Thu May 23 21:15:16 2024 +0800 [fix] [client] PIP-344 Do not create partitioned metadata when calling pulsarClient.getPartitionsForTopic(topicName) (#22206) (cherry picked from commit 4e5c0bcc2b44c33a966287b86c2c235be249dc51) --- .../broker/admin/impl/PersistentTopicsBase.java | 26 +- .../apache/pulsar/broker/service/ServerCnx.java | 110 +++-- .../broker/admin/GetPartitionMetadataTest.java | 473 +++++++++++++++++++++ .../pulsar/broker/admin/TopicAutoCreationTest.java | 3 +- .../BrokerServiceAutoTopicCreationTest.java | 4 +- .../pulsar/broker/service/BrokerServiceTest.java | 10 +- .../service/BrokerServiceThrottlingTest.java | 2 +- .../pulsar/broker/service/ServerCnxTest.java | 2 +- .../buffer/TransactionLowWaterMarkTest.java | 4 +- .../pulsar/client/api/BrokerServiceLookupTest.java | 2 +- .../org/apache/pulsar/client/api/PulsarClient.java | 23 +- .../client/impl/BinaryProtoLookupService.java | 13 +- .../pulsar/client/impl/ConsumerBuilderImpl.java | 4 +- .../pulsar/client/impl/HttpLookupService.java | 6 +- .../apache/pulsar/client/impl/LookupService.java | 27 +- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- .../pulsar/client/impl/PulsarClientImpl.java | 32 +- .../TransactionCoordinatorClientImpl.java | 3 +- .../client/impl/MultiTopicsConsumerImplTest.java | 12 +- .../pulsar/client/impl/PulsarClientImplTest.java | 3 +- .../apache/pulsar/common/protocol/Commands.java | 7 +- .../org/apache/pulsar/common/util/FutureUtil.java | 4 +- pulsar-common/src/main/proto/PulsarApi.proto | 2 + .../pulsar/proxy/server/LookupProxyHandler.java | 3 +- 24 files changed, 689 insertions(+), 88 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ebb92679599..f9471b03c5f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -546,19 +546,29 @@ public class PersistentTopicsBase extends AdminResource { boolean checkAllowAutoCreation) { return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation) .thenCompose(metadata -> { - CompletableFuture<Void> ret; - if (metadata.partitions == 0 && !checkAllowAutoCreation) { + if (metadata.partitions > 1) { + // Some clients does not support partitioned topic. + return internalValidateClientVersionAsync().thenApply(__ -> metadata); + } else if (metadata.partitions == 1) { + return CompletableFuture.completedFuture(metadata); + } else { + // metadata.partitions == 0 // The topic may be a non-partitioned topic, so check if it exists here. // However, when checkAllowAutoCreation is true, the client will create the topic if // it doesn't exist. In this case, `partitions == 0` means the automatically created topic // is a non-partitioned topic so we shouldn't check if the topic exists. - ret = internalCheckTopicExists(topicName); - } else if (metadata.partitions > 1) { - ret = internalValidateClientVersionAsync(); - } else { - ret = CompletableFuture.completedFuture(null); + return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(brokerAllowAutoTopicCreation -> { + if (checkAllowAutoCreation) { + // Whether it exists or not, auto create a non-partitioned topic by client. + return CompletableFuture.completedFuture(metadata); + } else { + // If it does not exist, response a Not Found error. + // Otherwise, response a non-partitioned metadata. + return internalCheckTopicExists(topicName).thenApply(__ -> metadata); + } + }); } - return ret.thenApply(__ -> metadata); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index eed75e2c450..c904c0982c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -83,6 +83,8 @@ import org.apache.pulsar.broker.limiter.ConnectionController; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -613,35 +615,93 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply( isAuthorized -> { if (isAuthorized) { - unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName) - .handle((metadata, ex) -> { - if (ex == null) { - int partitions = metadata.partitions; - commandSender.sendPartitionMetadataResponse(partitions, requestId); - } else { - if (ex instanceof PulsarClientException) { - log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), - remoteAddress, topicName, ex.getMessage()); - commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, - ex.getMessage(), requestId); + // Get if exists, respond not found error if not exists. + getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenAccept(brokerAllowAutoCreate -> { + boolean autoCreateIfNotExist = partitionMetadata.isMetadataAutoCreationEnabled(); + if (!autoCreateIfNotExist) { + final NamespaceResources namespaceResources = getBrokerService().pulsar() + .getPulsarResources().getNamespaceResources(); + final TopicResources topicResources = getBrokerService().pulsar().getPulsarResources() + .getTopicResources(); + namespaceResources.getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(topicName, false) + .handle((metadata, getMetadataEx) -> { + if (getMetadataEx != null) { + log.error("{} {} Failed to get partition metadata", topicName, + ServerCnx.this.toString(), getMetadataEx); + writeAndFlush( + Commands.newPartitionMetadataResponse(ServerError.MetadataError, + "Failed to get partition metadata", + requestId)); + } else if (metadata.isPresent()) { + commandSender.sendPartitionMetadataResponse(metadata.get().partitions, + requestId); + } else if (topicName.isPersistent()) { + topicResources.persistentTopicExists(topicName).thenAccept(exists -> { + if (exists) { + commandSender.sendPartitionMetadataResponse(0, requestId); + return; + } + writeAndFlush(Commands.newPartitionMetadataResponse( + ServerError.TopicNotFound, "", requestId)); + }).exceptionally(ex -> { + log.error("{} {} Failed to get partition metadata", topicName, + ServerCnx.this.toString(), ex); + writeAndFlush( + Commands.newPartitionMetadataResponse(ServerError.MetadataError, + "Failed to check partition metadata", + requestId)); + return null; + }); + } else { + // Regarding non-persistent topic, we do not know whether it exists or not. + // Just return a non-partitioned metadata if partitioned metadata does not + // exist. + // Broker will respond a not found error when doing subscribing or producing if + // broker not allow to auto create topics. + commandSender.sendPartitionMetadataResponse(0, requestId); + } + return null; + }).whenComplete((ignore, ignoreEx) -> { + lookupSemaphore.release(); + if (ignoreEx != null) { + log.error("{} {} Failed to handle partition metadata request", topicName, + ServerCnx.this.toString(), ignoreEx); + } + }); + } else { + // Get if exists, create a new one if not exists. + unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName) + .whenComplete((metadata, ex) -> { + lookupSemaphore.release(); + if (ex == null) { + int partitions = metadata.partitions; + commandSender.sendPartitionMetadataResponse(partitions, requestId); } else { - log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, - topicName, ex.getMessage(), ex); - ServerError error = ServerError.ServiceNotReady; - if (ex instanceof RestException restException){ - int responseCode = restException.getResponse().getStatus(); - if (responseCode == NOT_FOUND.getStatusCode()){ - error = ServerError.TopicNotFound; - } else if (responseCode < INTERNAL_SERVER_ERROR.getStatusCode()){ - error = ServerError.MetadataError; + if (ex instanceof PulsarClientException) { + log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), + remoteAddress, topicName, ex.getMessage()); + commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, + ex.getMessage(), requestId); + } else { + log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, + topicName, ex.getMessage(), ex); + ServerError error = ServerError.ServiceNotReady; + if (ex instanceof RestException restException){ + int responseCode = restException.getResponse().getStatus(); + if (responseCode == NOT_FOUND.getStatusCode()){ + error = ServerError.TopicNotFound; + } else if (responseCode < INTERNAL_SERVER_ERROR.getStatusCode()){ + error = ServerError.MetadataError; + } } + commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), + requestId); } - commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), requestId); } - } - lookupSemaphore.release(); - return null; - }); + }); + } + }); } else { final String msg = "Client is not authorized to Get Partition Metadata"; log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java new file mode 100644 index 00000000000..51f643d2b78 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Semaphore; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +@Slf4j +public class GetPartitionMetadataTest extends ProducerConsumerBase { + + private static final String DEFAULT_NS = "public/default"; + + private PulsarClientImpl clientWithHttpLookup; + private PulsarClientImpl clientWitBinaryLookup; + + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + clientWithHttpLookup = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + clientWitBinaryLookup = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + } + + @Override + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + if (clientWithHttpLookup != null) { + clientWithHttpLookup.close(); + } + if (clientWitBinaryLookup != null) { + clientWitBinaryLookup.close(); + } + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + } + + private LookupService getLookupService(boolean isUsingHttpLookup) { + if (isUsingHttpLookup) { + return clientWithHttpLookup.getLookup(); + } else { + return clientWitBinaryLookup.getLookup(); + } + } + + @DataProvider(name = "topicDomains") + public Object[][] topicDomains() { + return new Object[][]{ + {TopicDomain.persistent}, + {TopicDomain.non_persistent} + }; + } + + @Test(dataProvider = "topicDomains") + public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(true); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + // HTTP client. + final String tp1 = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + clientWithHttpLookup.getPartitionsForTopic(tp1).join(); + Optional<PartitionedTopicMetadata> metadata1 = pulsar.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(TopicName.get(tp1), true).join(); + assertTrue(metadata1.isPresent()); + assertEquals(metadata1.get().partitions, 3); + + // Binary client. + final String tp2 = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + clientWitBinaryLookup.getPartitionsForTopic(tp2).join(); + Optional<PartitionedTopicMetadata> metadata2 = pulsar.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(TopicName.get(tp2), true).join(); + assertTrue(metadata2.isPresent()); + assertEquals(metadata2.get().partitions, 3); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + admin.topics().deletePartitionedTopic(tp1, false); + admin.topics().deletePartitionedTopic(tp2, false); + } + + @DataProvider(name = "autoCreationParamsAll") + public Object[][] autoCreationParamsAll(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, true, true, TopicDomain.persistent}, + {true, true, false, TopicDomain.persistent}, + {true, false, true, TopicDomain.persistent}, + {true, false, false, TopicDomain.persistent}, + {false, true, true, TopicDomain.persistent}, + {false, true, false, TopicDomain.persistent}, + {false, false, true, TopicDomain.persistent}, + {false, false, false, TopicDomain.persistent}, + {true, true, true, TopicDomain.non_persistent}, + {true, true, false, TopicDomain.non_persistent}, + {true, false, true, TopicDomain.non_persistent}, + {true, false, false, TopicDomain.non_persistent}, + {false, true, true, TopicDomain.non_persistent}, + {false, true, false, TopicDomain.non_persistent}, + {false, false, true, TopicDomain.non_persistent}, + {false, false, false, TopicDomain.non_persistent} + }; + } + + @Test(dataProvider = "autoCreationParamsAll") + public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup, + TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Create topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + admin.topics().createNonPartitionedTopic(topicNameStr); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + PartitionedTopicMetadata response = + lookup.getPartitionedTopicMetadata(topicName, paramMetadataAutoCreationEnabled).join(); + assertEquals(response.partitions, 0); + List<String> partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + assertFalse(partitionedTopics.contains(topicNameStr)); + List<String> topicList = admin.topics().getList("public/default"); + for (int i = 0; i < 3; i++) { + assertFalse(topicList.contains(topicName.getPartition(i))); + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + admin.topics().delete(topicNameStr, false); + } + + @Test(dataProvider = "autoCreationParamsAll") + public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup, + TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Create topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + admin.topics().createPartitionedTopic(topicNameStr, 3); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + PartitionedTopicMetadata response = + lookup.getPartitionedTopicMetadata(topicName, paramMetadataAutoCreationEnabled).join(); + assertEquals(response.partitions, 3); + List<String> topicList = admin.topics().getList("public/default"); + assertFalse(topicList.contains(topicNameStr)); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + admin.topics().deletePartitionedTopic(topicNameStr, false); + } + + @DataProvider(name = "clients") + public Object[][] clients(){ + return new Object[][]{ + // isUsingHttpLookup. + {true, TopicDomain.persistent}, + {false, TopicDomain.non_persistent} + }; + } + + @Test(dataProvider = "clients") + public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(true); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Create topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + PartitionedTopicMetadata response = lookup.getPartitionedTopicMetadata(topicName, true).join(); + assertEquals(response.partitions, 3); + List<String> partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + assertTrue(partitionedTopics.contains(topicNameStr)); + List<String> topicList = admin.topics().getList("public/default"); + assertFalse(topicList.contains(topicNameStr)); + for (int i = 0; i < 3; i++) { + // The API "getPartitionedTopicMetadata" only creates the partitioned metadata, it will not create the + // partitions. + assertFalse(topicList.contains(topicName.getPartition(i))); + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + admin.topics().deletePartitionedTopic(topicNameStr, false); + } + + @Test(dataProvider = "clients") + public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); + conf.setAllowAutoTopicCreation(true); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Create topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + PartitionedTopicMetadata response = lookup.getPartitionedTopicMetadata(topicName, true).join(); + assertEquals(response.partitions, 0); + List<String> partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + assertFalse(partitionedTopics.contains(topicNameStr)); + List<String> topicList = admin.topics().getList("public/default"); + assertFalse(topicList.contains(topicNameStr)); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + try { + admin.topics().delete(topicNameStr, false); + } catch (Exception ex) {} + } + + @DataProvider(name = "autoCreationParamsNotAllow") + public Object[][] autoCreationParamsNotAllow(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, false, true}, + {true, false, false}, + {false, false, true}, + {false, false, false}, + {false, true, true}, + {false, true, false}, + }; + } + + @Test(dataProvider = "autoCreationParamsNotAllow") + public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup) throws Exception { + if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + // These test cases are for the following PR. + // Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206. + return; + } + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Define topic. + final String topicNameStr = BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + try { + lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join(); + fail("Expect a not found exception"); + } catch (Exception e) { + log.warn("", e); + Throwable unwrapEx = FutureUtil.unwrapCompletionException(e); + assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException + || unwrapEx instanceof PulsarClientException.NotFoundException); + } + + List<String> partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName); + assertFalse(partitionedTopics.contains(topicNameStr)); + List<String> topicList = admin.topics().getList("public/default"); + assertFalse(topicList.contains(topicNameStr)); + for (int i = 0; i < 3; i++) { + assertFalse(topicList.contains(topicName.getPartition(i))); + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + } + + @DataProvider(name = "autoCreationParamsForNonPersistentTopic") + public Object[][] autoCreationParamsForNonPersistentTopic(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, true, true}, + {true, true, false}, + {false, true, true}, + {false, true, false}, + {false, false, true} + }; + } + + /** + * Regarding the API "get partitioned metadata" about non-persistent topic. + * The original behavior is: + * param-auto-create = true, broker-config-auto-create = true + * HTTP API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()} + * binary API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()} + * param-auto-create = true, broker-config-auto-create = false + * HTTP API: {partitions: 0} + * binary API: {partitions: 0} + * param-auto-create = false + * HTTP API: not found error + * binary API: not support + * This test only guarantees that the behavior is the same as before. The following separated PR will fix the + * incorrect behavior. + */ + @Test(dataProvider = "autoCreationParamsForNonPersistentTopic") + public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Define topic. + final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify. + // Regarding non-persistent topic, we do not know whether it exists or not. + // Broker will return a non-partitioned metadata if partitioned metadata does not exist. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + + if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled && isUsingHttpLookup) { + try { + lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled) + .join(); + Assert.fail("Expected a not found ex"); + } catch (Exception ex) { + // Cleanup. + client.close(); + return; + } + } + + PartitionedTopicMetadata metadata = lookup + .getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join(); + if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + assertEquals(metadata.partitions, 3); + } else { + assertEquals(metadata.partitions, 0); + } + + List<String> partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(topicName); + if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + assertTrue(partitionedTopics.contains(topicNameStr)); + } else { + assertFalse(partitionedTopics.contains(topicNameStr)); + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index bb4a23bf24b..55601ad4c6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -133,7 +134,7 @@ public class TopicAutoCreationTest extends ProducerConsumerBase { // we want to skip the "lookup" phase, because it is blocked by the HTTP API LookupService mockLookup = mock(LookupService.class); ((PulsarClientImpl) pulsarClient).setLookup(mockLookup); - when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer( + when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer( i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); when(mockLookup.getBroker(any())).thenAnswer(ignored -> { InetSocketAddress brokerAddress = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index 0a6cffc7685..ea5365bcf4b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -566,13 +566,13 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ try { pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC).create(); Assert.fail("Create should have failed."); - } catch (PulsarClientException.TopicDoesNotExistException e) { + } catch (PulsarClientException.TopicDoesNotExistException | PulsarClientException.NotFoundException e) { // expected } try { pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC).create(); Assert.fail("Create should have failed."); - } catch (PulsarClientException.TopicDoesNotExistException e) { + } catch (PulsarClientException.TopicDoesNotExistException | PulsarClientException.NotFoundException e) { // expected } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 442f850bbb9..172842b5ed3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1038,12 +1038,12 @@ public class BrokerServiceTest extends BrokerTestBase { // for PMR // 2 lookup will succeed long reqId1 = reqId++; - ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1); + ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1, true); CompletableFuture<?> f1 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> clientCnx.newLookup(request1, reqId1)); long reqId2 = reqId++; - ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2); + ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2, true); CompletableFuture<?> f2 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> { CompletableFuture<?> future = clientCnx.newLookup(request2, reqId2); @@ -1058,17 +1058,17 @@ public class BrokerServiceTest extends BrokerTestBase { // 3 lookup will fail latchRef.set(new CountDownLatch(1)); long reqId3 = reqId++; - ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3); + ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3, true); f1 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> clientCnx.newLookup(request3, reqId3)); long reqId4 = reqId++; - ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4); + ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4, true); f2 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> clientCnx.newLookup(request4, reqId4)); long reqId5 = reqId++; - ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5); + ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5, true); CompletableFuture<?> f3 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> { CompletableFuture<?> future = clientCnx.newLookup(request5, reqId5); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java index 312bfe0fc8a..c6a94833c4c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java @@ -198,7 +198,7 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase { for (int i = 0; i < totalConsumers; i++) { long reqId = 0xdeadbeef + i; Future<?> f = executor.submit(() -> { - ByteBuf request = Commands.newPartitionMetadataRequest(topicName, reqId); + ByteBuf request = Commands.newPartitionMetadataRequest(topicName, reqId, true); pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> clientCnx.newLookup(request, reqId)) .get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 1cb2f76c5e2..5387bc4998c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -3571,7 +3571,7 @@ public class ServerCnxTest { doReturn(false).when(pulsar).isRunning(); assertTrue(channel.isActive()); - ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1); + ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1, true); channel.writeInbound(clientCommand); Object response = getResponse(); assertTrue(response instanceof CommandPartitionedTopicMetadataResponse); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index aa7240a59f9..6e121aca381 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -148,7 +148,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() @@ -253,7 +253,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 0ad0b01dc1c..336728f279e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -931,7 +931,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { // Verify the request is works after merge the requests. List<CompletableFuture<PartitionedTopicMetadata>> futures = new ArrayList<>(); for (int i = 0; i < 100; i++) { - futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName))); + futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false)); } for (CompletableFuture<PartitionedTopicMetadata> future : futures) { assertEquals(future.join().partitions, topicPartitions); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index 78952fcaed8..6c46bce254f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -308,14 +308,33 @@ public interface PulsarClient extends Closeable { * * <p>This can be used to discover the partitions and create {@link Reader}, {@link Consumer} or {@link Producer} * instances directly on a particular partition. - * + * @Deprecated it is not suggested to use now; please use {@link #getPartitionsForTopic(String, boolean)}. * @param topic * the topic name * @return a future that will yield a list of the topic partitions or {@link PulsarClientException} if there was any * error in the operation. + * * @since 2.3.0 */ - CompletableFuture<List<String>> getPartitionsForTopic(String topic); + @Deprecated + default CompletableFuture<List<String>> getPartitionsForTopic(String topic) { + return getPartitionsForTopic(topic, true); + } + + /** + * 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}....{partition-n}}]" if a + * partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists. + * 2. When {@param metadataAutoCreationEnabled} is "false", neither the partitioned topic nor non-partitioned + * topic does not exist. You will get an {@link PulsarClientException.NotFoundException} or a + * {@link PulsarClientException.TopicDoesNotExistException}. + * 2-1. You will get a {@link PulsarClientException.NotSupportedException} with metadataAutoCreationEnabled=false + * on an old broker version which does not support getting partitions without partitioned metadata auto-creation. + * 3. When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using + * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. + * For the result, see case 1. + * @version 3.3.0. + */ + CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled); /** * Close the PulsarClient and release all the resources. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 9c9371b09cb..b363d6e4366 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -144,12 +144,14 @@ public class BinaryProtoLookupService implements LookupService { * calls broker binaryProto-lookup api to get metadata of partitioned-topic. * */ - public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) { + @Override + public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata( + TopicName topicName, boolean metadataAutoCreationEnabled) { final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>(); try { return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> { - CompletableFuture<PartitionedTopicMetadata> newFuture = - getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName); + CompletableFuture<PartitionedTopicMetadata> newFuture = getPartitionedTopicMetadata( + serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled); newFutureCreated.setValue(newFuture); return newFuture; }); @@ -246,14 +248,15 @@ public class BinaryProtoLookupService implements LookupService { } private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress, - TopicName topicName) { + TopicName topicName, boolean metadataAutoCreationEnabled) { long startTime = System.nanoTime(); CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); - ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); + ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, + metadataAutoCreationEnabled); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { if (t != null) { histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 7686d0072cf..7735f66e783 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -136,9 +136,9 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { CompletableFuture<PartitionedTopicMetadata> retryLetterTopicMetadata = - client.getPartitionedTopicMetadata(oldRetryLetterTopic); + client.getPartitionedTopicMetadata(oldRetryLetterTopic, true); CompletableFuture<PartitionedTopicMetadata> deadLetterTopicMetadata = - client.getPartitionedTopicMetadata(oldDeadLetterTopic); + client.getPartitionedTopicMetadata(oldDeadLetterTopic, true); applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata) .thenAccept(__ -> { String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index ea13930411d..44ef4ac17ee 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -136,12 +136,14 @@ public class HttpLookupService implements LookupService { } @Override - public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) { + public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata( + TopicName topicName, boolean metadataAutoCreationEnabled) { long startTime = System.nanoTime(); String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; CompletableFuture<PartitionedTopicMetadata> httpFuture = httpClient.get( - String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true", + String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=" + + metadataAutoCreationEnabled, PartitionedTopicMetadata.class); httpFuture.thenRun(() -> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 4d59d6591db..ccd1f6b23f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -60,11 +60,30 @@ public interface LookupService extends AutoCloseable { /** * Returns {@link PartitionedTopicMetadata} for a given topic. - * - * @param topicName topic-name - * @return + * Note: this method will try to create the topic partitioned metadata if it does not exist. + * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, boolean)}}. + */ + @Deprecated + default CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) { + return getPartitionedTopicMetadata(topicName, true); + } + + /** + * 1.Get the partitions if the topic exists. Return "{partition: n}" if a partitioned topic exists; + * return "{partition: 0}" if a non-partitioned topic exists. + * 2. When {@param metadataAutoCreationEnabled} is "false," neither partitioned topic nor non-partitioned topic + * does not exist. You will get a {@link PulsarClientException.NotFoundException} or + * a {@link PulsarClientException.TopicDoesNotExistException}. + * 2-1. You will get a {@link PulsarClientException.NotSupportedException} with metadataAutoCreationEnabled=false + * on an old broker version which does not support getting partitions without partitioned metadata + * auto-creation. + * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using + * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. + * For the result, see case 1. + * @version 3.3.0. */ - CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName); + CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, + boolean metadataAutoCreationEnabled); /** * Returns current SchemaInfo {@link SchemaInfo} for a given topic. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 20fd03d6a28..8047e05351a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -954,7 +954,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { CompletableFuture<Void> subscribeResult = new CompletableFuture<>(); - client.getPartitionedTopicMetadata(topicName) + client.getPartitionedTopicMetadata(topicName, true) .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, createTopicIfDoesNotExist)) .exceptionally(ex1 -> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 1e3964ed750..b93786bb4b1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -387,7 +387,7 @@ public class PulsarClientImpl implements PulsarClient { ProducerInterceptors interceptors) { CompletableFuture<Producer<T>> producerCreatedFuture = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -529,7 +529,7 @@ public class PulsarClientImpl implements PulsarClient { String topic = conf.getSingleTopic(); - getPartitionedTopicMetadata(topic).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -669,7 +669,7 @@ public class PulsarClientImpl implements PulsarClient { CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -1090,11 +1090,8 @@ public class PulsarClientImpl implements PulsarClient { } } - public CompletableFuture<Integer> getNumberOfPartitions(String topic) { - return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions); - } - - public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) { + public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata( + String topic, boolean metadataAutoCreationEnabled) { CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>(); @@ -1107,7 +1104,7 @@ public class PulsarClientImpl implements PulsarClient { .setMax(conf.getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .create(); getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, - metadataFuture, new ArrayList<>()); + metadataFuture, new ArrayList<>(), metadataAutoCreationEnabled); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); } @@ -1118,15 +1115,19 @@ public class PulsarClientImpl implements PulsarClient { Backoff backoff, AtomicLong remainingTime, CompletableFuture<PartitionedTopicMetadata> future, - List<Throwable> previousExceptions) { + List<Throwable> previousExceptions, + boolean metadataAutoCreationEnabled) { long startTime = System.nanoTime(); - lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { + CompletableFuture<PartitionedTopicMetadata> queryFuture = + lookup.getPartitionedTopicMetadata(topicName, metadataAutoCreationEnabled); + queryFuture.thenAccept(future::complete).exceptionally(e -> { remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); long nextDelay = Math.min(backoff.next(), remainingTime.get()); // skip retry scheduler when set lookup throttle in client or server side which will lead to // `TooManyRequestsException` boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause()) - || e.getCause() instanceof PulsarClientException.AuthenticationException; + || e.getCause() instanceof PulsarClientException.AuthenticationException + || e.getCause() instanceof PulsarClientException.NotFoundException; if (nextDelay <= 0 || isLookupThrottling) { PulsarClientException.setPreviousExceptions(e, previousExceptions); future.completeExceptionally(e); @@ -1138,15 +1139,16 @@ public class PulsarClientImpl implements PulsarClient { log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- " + "Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); - getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions); + getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions, + metadataAutoCreationEnabled); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); } @Override - public CompletableFuture<List<String>> getPartitionsForTopic(String topic) { - return getPartitionedTopicMetadata(topic).thenApply(metadata -> { + public CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) { + return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled).thenApply(metadata -> { if (metadata.partitions > 0) { TopicName topicName = TopicName.get(topic); List<String> partitions = new ArrayList<>(metadata.partitions); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 9e79fc203c2..499627f9c73 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -79,7 +79,8 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC @Override public CompletableFuture<Void> startAsync() { if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { - return pulsarClient.getLookup().getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN) + return pulsarClient.getLookup() + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, true) .thenCompose(partitionMeta -> { List<CompletableFuture<Void>> connectFutureList = new ArrayList<>(); if (LOG.isDebugEnabled()) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index febec2bff32..191124bb7b0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedComp import static org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture; import static org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -153,7 +154,8 @@ public class MultiTopicsConsumerImplTest { int completionDelayMillis = 100; Schema<byte[]> schema = Schema.BYTES; PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor); - when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createDelayedCompletedFuture( + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + .thenAnswer(invocation -> createDelayedCompletedFuture( new PartitionedTopicMetadata(), completionDelayMillis)); MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>( clientMock, consumerConfData, executorProvider, @@ -201,7 +203,8 @@ public class MultiTopicsConsumerImplTest { int completionDelayMillis = 10; Schema<byte[]> schema = Schema.BYTES; PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor); - when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture( + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + .thenAnswer(invocation -> createExceptionFuture( new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis)); CompletableFuture<Consumer<byte[]>> completeFuture = new CompletableFuture<>(); MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, @@ -237,7 +240,8 @@ public class MultiTopicsConsumerImplTest { // Simulate non partitioned topics PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0); - when(clientMock.getPartitionedTopicMetadata(any())).thenReturn(CompletableFuture.completedFuture(metadata)); + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(metadata)); CompletableFuture<Consumer<byte[]>> completeFuture = new CompletableFuture<>(); MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<>( @@ -248,7 +252,7 @@ public class MultiTopicsConsumerImplTest { // getPartitionedTopicMetadata should have been called only the first time, for each of the 3 topics, // but not anymore since the topics are not partitioned. - verify(clientMock, times(3)).getPartitionedTopicMetadata(any()); + verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), anyBoolean()); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 274b9b4f2d5..3e897ed89f2 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.nullable; @@ -107,7 +108,7 @@ public class PulsarClientImplTest { nullable(String.class))) .thenReturn(CompletableFuture.completedFuture( new GetTopicsResult(Collections.emptyList(), null, false, true))); - when(lookup.getPartitionedTopicMetadata(any(TopicName.class))) + when(lookup.getPartitionedTopicMetadata(any(TopicName.class), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())); when(lookup.getBroker(any())) .thenReturn(CompletableFuture.completedFuture(new LookupTopicResult( diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 65674af0ae1..53907e61914 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -190,6 +190,7 @@ public class Commands { flags.setSupportsAuthRefresh(true); flags.setSupportsBrokerEntryMetadata(true); flags.setSupportsPartialProducer(true); + flags.setSupportsGetPartitionedMetadataWithoutAutoCreation(true); } public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion, @@ -910,11 +911,13 @@ public class Commands { return serializeWithSize(newPartitionMetadataResponseCommand(error, errorMsg, requestId)); } - public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) { + public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, + boolean metadataAutoCreationEnabled) { BaseCommand cmd = localCmd(Type.PARTITIONED_METADATA); cmd.setPartitionMetadata() .setTopic(topic) - .setRequestId(requestId); + .setRequestId(requestId) + .setMetadataAutoCreationEnabled(metadataAutoCreationEnabled); return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index f6fcb12f359..0628d494af3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -199,9 +199,9 @@ public class FutureUtil { public static Throwable unwrapCompletionException(Throwable ex) { if (ex instanceof CompletionException) { - return ex.getCause(); + return unwrapCompletionException(ex.getCause()); } else if (ex instanceof ExecutionException) { - return ex.getCause(); + return unwrapCompletionException(ex.getCause()); } else { return ex; } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 387e4e3ff67..5a7eb582eb5 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -300,6 +300,7 @@ message FeatureFlags { optional bool supports_broker_entry_metadata = 2 [default = false]; optional bool supports_partial_producer = 3 [default = false]; optional bool supports_topic_watchers = 4 [default = false]; + optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false]; } message CommandConnected { @@ -413,6 +414,7 @@ message CommandPartitionedTopicMetadata { // to the proxy. optional string original_auth_data = 4; optional string original_auth_method = 5; + optional bool metadata_auto_creation_enabled = 6 [default = true]; } message CommandPartitionedTopicMetadataResponse { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index f76adadcc3e..03975e153ac 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -241,7 +241,8 @@ public class LookupProxyHandler { // Connected to backend broker long requestId = proxyConnection.newRequestId(); ByteBuf command; - command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); + command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, + partitionMetadata.isMetadataAutoCreationEnabled()); clientCnx.newLookup(command, requestId).whenComplete((r, t) -> { if (t != null) { log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
