Demogorgon314 commented on code in PR #21408:
URL: https://github.com/apache/pulsar/pull/21408#discussion_r1372503627
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -484,14 +485,22 @@ private CompletableFuture<Void> delete(boolean
failIfHasSubscriptions, boolean c
return deleteFuture;
}
+
+ @Override
+ public CompletableFuture<Void> close(boolean
closeWithoutWaitingClientDisconnect) {
+ return close(false, closeWithoutWaitingClientDisconnect);
+ }
+
/**
* Close this topic - close all producers and subscriptions associated
with this topic.
*
+ * @param closeWithoutDisconnectingClients don't disconnect clients
Review Comment:
nit: I think the parameter should be named `disconnectingClients`; it would be
clearer to other developers. But that is not this PR's responsibility.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java:
##########
@@ -385,6 +390,110 @@ public boolean test(NamespaceBundle namespaceBundle) {
assertTrue(ex.getMessage().contains("cannot be transfer to same
broker"));
}
}
+ @DataProvider(name = "isPersistentTopicTest")
+ public Object[][] isPersistentTopicTest() {
+ return new Object[][] { { true }, { false }};
+ }
+ @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
+ public void testTransferClientReconnectionWithoutLookup(boolean
isPersistentTopicTest) throws Exception {
+ String topicType = isPersistentTopicTest? "persistent" :
"non-persistent";
+ String topic = topicType + "://" + defaultTestNamespace +
"/test-transfer-client-reconnect";
+ TopicName topicName = TopicName.get(topic);
+
+ AtomicInteger lookupCount = new AtomicInteger();
+ var lookup = spyLookupService(lookupCount, topicName);
+ var producer = pulsarClient.newProducer().topic(topic).create();
+ int lookupCountBeforeUnload = lookupCount.get();
+
+ NamespaceBundle bundle = getBundleAsync(pulsar1,
TopicName.get(topic)).get();
+ String broker = admin.lookups().lookupTopic(topic);
+ String dstBrokerUrl = pulsar1.getLookupServiceAddress();
+ String dstBrokerServiceUrl;
+ if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+ dstBrokerUrl = pulsar2.getLookupServiceAddress();
+ dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
+ } else {
+ dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
+ }
+ checkOwnershipState(broker, bundle);
+
+ final String finalDstBrokerUrl = dstBrokerUrl;
+ CompletableFuture.runAsync(() -> {
+ try {
+ admin.namespaces().unloadNamespaceBundle(
+ defaultTestNamespace, bundle.getBundleRange(),
finalDstBrokerUrl);
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ try {
+ producer.send("hi".getBytes());
+ String newOwner = admin.lookups().lookupTopic(topic);
+ assertEquals(dstBrokerServiceUrl, newOwner);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ assertTrue(producer.isConnected());
+ verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName);
+ producer.close();
+ }
+
+
+
+ @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
+ public void testUnloadClientReconnectionWithLookup(boolean
isPersistentTopicTest) throws Exception {
+ String topicType = isPersistentTopicTest? "persistent" :
"non-persistent";
+ String topic = topicType + "://" + defaultTestNamespace +
"/test-unload-client-reconnect-"
+ + isPersistentTopicTest;
+ TopicName topicName = TopicName.get(topic);
+
+ AtomicInteger lookupCount = new AtomicInteger();
+ var lookup = spyLookupService(lookupCount, topicName);
+
+ var producer = pulsarClient.newProducer().topic(topic).create();
+ int lookupCountBeforeUnload = lookupCount.get();
+
+ NamespaceBundle bundle = getBundleAsync(pulsar1,
TopicName.get(topic)).get();
+ CompletableFuture.runAsync(() -> {
+ try {
+ admin.namespaces().unloadNamespaceBundle(
+ defaultTestNamespace, bundle.getBundleRange());
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+ MutableInt sendCount = new MutableInt();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ try {
+ producer.send("hi".getBytes());
+ assertEquals(sendCount.incrementAndGet(), 10);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ assertTrue(producer.isConnected());
+ verify(lookup, times(lookupCountBeforeUnload +
1)).getBroker(topicName);
+ producer.close();
+ }
+
+ private LookupService spyLookupService(AtomicInteger lookupCount,
TopicName topicName)
+ throws IllegalAccessException {
+ var lookup = spy((LookupService)
Review Comment:
Do you think we should reset this object once we are done using it?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -801,11 +802,29 @@ protected void handleError(CommandError error) {
@Override
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
- log.info("[{}] Broker notification of Closed producer: {}",
remoteAddress, closeProducer.getProducerId());
final long producerId = closeProducer.getProducerId();
ProducerImpl<?> producer = producers.remove(producerId);
if (producer != null) {
- producer.connectionClosed(this);
+ if (closeProducer.hasAssignedBrokerServiceUrl() ||
closeProducer.hasAssignedBrokerServiceUrlTls()) {
+ try {
+ final URI uri = new URI(producer.client.conf.isUseTls()
+ ? closeProducer.getAssignedBrokerServiceUrlTls()
+ : closeProducer.getAssignedBrokerServiceUrl());
+ log.info("[{}] Broker notification of Closed producer: {}.
Redirecting to {}.",
+ remoteAddress, closeProducer.getProducerId(), uri);
+ producer.getConnectionHandler().connectionClosed(this, 0L,
Optional.of(uri));
+ } catch (URISyntaxException e) {
+ log.error("[{}] Invalid redirect url {}/{} for {}",
remoteAddress,
+ closeProducer.getAssignedBrokerServiceUrl(),
+ closeProducer.getAssignedBrokerServiceUrlTls(),
+ closeProducer.getRequestId());
+ producer.connectionClosed(this);
+ }
+ } else {
+ log.info("[{}] Broker notification of Closed producer: {}.",
Review Comment:
Should we log this at debug level instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]