heesung-sn commented on code in PR #21682:
URL: https://github.com/apache/pulsar/pull/21682#discussion_r1418044228
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -542,6 +544,10 @@ public SubType getType() {
@Override
public final synchronized void readEntriesComplete(List<Entry> entries,
Object ctx) {
+ if (topic.isTransferring()) {
Review Comment:
nit: can we add some comments why we return for readers(wherever this
isTransferring flag is used)?
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java:
##########
@@ -410,111 +426,240 @@ public boolean test(NamespaceBundle namespaceBundle) {
assertTrue(ex.getMessage().contains("cannot be transfer to same
broker"));
}
}
- @DataProvider(name = "isPersistentTopicTest")
- public Object[][] isPersistentTopicTest() {
- return new Object[][] { { true }, { false }};
+
+ @DataProvider(name = "isPersistentTopicSubscriptionTypeTest")
+ public Object[][] isPersistentTopicSubscriptionTypeTest() {
+ return new Object[][]{
+ {TopicDomain.persistent, SubscriptionType.Exclusive},
+ {TopicDomain.persistent, SubscriptionType.Shared},
+ {TopicDomain.persistent, SubscriptionType.Failover},
+ {TopicDomain.persistent, SubscriptionType.Key_Shared},
Review Comment:
Why do we need to cover all subscription types? Can we use Exclusive and
(Key_)Shared?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1509,14 +1508,15 @@ public CompletableFuture<Void> close(
shadowReplicators.forEach((__, replicator) ->
futures.add(replicator.disconnect()));
if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
- brokerService.getPulsar(), topic).thenAccept(lookupData ->
- producers.values().forEach(producer ->
futures.add(producer.disconnect(lookupData)))
+ brokerService.getPulsar(), topic).thenAccept(lookupData ->
{
+ producers.values().forEach(producer ->
futures.add(producer.disconnect(lookupData)));
+ subscriptions.forEach((s, sub) ->
futures.add(sub.disconnect(lookupData)));
Review Comment:
Do we need to close the subscription and its dispatcher without
disconnecting consumers when disconnectClients=false here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]