This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 46e2fa37a43 [fix] [broker] Fix compatibility issues for PIP-344
(#23136)
46e2fa37a43 is described below
commit 46e2fa37a438e88332206bd8e46719335f68ddcf
Author: fengyubiao <[email protected]>
AuthorDate: Sat Aug 10 15:55:55 2024 +0800
[fix] [broker] Fix compatibility issues for PIP-344 (#23136)
Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit 702c0b3cfa251636099871f65f0ba1ac1a52069c)
---
.../pulsar/broker/namespace/NamespaceService.java | 17 ++++-
.../admin/GetPartitionMetadataMultiBrokerTest.java | 88 ++++++++++++++++++++++
.../broker/admin/GetPartitionMetadataTest.java | 74 ++++++++++++++++--
.../pulsar/broker/admin/TopicAutoCreationTest.java | 2 +
.../buffer/TransactionLowWaterMarkTest.java | 6 +-
.../apache/pulsar/client/impl/ClientCnxTest.java | 2 +-
.../pulsar/client/api/PulsarClientException.java | 25 ++++++
.../client/impl/BinaryProtoLookupService.java | 29 +++++--
.../pulsar/client/impl/ConsumerBuilderImpl.java | 2 +-
.../pulsar/client/impl/HttpLookupService.java | 6 +-
.../apache/pulsar/client/impl/LookupService.java | 19 ++++-
.../client/impl/MultiTopicsConsumerImpl.java | 2 +-
.../pulsar/client/impl/PulsarClientImpl.java | 30 +++++---
.../TransactionCoordinatorClientImpl.java | 2 +-
.../client/impl/MultiTopicsConsumerImplTest.java | 8 +-
.../pulsar/client/impl/PulsarClientImplTest.java | 2 +-
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
17 files changed, 272 insertions(+), 43 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 800a81a0f70..f3fb17c02fc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -23,6 +23,7 @@ import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributeKey;
@@ -1498,8 +1499,22 @@ public class NamespaceService implements AutoCloseable {
|| actEx instanceof
PulsarClientException.TopicDoesNotExistException
|| actEx instanceof
PulsarAdminException.NotFoundException) {
return
CompletableFuture.completedFuture(false);
+ } else if (actEx instanceof
PulsarClientException.FeatureNotSupportedException fe){
+ if (fe.getFailedFeatureCheck() ==
SupportsGetPartitionedMetadataWithoutAutoCreation) {
+ // Since the feature PIP-344 isn't
supported, restore the behavior to previous
+ // behavior before
https://github.com/apache/pulsar/pull/22838 changes.
+ log.info("{} Checking the existence of a
non-persistent non-partitioned topic "
+ + "was performed using the
behavior prior to PIP-344 changes, "
+ + "because the broker does
not support the PIP-344 feature "
+ +
"'supports_get_partitioned_metadata_without_auto_creation'.",
+ topic);
+ return
CompletableFuture.completedFuture(false);
+ } else {
+ log.error("{} Failed to get partition
metadata", topic, ex);
+ return CompletableFuture.failedFuture(ex);
+ }
} else {
- log.error("{} Failed to get partition metadata
due to redirecting fails", topic, ex);
+ log.error("{} Failed to get partition
metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
index 28cf91ee165..60691203e77 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
@@ -18,20 +18,32 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
import java.net.URL;
+import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
@@ -219,4 +231,80 @@ public class GetPartitionMetadataMultiBrokerTest extends
GetPartitionMetadataTes
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
paramMetadataAutoCreationEnabled, isUsingHttpLookup);
}
+
+ @DataProvider(name = "autoCreationParamsAllForNonPersistentTopic")
+ public Object[][] autoCreationParamsAllForNonPersistentTopic(){
+ return new Object[][]{
+ // configAllowAutoTopicCreation,
paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
+ {true, true, true},
+ {true, true, false},
+ {true, false, true},
+ {true, false, false},
+ {false, true, true},
+ {false, true, false},
+ {false, false, true},
+ {false, false, false}
+ };
+ }
+
+ @Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic",
priority = Integer.MAX_VALUE)
+ public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean
configAllowAutoTopicCreation,
+ boolean
paramMetadataAutoCreationEnabled,
+ boolean isUsingHttpLookup)
throws Exception {
+ modifyTopicAutoCreation(configAllowAutoTopicCreation,
TopicType.PARTITIONED, 3);
+
+ // Initialize the connections of internal Pulsar Client.
+ PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient();
+ PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient();
+
client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS
+ "/tp1"));
+
client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS
+ "/tp1"));
+
+ // Inject a not support flag into the connections initialized.
+ Field field =
ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
+ field.setAccessible(true);
+ for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
+ ConnectionPool pool = client.getCnxPool();
+ for (CompletableFuture<ClientCnx> connectionFuture :
pool.getConnections()) {
+ ClientCnx clientCnx = connectionFuture.join();
+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
+ field.set(clientCnx, false);
+ }
+ }
+ // Verify: the method "getPartitionsForTopic(topic, false, true)" will
fallback
+ // to "getPartitionsForTopic(topic, true)" behavior.
+ int lookupPermitsBefore = getLookupRequestPermits();
+
+ // Verify: we will not get an un-support error.
+ PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+ for (PulsarClientImpl client : clientArray) {
+ final String topicNameStr =
BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
+ try {
+ PartitionedTopicMetadata topicMetadata = client
+ .getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled, false)
+ .join();
+ log.info("Get topic metadata: {}", topicMetadata.partitions);
+ } catch (Exception ex) {
+ Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ assertTrue(unwrapEx instanceof
PulsarClientException.TopicDoesNotExistException
+ || unwrapEx instanceof
PulsarClientException.NotFoundException);
+ assertFalse(ex.getMessage().contains("getting partitions
without auto-creation is not supported from"
+ + " the broker"));
+ }
+ }
+
+ // Verify: lookup semaphore has been releases.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+
+ // reset clients.
+ for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
+ ConnectionPool pool = client.getCnxPool();
+ for (CompletableFuture<ClientCnx> connectionFuture :
pool.getConnections()) {
+ ClientCnx clientCnx = connectionFuture.join();
+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
+ field.set(clientCnx, true);
+ }
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index bf99b172829..87bc4267b48 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -23,10 +23,12 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
@@ -34,6 +36,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -225,6 +229,60 @@ public class GetPartitionMetadataTest {
}
}
+ @Test(dataProvider = "topicDomains", priority = Integer.MAX_VALUE)
+ public void testCompatibilityForNewClientAndOldBroker(TopicDomain
topicDomain) throws Exception {
+ modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
+ // Initialize connections.
+ String pulsarUrl = pulsar1.getBrokerServiceUrl();
+ PulsarClientImpl[] clients = getClientsToTest(false);
+ for (PulsarClientImpl client : clients) {
+ client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS +
"/tp1"));
+ }
+ // Inject a not support flag into the connections initialized.
+ Field field =
ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
+ field.setAccessible(true);
+ for (PulsarClientImpl client : clients) {
+ ConnectionPool pool = client.getCnxPool();
+ for (CompletableFuture<ClientCnx> connectionFuture :
pool.getConnections()) {
+ ClientCnx clientCnx = connectionFuture.join();
+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
+ field.set(clientCnx, false);
+ }
+ }
+
+ // Verify: the method "getPartitionsForTopic(topic, false, true)" will
fallback to
+ // "getPartitionsForTopic(topic)" behavior.
+ int lookupPermitsBefore = getLookupRequestPermits();
+ for (PulsarClientImpl client : clients) {
+ // Verify: the behavior of topic creation.
+ final String tp = BrokerTestUtil.newUniqueName(topicDomain.value()
+ "://" + DEFAULT_NS + "/tp");
+ client.getPartitionedTopicMetadata(tp, false, true).join();
+ Optional<PartitionedTopicMetadata> metadata1 =
pulsar1.getPulsarResources().getNamespaceResources()
+ .getPartitionedTopicResources()
+ .getPartitionedTopicMetadataAsync(TopicName.get(tp),
true).join();
+ assertTrue(metadata1.isPresent());
+ assertEquals(metadata1.get().partitions, 3);
+
+ // Verify: lookup semaphore has been releases.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+
+ // Cleanup.
+ admin1.topics().deletePartitionedTopic(tp, false);
+ }
+
+ // reset clients.
+ for (PulsarClientImpl client : clients) {
+ ConnectionPool pool = client.getCnxPool();
+ for (CompletableFuture<ClientCnx> connectionFuture :
pool.getConnections()) {
+ ClientCnx clientCnx = connectionFuture.join();
+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
+ field.set(clientCnx, true);
+ }
+ }
+ }
+
@DataProvider(name = "autoCreationParamsAll")
public Object[][] autoCreationParamsAll(){
return new Object[][]{
@@ -265,7 +323,7 @@ public class GetPartitionMetadataTest {
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
- client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled).join();
+ client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 0);
List<String> partitionedTopics =
admin1.topics().getPartitionedTopicList("public/default");
assertFalse(partitionedTopics.contains(topicNameStr));
@@ -298,7 +356,7 @@ public class GetPartitionMetadataTest {
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
- client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled).join();
+ client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 3);
verifyNonPartitionedTopicNeverCreated(topicNameStr);
@@ -332,7 +390,7 @@ public class GetPartitionMetadataTest {
// Case-1: normal topic.
final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
- PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, true).join();
+ PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 3);
// Verify: the behavior of topic creation.
List<String> partitionedTopics =
admin1.topics().getPartitionedTopicList("public/default");
@@ -347,7 +405,7 @@ public class GetPartitionMetadataTest {
topicDomain.value() + "://" + DEFAULT_NS + "/tp") +
"-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
- client.getPartitionedTopicMetadata(topicNameStrWithSuffix,
true).join();
+ client.getPartitionedTopicMetadata(topicNameStrWithSuffix,
true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
@@ -380,7 +438,7 @@ public class GetPartitionMetadataTest {
// Case 1: normal topic.
final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
- PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, true).join();
+ PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics =
admin1.topics().getPartitionedTopicList("public/default");
@@ -392,7 +450,7 @@ public class GetPartitionMetadataTest {
topicDomain.value() + "://" + DEFAULT_NS + "/tp") +
"-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
- client.getPartitionedTopicMetadata(topicNameStrWithSuffix,
true).join();
+ client.getPartitionedTopicMetadata(topicNameStrWithSuffix,
true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
@@ -443,7 +501,7 @@ public class GetPartitionMetadataTest {
final TopicName topicName = TopicName.get(topicNameStr);
// Verify: the result of get partitioned topic metadata.
try {
- client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled)
+ client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled, false)
.join();
fail("Expect a not found exception");
} catch (Exception e) {
@@ -496,7 +554,7 @@ public class GetPartitionMetadataTest {
// Verify: the result of get partitioned topic metadata.
try {
PartitionedTopicMetadata topicMetadata = client
- .getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled)
+ .getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 55601ad4c6b..c90ad15242c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -136,6 +136,8 @@ public class TopicAutoCreationTest extends
ProducerConsumerBase {
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any(),
anyBoolean())).thenAnswer(
i -> CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0)));
+ when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(),
anyBoolean())).thenAnswer(
+ i -> CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0)));
when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(),
pulsar.getBrokerListenPort().get());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 6e121aca381..cb19b6b3b1b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -148,7 +148,8 @@ public class TransactionLowWaterMarkTest extends
TransactionTestBase {
PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
-
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false).get();
+
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false)
+ .get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
@@ -253,7 +254,8 @@ public class TransactionLowWaterMarkTest extends
TransactionTestBase {
PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
-
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false).get();
+
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false)
+ .get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index df6b1b8a8f9..5fc19f04a39 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -160,7 +160,7 @@ public class ClientCnxTest extends
MockedPulsarServiceBaseTest {
clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
Assert.fail("Expected an error that the broker version is too
old.");
} catch (Exception ex) {
- Assert.assertTrue(ex.getMessage().contains("without auto-creation
is not supported from the broker"));
+ Assert.assertTrue(ex.getMessage().contains("without auto-creation
is not supported by the broker"));
}
// cleanup.
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index c460fee11d0..513ee4d7e4e 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import lombok.Getter;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -741,6 +742,30 @@ public class PulsarClientException extends IOException {
}
}
+ /**
+ * Not supported exception thrown by Pulsar client.
+ */
+ public static class FeatureNotSupportedException extends
NotSupportedException {
+
+ @Getter
+ private final FailedFeatureCheck failedFeatureCheck;
+
+ public FeatureNotSupportedException(String msg, FailedFeatureCheck
failedFeatureCheck) {
+ super(msg);
+ this.failedFeatureCheck = failedFeatureCheck;
+ }
+ }
+
+ /**
+ * "supports_auth_refresh" was introduced at "2.6" and is no longer
supported, so skip this enum.
+ * "supports_broker_entry_metadata" was introduced at "2.8" and is no
longer supported, so skip this enum.
+ * "supports_partial_producer" was introduced at "2.10" and is no longer
supported, so skip this enum.
+ * "supports_topic_watchers" was introduced at "2.11" and is no longer
supported, so skip this enum.
+ */
+ public enum FailedFeatureCheck {
+ SupportsGetPartitionedMetadataWithoutAutoCreation;
+ }
+
/**
* Not allowed exception thrown by Pulsar client.
*/
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index bf015c564b9..6ee6fafde1c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static java.lang.String.format;
+import static
org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import io.netty.buffer.ByteBuf;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
@@ -146,12 +147,13 @@ public class BinaryProtoLookupService implements
LookupService {
*/
@Override
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(
- TopicName topicName, boolean metadataAutoCreationEnabled) {
+ TopicName topicName, boolean metadataAutoCreationEnabled, boolean
useFallbackForNonPIP344Brokers) {
final MutableObject<CompletableFuture> newFutureCreated = new
MutableObject<>();
try {
return partitionedMetadataInProgress.computeIfAbsent(topicName,
tpName -> {
CompletableFuture<PartitionedTopicMetadata> newFuture =
getPartitionedTopicMetadata(
- serviceNameResolver.resolveHost(), topicName,
metadataAutoCreationEnabled);
+ serviceNameResolver.resolveHost(), topicName,
metadataAutoCreationEnabled,
+ useFallbackForNonPIP344Brokers);
newFutureCreated.setValue(newFuture);
return newFuture;
});
@@ -248,21 +250,32 @@ public class BinaryProtoLookupService implements
LookupService {
}
private CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(InetSocketAddress socketAddress,
- TopicName topicName, boolean metadataAutoCreationEnabled) {
+ TopicName topicName, boolean metadataAutoCreationEnabled, boolean
useFallbackForNonPIP344Brokers) {
long startTime = System.nanoTime();
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new
CompletableFuture<>();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx
-> {
+ boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
if (!metadataAutoCreationEnabled &&
!clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
- partitionFuture.completeExceptionally(new
PulsarClientException.NotSupportedException("The feature of"
- + " getting partitions without auto-creation is not
supported from the broker,"
- + " please upgrade the broker to the latest
version."));
- return;
+ if (useFallbackForNonPIP344Brokers) {
+ log.info("[{}] Using original behavior of
getPartitionedTopicMetadata(topic) in "
+ + "getPartitionedTopicMetadata(topic, false) "
+ + "since the target broker does not support
PIP-344 and fallback is enabled.", topicName);
+ finalAutoCreationEnabled = true;
+ } else {
+ partitionFuture.completeExceptionally(
+ new
PulsarClientException.FeatureNotSupportedException("The feature of "
+ + "getting partitions without
auto-creation is not supported by the broker. "
+ + "Please upgrade the broker to version
that supports PIP-344 to resolve this "
+ + "issue.",
+
SupportsGetPartitionedMetadataWithoutAutoCreation));
+ return;
+ }
}
long requestId = client.newRequestId();
ByteBuf request =
Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
- metadataAutoCreationEnabled);
+ finalAutoCreationEnabled);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
histoGetTopicMetadata.recordFailure(System.nanoTime() -
startTime);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 4d6cf96a010..2d31dc427d0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -106,7 +106,7 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
private CompletableFuture<Boolean> checkDlqAlreadyExists(String topic) {
CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
- client.getPartitionedTopicMetadata(topic, false).thenAccept(metadata
-> {
+ client.getPartitionedTopicMetadata(topic, false,
true).thenAccept(metadata -> {
TopicName topicName = TopicName.get(topic);
if (topicName.isPersistent()) {
// Either partitioned or non-partitioned, it exists.
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 44ef4ac17ee..4a5557fa869 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -135,9 +135,13 @@ public class HttpLookupService implements LookupService {
});
}
+ /**
+ * {@inheritDoc}
+ * @param useFallbackForNonPIP344Brokers HttpLookupService ignores this
parameter
+ */
@Override
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(
- TopicName topicName, boolean metadataAutoCreationEnabled) {
+ TopicName topicName, boolean metadataAutoCreationEnabled, boolean
useFallbackForNonPIP344Brokers) {
long startTime = System.nanoTime();
String format = topicName.isV2() ? "admin/v2/%s/partitions" :
"admin/%s/partitions";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 2fe457059c1..3367ae99cb1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -61,11 +61,19 @@ public interface LookupService extends AutoCloseable {
/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
* Note: this method will try to create the topic partitioned metadata if
it does not exist.
- * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName,
boolean)}}.
+ * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName,
boolean, boolean)}}.
*/
@Deprecated
default CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(TopicName topicName) {
- return getPartitionedTopicMetadata(topicName, true);
+ return getPartitionedTopicMetadata(topicName, true, true);
+ }
+
+ /**
+ * See the doc {@link #getPartitionedTopicMetadata(TopicName, boolean,
boolean)}.
+ */
+ default CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(TopicName topicName,
+
boolean metadataAutoCreationEnabled) {
+ return getPartitionedTopicMetadata(topicName,
metadataAutoCreationEnabled, false);
}
/**
@@ -80,10 +88,15 @@ public interface LookupService extends AutoCloseable {
* 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger
an auto-creation for this topic(using
* the default topic auto-creation strategy you set for the broker), and
the corresponding result is returned.
* For the result, see case 1.
+ * @param useFallbackForNonPIP344Brokers <p>If true, fallback to the prior
behavior of the method
+ * {@link #getPartitionedTopicMetadata(TopicName)} if the broker does
not support the PIP-344 feature
+ * 'supports_get_partitioned_metadata_without_auto_creation'. This
parameter only affects the behavior when
+ * {@param metadataAutoCreationEnabled} is false.</p>
* @version 3.3.0.
*/
CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(TopicName topicName,
-
boolean metadataAutoCreationEnabled);
+
boolean metadataAutoCreationEnabled,
+
boolean useFallbackForNonPIP344Brokers);
/**
* Returns current SchemaInfo {@link SchemaInfo} for a given topic.
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e8cbf71e500..3f5e501b281 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -957,7 +957,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
- client.getPartitionedTopicMetadata(topicName, true)
+ client.getPartitionedTopicMetadata(topicName, true, false)
.thenAccept(metadata ->
subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions,
createTopicIfDoesNotExist))
.exceptionally(ex1 -> {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 15bf36e0269..f6fb763df1d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -386,7 +386,7 @@ public class PulsarClientImpl implements PulsarClient {
private CompletableFuture<Integer> checkPartitions(String topic, boolean
forceNoPartitioned,
@Nullable String
producerNameForLog) {
CompletableFuture<Integer> checkPartitions = new CompletableFuture<>();
- getPartitionedTopicMetadata(topic,
!forceNoPartitioned).thenAccept(metadata -> {
+ getPartitionedTopicMetadata(topic, !forceNoPartitioned,
true).thenAccept(metadata -> {
if (forceNoPartitioned && metadata.partitions > 0) {
String errorMsg = String.format("Can not create the
producer[%s] for the topic[%s] that contains %s"
+ " partitions, but the producer does not
support for a partitioned topic.",
@@ -559,7 +559,7 @@ public class PulsarClientImpl implements PulsarClient {
String topic = conf.getSingleTopic();
- getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
+ getPartitionedTopicMetadata(topic, true, false).thenAccept(metadata ->
{
if (log.isDebugEnabled()) {
log.debug("[{}] Received topic metadata. partitions: {}",
topic, metadata.partitions);
}
@@ -708,7 +708,7 @@ public class PulsarClientImpl implements PulsarClient {
CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
- getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
+ getPartitionedTopicMetadata(topic, true, false).thenAccept(metadata ->
{
if (log.isDebugEnabled()) {
log.debug("[{}] Received topic metadata. partitions: {}",
topic, metadata.partitions);
}
@@ -1129,8 +1129,15 @@ public class PulsarClientImpl implements PulsarClient {
}
}
+ /**
+ * @param useFallbackForNonPIP344Brokers <p>If true, fallback to the prior
behavior of the method
+ * getPartitionedTopicMetadata if
the broker does not support the PIP-344
+ * feature
'supports_get_partitioned_metadata_without_auto_creation'. This
+ * parameter only affects the
behavior when
+ * {@param
metadataAutoCreationEnabled} is false.</p>
+ */
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(
- String topic, boolean metadataAutoCreationEnabled) {
+ String topic, boolean metadataAutoCreationEnabled, boolean
useFallbackForNonPIP344Brokers) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new
CompletableFuture<>();
@@ -1142,8 +1149,8 @@ public class PulsarClientImpl implements PulsarClient {
.setMandatoryStop(opTimeoutMs.get() * 2,
TimeUnit.MILLISECONDS)
.setMax(conf.getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.create();
- getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
- metadataFuture, new ArrayList<>(),
metadataAutoCreationEnabled);
+ getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
metadataFuture, new ArrayList<>(),
+ metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new
PulsarClientException.InvalidConfigurationException(e.getMessage()));
}
@@ -1155,10 +1162,11 @@ public class PulsarClientImpl implements PulsarClient {
AtomicLong remainingTime,
CompletableFuture<PartitionedTopicMetadata> future,
List<Throwable>
previousExceptions,
- boolean
metadataAutoCreationEnabled) {
+ boolean
metadataAutoCreationEnabled,
+ boolean
useFallbackForNonPIP344Brokers) {
long startTime = System.nanoTime();
- CompletableFuture<PartitionedTopicMetadata> queryFuture =
- lookup.getPartitionedTopicMetadata(topicName,
metadataAutoCreationEnabled);
+ CompletableFuture<PartitionedTopicMetadata> queryFuture =
lookup.getPartitionedTopicMetadata(topicName,
+ metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
queryFuture.thenAccept(future::complete).exceptionally(e -> {
remainingTime.addAndGet(-1 *
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
long nextDelay = Math.min(backoff.next(), remainingTime.get());
@@ -1179,7 +1187,7 @@ public class PulsarClientImpl implements PulsarClient {
+ "Will try again in {} ms", topicName, nextDelay);
remainingTime.addAndGet(-nextDelay);
getPartitionedTopicMetadata(topicName, backoff, remainingTime,
future, previousExceptions,
- metadataAutoCreationEnabled);
+ metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
});
@@ -1187,7 +1195,7 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic,
boolean metadataAutoCreationEnabled) {
- return getPartitionedTopicMetadata(topic,
metadataAutoCreationEnabled).thenApply(metadata -> {
+ return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled,
false).thenApply(metadata -> {
if (metadata.partitions > 0) {
TopicName topicName = TopicName.get(topic);
List<String> partitions = new ArrayList<>(metadata.partitions);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 45a3ad4f978..ce19cbf873e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -80,7 +80,7 @@ public class TransactionCoordinatorClientImpl implements
TransactionCoordinatorC
public CompletableFuture<Void> startAsync() {
if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
return pulsarClient.getPartitionedTopicMetadata(
-
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true)
+
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(),
true, false)
.thenCompose(partitionMeta -> {
List<CompletableFuture<Void>> connectFutureList = new
ArrayList<>();
if (LOG.isDebugEnabled()) {
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 191124bb7b0..02a4d2ebba8 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -154,7 +154,7 @@ public class MultiTopicsConsumerImplTest {
int completionDelayMillis = 100;
Schema<byte[]> schema = Schema.BYTES;
PulsarClientImpl clientMock =
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
- when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+ when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(),
anyBoolean()))
.thenAnswer(invocation -> createDelayedCompletedFuture(
new PartitionedTopicMetadata(), completionDelayMillis));
MultiTopicsConsumerImpl<byte[]> impl = new
MultiTopicsConsumerImpl<byte[]>(
@@ -203,7 +203,7 @@ public class MultiTopicsConsumerImplTest {
int completionDelayMillis = 10;
Schema<byte[]> schema = Schema.BYTES;
PulsarClientImpl clientMock =
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
- when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+ when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(),
anyBoolean()))
.thenAnswer(invocation -> createExceptionFuture(
new PulsarClientException.InvalidConfigurationException("a
mock exception"), completionDelayMillis));
CompletableFuture<Consumer<byte[]>> completeFuture = new
CompletableFuture<>();
@@ -240,7 +240,7 @@ public class MultiTopicsConsumerImplTest {
// Simulate non partitioned topics
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0);
- when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+ when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(),
anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(metadata));
CompletableFuture<Consumer<byte[]>> completeFuture = new
CompletableFuture<>();
@@ -252,7 +252,7 @@ public class MultiTopicsConsumerImplTest {
// getPartitionedTopicMetadata should have been called only the first
time, for each of the 3 topics,
// but not anymore since the topics are not partitioned.
- verify(clientMock, times(3)).getPartitionedTopicMetadata(any(),
anyBoolean());
+ verify(clientMock, times(3)).getPartitionedTopicMetadata(any(),
anyBoolean(), anyBoolean());
}
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 3e897ed89f2..103254a6b90 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -108,7 +108,7 @@ public class PulsarClientImplTest {
nullable(String.class)))
.thenReturn(CompletableFuture.completedFuture(
new GetTopicsResult(Collections.emptyList(), null,
false, true)));
- when(lookup.getPartitionedTopicMetadata(any(TopicName.class),
anyBoolean()))
+ when(lookup.getPartitionedTopicMetadata(any(TopicName.class),
anyBoolean(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(new
PartitionedTopicMetadata()));
when(lookup.getBroker(any()))
.thenReturn(CompletableFuture.completedFuture(new
LookupTopicResult(
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 5a7eb582eb5..5067ed64079 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -295,6 +295,7 @@ message CommandConnect {
optional string proxy_version = 11; // Version of the proxy. Should only
be forwarded by a proxy.
}
+// Please also add a new enum for the class
"PulsarClientException.FailedFeatureCheck" when adding a new feature flag.
message FeatureFlags {
optional bool supports_auth_refresh = 1 [default = false];
optional bool supports_broker_entry_metadata = 2 [default = false];