This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b5fdeff5444b2f7ca2da0dea229a1d1583df9d9d Author: fengyubiao <[email protected]> AuthorDate: Thu Jun 6 16:09:38 2024 +0800 [improve] [client] PIP-344 support feature flag supportsGetPartitionedMetadataWithoutAutoCreation (#22773) (cherry picked from commit 6236116754472c61b2166da6d4797fc63c83f364) --- .../apache/pulsar/client/impl/ClientCnxTest.java | 44 ++++++++++++++++++++++ .../client/impl/BinaryProtoLookupService.java | 6 +++ .../org/apache/pulsar/client/impl/ClientCnx.java | 5 +++ .../apache/pulsar/common/protocol/Commands.java | 1 + 4 files changed, 56 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index dfd52d494ae..df6b1b8a8f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -20,13 +20,17 @@ package org.apache.pulsar.client.impl; import com.google.common.collect.Sets; import io.netty.channel.ChannelHandlerContext; +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -124,4 +128,44 @@ public class ClientCnxTest extends MockedPulsarServiceBaseTest { producer.close(); consumer.close(); } + + @Test + public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception { + final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp"); + admin.topics().createNonPartitionedTopic(topic); + PulsarClientImpl clientWitBinaryLookup = (PulsarClientImpl) PulsarClient.builder() + .maxNumberOfRejectedRequestPerConnection(1) + .connectionMaxIdleSeconds(Integer.MAX_VALUE) + .serviceUrl(pulsar.getBrokerServiceUrl()) + .build(); + ProducerImpl producer = (ProducerImpl) clientWitBinaryLookup.newProducer().topic(topic).create(); + + // Verify: the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation" responded from the broker is true. + Awaitility.await().untilAsserted(() -> { + ClientCnx clientCnx = producer.getClientCnx(); + Assert.assertNotNull(clientCnx); + Assert.assertTrue(clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()); + }); + Assert.assertEquals( + clientWitBinaryLookup.getPartitionsForTopic(topic, true).get().size(), 1); + + // Inject a "false" value for the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation". + // Verify: client will get a not support error. + Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation"); + field.setAccessible(true); + for (CompletableFuture<ClientCnx> clientCnxFuture : clientWitBinaryLookup.getCnxPool().getConnections()) { + field.set(clientCnxFuture.get(), false); + } + try { + clientWitBinaryLookup.getPartitionsForTopic(topic, false).join(); + Assert.fail("Expected an error that the broker version is too old."); + } catch (Exception ex) { + Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported from the broker")); + } + + // cleanup. + producer.close(); + clientWitBinaryLookup.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index b363d6e4366..bf015c564b9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -254,6 +254,12 @@ public class BinaryProtoLookupService implements LookupService { CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { + partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The feature of" + + " getting partitions without auto-creation is not supported from the broker," + + " please upgrade the broker to the latest version.")); + return; + } long requestId = client.newRequestId(); ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, metadataAutoCreationEnabled); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 03e0f406dd2..6f343a2ee58 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -191,6 +191,8 @@ public class ClientCnx extends PulsarHandler { protected AuthenticationDataProvider authenticationDataProvider; private TransactionBufferHandler transactionBufferHandler; private boolean supportsTopicWatchers; + @Getter + private boolean supportsGetPartitionedMetadataWithoutAutoCreation; /** Idle stat. **/ @Getter @@ -400,6 +402,9 @@ public class ClientCnx extends PulsarHandler { supportsTopicWatchers = connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers(); + supportsGetPartitionedMetadataWithoutAutoCreation = + connected.hasFeatureFlags() + && connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation(); // set remote protocol version to the correct version before we complete the connection future setRemoteEndpointProtocolVersion(connected.getProtocolVersion()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 53907e61914..4822dba023d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -301,6 +301,7 @@ public class Commands { connected.setProtocolVersion(versionToAdvertise); connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers); + connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true); return cmd; }
