This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 617c110ce8a [fix] [broker] Fix compatibility issues for PIP-344 
(#23136)
617c110ce8a is described below

commit 617c110ce8a1af367ea09e3b34477b6633e8b5e0
Author: fengyubiao <[email protected]>
AuthorDate: Sat Aug 10 15:55:55 2024 +0800

    [fix] [broker] Fix compatibility issues for PIP-344 (#23136)
    
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 702c0b3cfa251636099871f65f0ba1ac1a52069c)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 17 ++++-
 .../admin/GetPartitionMetadataMultiBrokerTest.java | 88 ++++++++++++++++++++++
 .../broker/admin/GetPartitionMetadataTest.java     | 74 ++++++++++++++++--
 .../pulsar/broker/admin/TopicAutoCreationTest.java |  4 +-
 .../buffer/TransactionLowWaterMarkTest.java        |  6 +-
 .../apache/pulsar/client/impl/ClientCnxTest.java   |  2 +-
 .../pulsar/client/api/PulsarClientException.java   | 25 ++++++
 .../client/impl/BinaryProtoLookupService.java      | 29 +++++--
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  2 +-
 .../pulsar/client/impl/HttpLookupService.java      |  6 +-
 .../apache/pulsar/client/impl/LookupService.java   | 19 ++++-
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 .../pulsar/client/impl/PulsarClientImpl.java       | 30 +++++---
 .../TransactionCoordinatorClientImpl.java          |  2 +-
 .../client/impl/MultiTopicsConsumerImplTest.java   |  8 +-
 .../pulsar/client/impl/PulsarClientImplTest.java   |  2 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 17 files changed, 273 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index b8ca38624d0..54142b555be 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -23,6 +23,7 @@ import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
 import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import com.google.common.hash.Hashing;
 import io.prometheus.client.Counter;
@@ -1448,8 +1449,22 @@ public class NamespaceService implements AutoCloseable {
                                     || actEx instanceof 
PulsarClientException.TopicDoesNotExistException
                                     || actEx instanceof 
PulsarAdminException.NotFoundException) {
                                 return 
CompletableFuture.completedFuture(false);
+                            } else if (actEx instanceof 
PulsarClientException.FeatureNotSupportedException fe){
+                                if (fe.getFailedFeatureCheck() == 
SupportsGetPartitionedMetadataWithoutAutoCreation) {
+                                    // Since the PIP-344 feature isn't 
supported, fall back to the behavior that
+                                    // was in place before 
https://github.com/apache/pulsar/pull/22838.
+                                    log.info("{} Checking the existence of a 
non-persistent non-partitioned topic "
+                                                    + "was performed using the 
behavior prior to PIP-344 changes, "
+                                                    + "because the broker does 
not support the PIP-344 feature "
+                                                    + 
"'supports_get_partitioned_metadata_without_auto_creation'.",
+                                            topic);
+                                    return 
CompletableFuture.completedFuture(false);
+                                } else {
+                                    log.error("{} Failed to get partition 
metadata", topic, ex);
+                                    return CompletableFuture.failedFuture(ex);
+                                }
                             } else {
-                                log.error("{} Failed to get partition metadata 
due to redirecting fails", topic, ex);
+                                log.error("{} Failed to get partition 
metadata", topic, ex);
                                 return CompletableFuture.failedFuture(ex);
                             }
                         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
index 28cf91ee165..60691203e77 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
@@ -18,20 +18,32 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
@@ -219,4 +231,80 @@ public class GetPartitionMetadataMultiBrokerTest extends 
GetPartitionMetadataTes
         
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
                 paramMetadataAutoCreationEnabled, isUsingHttpLookup);
     }
+
+    @DataProvider(name = "autoCreationParamsAllForNonPersistentTopic")
+    public Object[][] autoCreationParamsAllForNonPersistentTopic(){
+        return new Object[][]{
+                // configAllowAutoTopicCreation, 
paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
+                {true, true, true},
+                {true, true, false},
+                {true, false, true},
+                {true, false, false},
+                {false, true, true},
+                {false, true, false},
+                {false, false, true},
+                {false, false, false}
+        };
+    }
+
+    @Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic", 
priority = Integer.MAX_VALUE)
+    public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean 
configAllowAutoTopicCreation,
+                                                  boolean 
paramMetadataAutoCreationEnabled,
+                                                  boolean isUsingHttpLookup) 
throws Exception {
+        modifyTopicAutoCreation(configAllowAutoTopicCreation, 
TopicType.PARTITIONED, 3);
+
+        // Initialize the connections of internal Pulsar Client.
+        PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient();
+        PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient();
+        
client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS
 + "/tp1"));
+        
client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS
 + "/tp1"));
+
+        // Inject a "not supported" flag into the initialized connections.
+        Field field = 
ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
+        field.setAccessible(true);
+        for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
+            ConnectionPool pool = client.getCnxPool();
+            for (CompletableFuture<ClientCnx> connectionFuture : 
pool.getConnections()) {
+                ClientCnx clientCnx = connectionFuture.join();
+                
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
+                field.set(clientCnx, false);
+            }
+        }
+        // Verify: the method "getPartitionsForTopic(topic, false, true)" will 
fall back
+        //   to "getPartitionsForTopic(topic, true)" behavior.
+        int lookupPermitsBefore = getLookupRequestPermits();
+
+        // Verify: we will not get a "not supported" error.
+        PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+        for (PulsarClientImpl client : clientArray) {
+            final String topicNameStr = 
BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
+            try {
+                PartitionedTopicMetadata topicMetadata = client
+                        .getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled, false)
+                        .join();
+                log.info("Get topic metadata: {}", topicMetadata.partitions);
+            } catch (Exception ex) {
+                Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+                assertTrue(unwrapEx instanceof 
PulsarClientException.TopicDoesNotExistException
+                        || unwrapEx instanceof 
PulsarClientException.NotFoundException);
+                assertFalse(ex.getMessage().contains("getting partitions 
without auto-creation is not supported from"
+                        + " the broker"));
+            }
+        }
+
+        // Verify: lookup semaphore has been released.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+        });
+
+        // reset clients.
+        for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
+            ConnectionPool pool = client.getCnxPool();
+            for (CompletableFuture<ClientCnx> connectionFuture : 
pool.getConnections()) {
+                ClientCnx clientCnx = connectionFuture.join();
+                
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
+                field.set(clientCnx, true);
+            }
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index bf99b172829..87bc4267b48 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -23,10 +23,12 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
@@ -34,6 +36,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -225,6 +229,60 @@ public class GetPartitionMetadataTest {
         }
     }
 
+    @Test(dataProvider = "topicDomains", priority = Integer.MAX_VALUE)
+    public void testCompatibilityForNewClientAndOldBroker(TopicDomain 
topicDomain) throws Exception {
+        modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
+        // Initialize connections.
+        String pulsarUrl = pulsar1.getBrokerServiceUrl();
+        PulsarClientImpl[] clients = getClientsToTest(false);
+        for (PulsarClientImpl client : clients) {
+            client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + 
"/tp1"));
+        }
+        // Inject a "not supported" flag into the initialized connections.
+        Field field = 
ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
+        field.setAccessible(true);
+        for (PulsarClientImpl client : clients) {
+            ConnectionPool pool = client.getCnxPool();
+            for (CompletableFuture<ClientCnx> connectionFuture : 
pool.getConnections()) {
+                ClientCnx clientCnx = connectionFuture.join();
+                
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
+                field.set(clientCnx, false);
+            }
+        }
+
+        // Verify: the method "getPartitionsForTopic(topic, false, true)" will 
fall back to
+        // "getPartitionsForTopic(topic)" behavior.
+        int lookupPermitsBefore = getLookupRequestPermits();
+        for (PulsarClientImpl client : clients) {
+            // Verify: the behavior of topic creation.
+            final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() 
+ "://" + DEFAULT_NS + "/tp");
+            client.getPartitionedTopicMetadata(tp, false, true).join();
+            Optional<PartitionedTopicMetadata> metadata1 = 
pulsar1.getPulsarResources().getNamespaceResources()
+                    .getPartitionedTopicResources()
+                    .getPartitionedTopicMetadataAsync(TopicName.get(tp), 
true).join();
+            assertTrue(metadata1.isPresent());
+            assertEquals(metadata1.get().partitions, 3);
+
+            // Verify: lookup semaphore has been released.
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+            });
+
+            // Cleanup.
+            admin1.topics().deletePartitionedTopic(tp, false);
+        }
+
+        // reset clients.
+        for (PulsarClientImpl client : clients) {
+            ConnectionPool pool = client.getCnxPool();
+            for (CompletableFuture<ClientCnx> connectionFuture : 
pool.getConnections()) {
+                ClientCnx clientCnx = connectionFuture.join();
+                
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
+                field.set(clientCnx, true);
+            }
+        }
+    }
+
     @DataProvider(name = "autoCreationParamsAll")
     public Object[][] autoCreationParamsAll(){
         return new Object[][]{
@@ -265,7 +323,7 @@ public class GetPartitionMetadataTest {
         for (PulsarClientImpl client : clientArray) {
             // Verify: the result of get partitioned topic metadata.
             PartitionedTopicMetadata response =
-                    client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled).join();
+                    client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled, false).join();
             assertEquals(response.partitions, 0);
             List<String> partitionedTopics = 
admin1.topics().getPartitionedTopicList("public/default");
             assertFalse(partitionedTopics.contains(topicNameStr));
@@ -298,7 +356,7 @@ public class GetPartitionMetadataTest {
         for (PulsarClientImpl client : clientArray) {
             // Verify: the result of get partitioned topic metadata.
             PartitionedTopicMetadata response =
-                    client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled).join();
+                    client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled, false).join();
             assertEquals(response.partitions, 3);
             verifyNonPartitionedTopicNeverCreated(topicNameStr);
 
@@ -332,7 +390,7 @@ public class GetPartitionMetadataTest {
             // Case-1: normal topic.
             final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
             // Verify: the result of get partitioned topic metadata.
-            PartitionedTopicMetadata response = 
client.getPartitionedTopicMetadata(topicNameStr, true).join();
+            PartitionedTopicMetadata response = 
client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
             assertEquals(response.partitions, 3);
             // Verify: the behavior of topic creation.
             List<String> partitionedTopics = 
admin1.topics().getPartitionedTopicList("public/default");
@@ -347,7 +405,7 @@ public class GetPartitionMetadataTest {
                     topicDomain.value() + "://" + DEFAULT_NS + "/tp") + 
"-partition-1";
             // Verify: the result of get partitioned topic metadata.
             PartitionedTopicMetadata response2 =
-                    client.getPartitionedTopicMetadata(topicNameStrWithSuffix, 
true).join();
+                    client.getPartitionedTopicMetadata(topicNameStrWithSuffix, 
true, false).join();
             assertEquals(response2.partitions, 0);
             // Verify: the behavior of topic creation.
             List<String> partitionedTopics2 =
@@ -380,7 +438,7 @@ public class GetPartitionMetadataTest {
             // Case 1: normal topic.
             final String topicNameStr = 
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
             // Verify: the result of get partitioned topic metadata.
-            PartitionedTopicMetadata response = 
client.getPartitionedTopicMetadata(topicNameStr, true).join();
+            PartitionedTopicMetadata response = 
client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
             assertEquals(response.partitions, 0);
             // Verify: the behavior of topic creation.
             List<String> partitionedTopics = 
admin1.topics().getPartitionedTopicList("public/default");
@@ -392,7 +450,7 @@ public class GetPartitionMetadataTest {
                     topicDomain.value() + "://" + DEFAULT_NS + "/tp") + 
"-partition-1";
             // Verify: the result of get partitioned topic metadata.
             PartitionedTopicMetadata response2 =
-                    client.getPartitionedTopicMetadata(topicNameStrWithSuffix, 
true).join();
+                    client.getPartitionedTopicMetadata(topicNameStrWithSuffix, 
true, false).join();
             assertEquals(response2.partitions, 0);
             // Verify: the behavior of topic creation.
             List<String> partitionedTopics2 =
@@ -443,7 +501,7 @@ public class GetPartitionMetadataTest {
             final TopicName topicName = TopicName.get(topicNameStr);
             // Verify: the result of get partitioned topic metadata.
             try {
-                client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled)
+                client.getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled, false)
                         .join();
                 fail("Expect a not found exception");
             } catch (Exception e) {
@@ -496,7 +554,7 @@ public class GetPartitionMetadataTest {
             // Verify: the result of get partitioned topic metadata.
             try {
                 PartitionedTopicMetadata topicMetadata = client
-                        .getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled)
+                        .getPartitionedTopicMetadata(topicNameStr, 
paramMetadataAutoCreationEnabled, false)
                         .join();
                 log.info("Get topic metadata: {}", topicMetadata.partitions);
                 fail("Expected a not found ex");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 4712682e71b..45c7dbea2ab 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -136,7 +136,9 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
             ((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
             when(mockLookup.getPartitionedTopicMetadata(any(), 
anyBoolean())).thenAnswer(
                     i -> CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0)));
-            when(mockLookup.getBroker(any())).thenAnswer(i -> {
+            when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), 
anyBoolean())).thenAnswer(
+                    i -> CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0)));
+            when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
                 InetSocketAddress brokerAddress =
                         new InetSocketAddress(pulsar.getAdvertisedAddress(), 
pulsar.getBrokerListenPort().get());
                 return 
CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 3f268c4b7c9..bd2121948dc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -146,7 +146,8 @@ public class TransactionLowWaterMarkTest extends 
TransactionTestBase {
 
         PartitionedTopicMetadata partitionedTopicMetadata =
                 ((PulsarClientImpl) pulsarClient).getLookup()
-                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 
false).get();
+                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 
false)
+                        .get();
         Transaction lowWaterMarkTxn = null;
         for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
             lowWaterMarkTxn = pulsarClient.newTransaction()
@@ -251,7 +252,8 @@ public class TransactionLowWaterMarkTest extends 
TransactionTestBase {
 
         PartitionedTopicMetadata partitionedTopicMetadata =
                 ((PulsarClientImpl) pulsarClient).getLookup()
-                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 
false).get();
+                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 
false)
+                        .get();
         Transaction lowWaterMarkTxn = null;
         for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
             lowWaterMarkTxn = pulsarClient.newTransaction()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 1a9b4bbcb0d..bdff97dbb3b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -159,7 +159,7 @@ public class ClientCnxTest extends 
MockedPulsarServiceBaseTest {
             clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
             Assert.fail("Expected an error that the broker version is too 
old.");
         } catch (Exception ex) {
-            Assert.assertTrue(ex.getMessage().contains("without auto-creation 
is not supported from the broker"));
+            Assert.assertTrue(ex.getMessage().contains("without auto-creation 
is not supported by the broker"));
         }
 
         // cleanup.
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 22a97571e53..0e12b4f8e32 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import lombok.Getter;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
@@ -737,6 +738,30 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    /**
+     * Thrown by the Pulsar client when the feature named by {@link FailedFeatureCheck} is unsupported.
+     */
+    public static class FeatureNotSupportedException extends 
NotSupportedException {
+
+        @Getter
+        private final FailedFeatureCheck failedFeatureCheck;
+
+        public FeatureNotSupportedException(String msg, FailedFeatureCheck 
failedFeatureCheck) {
+            super(msg);
+            this.failedFeatureCheck = failedFeatureCheck;
+        }
+    }
+
+    /**
+     * "supports_auth_refresh" was introduced in "2.6" and is no longer 
supported, so no enum constant is defined for it.
+     * "supports_broker_entry_metadata" was introduced in "2.8" and is no 
longer supported, so no enum constant is defined for it.
+     * "supports_partial_producer" was introduced in "2.10" and is no longer 
supported, so no enum constant is defined for it.
+     * "supports_topic_watchers" was introduced in "2.11" and is no longer 
supported, so no enum constant is defined for it.
+     */
+    public enum FailedFeatureCheck {
+        SupportsGetPartitionedMetadataWithoutAutoCreation;
+    }
+
     /**
      * Not allowed exception thrown by Pulsar client.
      */
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 980e8a0c786..fb6b84b1096 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static java.lang.String.format;
+import static 
org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
 import io.netty.buffer.ByteBuf;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -123,12 +124,13 @@ public class BinaryProtoLookupService implements 
LookupService {
      */
     @Override
     public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
-            TopicName topicName, boolean metadataAutoCreationEnabled) {
+            TopicName topicName, boolean metadataAutoCreationEnabled, boolean 
useFallbackForNonPIP344Brokers) {
         final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
         try {
             return partitionedMetadataInProgress.computeIfAbsent(topicName, 
tpName -> {
                 CompletableFuture<PartitionedTopicMetadata> newFuture = 
getPartitionedTopicMetadata(
-                        serviceNameResolver.resolveHost(), topicName, 
metadataAutoCreationEnabled);
+                        serviceNameResolver.resolveHost(), topicName, 
metadataAutoCreationEnabled,
+                        useFallbackForNonPIP344Brokers);
                 newFutureCreated.setValue(newFuture);
                 return newFuture;
             });
@@ -224,20 +226,31 @@ public class BinaryProtoLookupService implements 
LookupService {
     }
 
     private CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(InetSocketAddress socketAddress,
-            TopicName topicName, boolean metadataAutoCreationEnabled) {
+            TopicName topicName, boolean metadataAutoCreationEnabled, boolean 
useFallbackForNonPIP344Brokers) {
 
         CompletableFuture<PartitionedTopicMetadata> partitionFuture = new 
CompletableFuture<>();
 
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
+            boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
             if (!metadataAutoCreationEnabled && 
!clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
-                partitionFuture.completeExceptionally(new 
PulsarClientException.NotSupportedException("The feature of"
-                        + " getting partitions without auto-creation is not 
supported from the broker,"
-                        + " please upgrade the broker to the latest 
version."));
-                return;
+                if (useFallbackForNonPIP344Brokers) {
+                    log.info("[{}] Using original behavior of 
getPartitionedTopicMetadata(topic) in "
+                            + "getPartitionedTopicMetadata(topic, false) "
+                            + "since the target broker does not support 
PIP-344 and fallback is enabled.", topicName);
+                    finalAutoCreationEnabled = true;
+                } else {
+                    partitionFuture.completeExceptionally(
+                            new 
PulsarClientException.FeatureNotSupportedException("The feature of "
+                                    + "getting partitions without 
auto-creation is not supported by the broker. "
+                                    + "Please upgrade the broker to version 
that supports PIP-344 to resolve this "
+                                    + "issue.",
+                                    
SupportsGetPartitionedMetadataWithoutAutoCreation));
+                    return;
+                }
             }
             long requestId = client.newRequestId();
             ByteBuf request = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
-                    metadataAutoCreationEnabled);
+                    finalAutoCreationEnabled);
             clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
                 if (t != null) {
                     log.warn("[{}] failed to get Partitioned metadata : {}", 
topicName,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 4d6cf96a010..2d31dc427d0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -106,7 +106,7 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
 
     private CompletableFuture<Boolean> checkDlqAlreadyExists(String topic) {
         CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
-        client.getPartitionedTopicMetadata(topic, false).thenAccept(metadata 
-> {
+        client.getPartitionedTopicMetadata(topic, false, 
true).thenAccept(metadata -> {
             TopicName topicName = TopicName.get(topic);
             if (topicName.isPersistent()) {
                 // Either partitioned or non-partitioned, it exists.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index ba04aaa3b31..38fdeff24a9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -107,9 +107,13 @@ public class HttpLookupService implements LookupService {
         });
     }
 
+    /**
+     * {@inheritDoc}
+     * @param useFallbackForNonPIP344Brokers HttpLookupService ignores this 
parameter
+     */
     @Override
     public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
-            TopicName topicName, boolean metadataAutoCreationEnabled) {
+            TopicName topicName, boolean metadataAutoCreationEnabled, boolean 
useFallbackForNonPIP344Brokers) {
         String format = topicName.isV2() ? "admin/v2/%s/partitions" : 
"admin/%s/partitions";
         return httpClient.get(String.format(format, topicName.getLookupName()) 
+ "?checkAllowAutoCreation="
                 + metadataAutoCreationEnabled, PartitionedTopicMetadata.class);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 675781b122f..9668cf2373b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -61,11 +61,19 @@ public interface LookupService extends AutoCloseable {
     /**
      * Returns {@link PartitionedTopicMetadata} for a given topic.
      * Note: this method will try to create the topic partitioned metadata if 
it does not exist.
-     * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, 
boolean)}}.
+     * @deprecated Please call {@link #getPartitionedTopicMetadata(TopicName, 
boolean, boolean)}.
      */
     @Deprecated
     default CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName) {
-        return getPartitionedTopicMetadata(topicName, true);
+        return getPartitionedTopicMetadata(topicName, true, true);
+    }
+
+    /**
+     * See the doc {@link #getPartitionedTopicMetadata(TopicName, boolean, 
boolean)}.
+     */
+    default CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName,
+                                                                               
 boolean metadataAutoCreationEnabled) {
+        return getPartitionedTopicMetadata(topicName, 
metadataAutoCreationEnabled, false);
     }
 
     /**
@@ -80,10 +88,15 @@ public interface LookupService extends AutoCloseable {
      * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger 
an auto-creation for this topic(using
      *  the default topic auto-creation strategy you set for the broker), and 
the corresponding result is returned.
      *  For the result, see case 1.
+     * @param useFallbackForNonPIP344Brokers <p>If true, fall back to the prior 
behavior of the method
+     *   {@link #getPartitionedTopicMetadata(TopicName)} if the broker does 
not support the PIP-344 feature
+     *   'supports_get_partitioned_metadata_without_auto_creation'. This 
parameter only affects the behavior when
+     *   {@code metadataAutoCreationEnabled} is false.</p>
      * @version 3.3.0.
      */
     CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName,
-                                                                            
boolean metadataAutoCreationEnabled);
+                                                                        
boolean metadataAutoCreationEnabled,
+                                                                        
boolean useFallbackForNonPIP344Brokers);
 
     /**
      * Returns current SchemaInfo {@link SchemaInfo} for a given topic.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 5d1e91a2852..418dd8561db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -954,7 +954,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
 
-        client.getPartitionedTopicMetadata(topicName, true)
+        client.getPartitionedTopicMetadata(topicName, true, false)
                 .thenAccept(metadata -> 
subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions,
                     createTopicIfDoesNotExist))
                 .exceptionally(ex1 -> {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 3ac9c5727ca..f28b81e8e55 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -379,7 +379,7 @@ public class PulsarClientImpl implements PulsarClient {
     private CompletableFuture<Integer> checkPartitions(String topic, boolean 
forceNoPartitioned,
                                                        @Nullable String 
producerNameForLog) {
         CompletableFuture<Integer> checkPartitions = new CompletableFuture<>();
-        getPartitionedTopicMetadata(topic, 
!forceNoPartitioned).thenAccept(metadata -> {
+        getPartitionedTopicMetadata(topic, !forceNoPartitioned, 
true).thenAccept(metadata -> {
             if (forceNoPartitioned && metadata.partitions > 0) {
                 String errorMsg = String.format("Can not create the 
producer[%s] for the topic[%s] that contains %s"
                                 + " partitions, but the producer does not 
support for a partitioned topic.",
@@ -552,7 +552,7 @@ public class PulsarClientImpl implements PulsarClient {
 
         String topic = conf.getSingleTopic();
 
-        getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
+        getPartitionedTopicMetadata(topic, true, false).thenAccept(metadata -> 
{
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
             }
@@ -701,7 +701,7 @@ public class PulsarClientImpl implements PulsarClient {
 
         CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
 
-        getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
+        getPartitionedTopicMetadata(topic, true, false).thenAccept(metadata -> 
{
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
             }
@@ -1112,8 +1112,15 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
+    /**
+     * @param useFallbackForNonPIP344Brokers <p>If true, fall back to the prior 
behavior of the method
+     *                                       getPartitionedTopicMetadata if 
the broker does not support the PIP-344
+     *                                       feature 
'supports_get_partitioned_metadata_without_auto_creation'. This
+     *                                       parameter only affects the 
behavior when
+     *                                       {@code 
metadataAutoCreationEnabled} is false.</p>
+     */
     public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
-            String topic, boolean metadataAutoCreationEnabled) {
+            String topic, boolean metadataAutoCreationEnabled, boolean 
useFallbackForNonPIP344Brokers) {
 
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture<>();
 
@@ -1125,8 +1132,8 @@ public class PulsarClientImpl implements PulsarClient {
                     .setMandatoryStop(opTimeoutMs.get() * 2, 
TimeUnit.MILLISECONDS)
                     .setMax(conf.getMaxBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
                     .create();
-            getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
-                                        metadataFuture, new ArrayList<>(), 
metadataAutoCreationEnabled);
+            getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, 
metadataFuture, new ArrayList<>(),
+                    metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
         } catch (IllegalArgumentException e) {
             return FutureUtil.failedFuture(new 
PulsarClientException.InvalidConfigurationException(e.getMessage()));
         }
@@ -1138,10 +1145,11 @@ public class PulsarClientImpl implements PulsarClient {
                                              AtomicLong remainingTime,
                                              
CompletableFuture<PartitionedTopicMetadata> future,
                                              List<Throwable> 
previousExceptions,
-                                             boolean 
metadataAutoCreationEnabled) {
+                                             boolean 
metadataAutoCreationEnabled,
+                                             boolean 
useFallbackForNonPIP344Brokers) {
         long startTime = System.nanoTime();
-        CompletableFuture<PartitionedTopicMetadata> queryFuture =
-                lookup.getPartitionedTopicMetadata(topicName, 
metadataAutoCreationEnabled);
+        CompletableFuture<PartitionedTopicMetadata> queryFuture = 
lookup.getPartitionedTopicMetadata(topicName,
+                metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
         queryFuture.thenAccept(future::complete).exceptionally(e -> {
             remainingTime.addAndGet(-1 * 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
@@ -1162,7 +1170,7 @@ public class PulsarClientImpl implements PulsarClient {
                         + "Will try again in {} ms", topicName, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
                 getPartitionedTopicMetadata(topicName, backoff, remainingTime, 
future, previousExceptions,
-                        metadataAutoCreationEnabled);
+                        metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
             }, nextDelay, TimeUnit.MILLISECONDS);
             return null;
         });
@@ -1170,7 +1178,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     @Override
     public CompletableFuture<List<String>> getPartitionsForTopic(String topic, 
boolean metadataAutoCreationEnabled) {
-        return getPartitionedTopicMetadata(topic, 
metadataAutoCreationEnabled).thenApply(metadata -> {
+        return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, 
false).thenApply(metadata -> {
             if (metadata.partitions > 0) {
                 TopicName topicName = TopicName.get(topic);
                 List<String> partitions = new ArrayList<>(metadata.partitions);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 45a3ad4f978..ce19cbf873e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -80,7 +80,7 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
     public CompletableFuture<Void> startAsync() {
         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
             return pulsarClient.getPartitionedTopicMetadata(
-                            
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true)
+                            
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), 
true, false)
                 .thenCompose(partitionMeta -> {
                     List<CompletableFuture<Void>> connectFutureList = new 
ArrayList<>();
                     if (LOG.isDebugEnabled()) {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 191124bb7b0..02a4d2ebba8 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -154,7 +154,7 @@ public class MultiTopicsConsumerImplTest {
         int completionDelayMillis = 100;
         Schema<byte[]> schema = Schema.BYTES;
         PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
-        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), 
anyBoolean()))
                 .thenAnswer(invocation -> createDelayedCompletedFuture(
                 new PartitionedTopicMetadata(), completionDelayMillis));
         MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(
@@ -203,7 +203,7 @@ public class MultiTopicsConsumerImplTest {
         int completionDelayMillis = 10;
         Schema<byte[]> schema = Schema.BYTES;
         PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
-        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), 
anyBoolean()))
                 .thenAnswer(invocation -> createExceptionFuture(
                 new PulsarClientException.InvalidConfigurationException("a 
mock exception"), completionDelayMillis));
         CompletableFuture<Consumer<byte[]>> completeFuture = new 
CompletableFuture<>();
@@ -240,7 +240,7 @@ public class MultiTopicsConsumerImplTest {
 
         // Simulate non partitioned topics
         PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0);
-        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+        when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), 
anyBoolean()))
                 .thenReturn(CompletableFuture.completedFuture(metadata));
         CompletableFuture<Consumer<byte[]>> completeFuture = new 
CompletableFuture<>();
 
@@ -252,7 +252,7 @@ public class MultiTopicsConsumerImplTest {
 
         // getPartitionedTopicMetadata should have been called only the first 
time, for each of the 3 topics,
         // but not anymore since the topics are not partitioned.
-        verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), 
anyBoolean());
+        verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), 
anyBoolean(), anyBoolean());
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index e13c060a052..8bf0c037e46 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -107,7 +107,7 @@ public class PulsarClientImplTest {
                 nullable(String.class)))
                 .thenReturn(CompletableFuture.completedFuture(
                         new GetTopicsResult(Collections.emptyList(), null, 
false, true)));
-        when(lookup.getPartitionedTopicMetadata(any(TopicName.class), 
anyBoolean()))
+        when(lookup.getPartitionedTopicMetadata(any(TopicName.class), 
anyBoolean(), anyBoolean()))
                 .thenReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata()));
         when(lookup.getBroker(any()))
                 .thenReturn(CompletableFuture.completedFuture(
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index f56df6ae9d1..fa2181b484f 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -295,6 +295,7 @@ message CommandConnect {
     optional string proxy_version = 11; // Version of the proxy. Should only 
be forwarded by a proxy.
 }
 
+// Please also add a new enum constant to 
"PulsarClientException.FailedFeatureCheck" when adding a new feature flag.
 message FeatureFlags {
   optional bool supports_auth_refresh = 1 [default = false];
   optional bool supports_broker_entry_metadata = 2 [default = false];


Reply via email to