heesung-sn commented on code in PR #21408:
URL: https://github.com/apache/pulsar/pull/21408#discussion_r1375054673


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2190,8 +2190,10 @@ public CompletableFuture<Void> 
checkTopicNsOwnership(final String topic) {
     }
 
     public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle 
serviceUnit,

Review Comment:
   Probably not. We need to pass this flag to topic.close(..) in this function 
anyway, and doing so also reduces some code redundancy.
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java:
##########
@@ -385,6 +390,110 @@ public boolean test(NamespaceBundle namespaceBundle) {
             assertTrue(ex.getMessage().contains("cannot be transfer to same 
broker"));
         }
     }
+    @DataProvider(name = "isPersistentTopicTest")
+    public Object[][] isPersistentTopicTest() {
+        return new Object[][] { { true }, { false }};
+    }
+    @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
+    public void testTransferClientReconnectionWithoutLookup(boolean 
isPersistentTopicTest) throws Exception {
+        String topicType = isPersistentTopicTest? "persistent" : 
"non-persistent";
+        String topic = topicType + "://" + defaultTestNamespace + 
"/test-transfer-client-reconnect";
+        TopicName topicName = TopicName.get(topic);
+
+        AtomicInteger lookupCount = new AtomicInteger();
+        var lookup = spyLookupService(lookupCount, topicName);
+        var producer = pulsarClient.newProducer().topic(topic).create();
+        int lookupCountBeforeUnload = lookupCount.get();
+
+        NamespaceBundle bundle = getBundleAsync(pulsar1, 
TopicName.get(topic)).get();
+        String broker = admin.lookups().lookupTopic(topic);
+        String dstBrokerUrl = pulsar1.getLookupServiceAddress();
+        String dstBrokerServiceUrl;
+        if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+            dstBrokerUrl = pulsar2.getLookupServiceAddress();
+            dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
+        } else {
+            dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
+        }
+        checkOwnershipState(broker, bundle);
+
+        final String finalDstBrokerUrl = dstBrokerUrl;
+        CompletableFuture.runAsync(() -> {
+                try {
+                    admin.namespaces().unloadNamespaceBundle(
+                            defaultTestNamespace, bundle.getBundleRange(), 
finalDstBrokerUrl);
+                } catch (PulsarAdminException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        );
+
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            try {
+                producer.send("hi".getBytes());
+                String newOwner = admin.lookups().lookupTopic(topic);
+                assertEquals(dstBrokerServiceUrl, newOwner);
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            } catch (PulsarAdminException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        assertTrue(producer.isConnected());
+        verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName);
+        producer.close();
+    }
+
+
+
+    @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
+    public void testUnloadClientReconnectionWithLookup(boolean 
isPersistentTopicTest) throws Exception {
+        String topicType = isPersistentTopicTest? "persistent" : 
"non-persistent";
+        String topic = topicType + "://" + defaultTestNamespace + 
"/test-unload-client-reconnect-"
+                + isPersistentTopicTest;
+        TopicName topicName = TopicName.get(topic);
+
+        AtomicInteger lookupCount = new AtomicInteger();
+        var lookup = spyLookupService(lookupCount, topicName);
+
+        var producer = pulsarClient.newProducer().topic(topic).create();
+        int lookupCountBeforeUnload = lookupCount.get();
+
+        NamespaceBundle bundle = getBundleAsync(pulsar1, 
TopicName.get(topic)).get();
+        CompletableFuture.runAsync(() -> {
+                    try {
+                        admin.namespaces().unloadNamespaceBundle(
+                                defaultTestNamespace, bundle.getBundleRange());
+                    } catch (PulsarAdminException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+        );
+        MutableInt sendCount = new MutableInt();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            try {
+                producer.send("hi".getBytes());
+                assertEquals(sendCount.incrementAndGet(), 10);
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        assertTrue(producer.isConnected());
+        verify(lookup, times(lookupCountBeforeUnload + 
1)).getBroker(topicName);
+        producer.close();
+    }
+
+    private LookupService spyLookupService(AtomicInteger lookupCount, 
TopicName topicName)
+            throws IllegalAccessException {
+        var lookup = spy((LookupService)

Review Comment:
   sure. updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -484,14 +485,22 @@ private CompletableFuture<Void> delete(boolean 
failIfHasSubscriptions, boolean c
         return deleteFuture;
     }
 
+
+    @Override
+    public CompletableFuture<Void> close(boolean 
closeWithoutWaitingClientDisconnect) {
+        return close(false, closeWithoutWaitingClientDisconnect);
+    }
+
     /**
      * Close this topic - close all producers and subscriptions associated 
with this topic.
      *
+     * @param closeWithoutDisconnectingClients don't disconnect clients

Review Comment:
   Yes, I was following the naming convention of `closeWithoutWaitingClientDisconnect`. 
We would probably need a separate minor PR to fix this naming if we want to refactor it.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -801,11 +802,29 @@ protected void handleError(CommandError error) {
 
     @Override
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
-        log.info("[{}] Broker notification of Closed producer: {}", 
remoteAddress, closeProducer.getProducerId());
         final long producerId = closeProducer.getProducerId();
         ProducerImpl<?> producer = producers.remove(producerId);
         if (producer != null) {
-            producer.connectionClosed(this);
+            if (closeProducer.hasAssignedBrokerServiceUrl() || 
closeProducer.hasAssignedBrokerServiceUrlTls()) {
+                try {
+                    final URI uri = new URI(producer.client.conf.isUseTls()
+                            ? closeProducer.getAssignedBrokerServiceUrlTls()
+                            : closeProducer.getAssignedBrokerServiceUrl());
+                    log.info("[{}] Broker notification of Closed producer: {}. 
Redirecting to {}.",
+                            remoteAddress, closeProducer.getProducerId(), uri);
+                    producer.getConnectionHandler().connectionClosed(this, 0L, 
Optional.of(uri));
+                } catch (URISyntaxException e) {
+                    log.error("[{}] Invalid redirect url {}/{} for {}", 
remoteAddress,
+                            closeProducer.getAssignedBrokerServiceUrl(),
+                            closeProducer.getAssignedBrokerServiceUrlTls(),
+                            closeProducer.getRequestId());
+                    producer.connectionClosed(this);
+                }
+            } else {
+                log.info("[{}] Broker notification of Closed producer: {}.",

Review Comment:
   No, because the original log at line 804 was at `info` level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to