Denovo1998 commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2619248489


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -50,15 +68,21 @@ public static class TopicListWatcher implements BiConsumer<String, NotificationT
         private final long id;
         /** The regexp for the topic name(not contains partition suffix). **/
         private final TopicsPattern topicsPattern;
+        private final Executor executor;
+        private volatile boolean closed = false;
+        private boolean sendTopicListSuccessCompleted = false;
+        private BlockingDeque<Runnable> sendTopicListUpdateTasksBeforeInit = new LinkedBlockingDeque<>();

Review Comment:
   This queue has no capacity limit. Could that cause problems? Suppose a watcher has already registered its listener (so that it doesn't miss events) but cannot obtain permits for sendWatchTopicListSuccess for a long time, either because of pressure on direct-memory permits or because it repeatedly times out and retries. In that case every update will pile up in this queue.
   
   Additionally, since both sendTopicListUpdate(...) and sendTopicListSuccessCompleted() are synchronized here, is the thread safety of `LinkedBlockingDeque` actually needed? It may be redundant.
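
   The following is a minimal sketch of a bounded, lock-protected alternative. It assumes that overflowing the buffer should make the watcher give up rather than grow without bound. `PendingUpdateBuffer`, `maxPendingUpdates` and the overflow policy are hypothetical and not part of the PR. Because every method is `synchronized`, a plain `ArrayDeque` replaces `LinkedBlockingDeque`.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical sketch (not the PR's code): buffers updates that arrive before the
   // initial "watch success" response has been sent. Every method is synchronized,
   // so a plain ArrayDeque is enough and no concurrent collection is needed.
   final class PendingUpdateBuffer {
       private final int maxPendingUpdates;
       private final Deque<Runnable> pending = new ArrayDeque<>();
       private boolean initCompleted = false;
       private boolean overflowed = false;

       PendingUpdateBuffer(int maxPendingUpdates) {
           this.maxPendingUpdates = maxPendingUpdates;
       }

       // Runs the update immediately after initialization, otherwise queues it.
       // Returns false once the bound has been exceeded; the caller is then expected
       // to handle it (assumption: e.g. close the watcher so the client re-subscribes).
       synchronized boolean submit(Runnable update) {
           if (overflowed) {
               return false;
           }
           if (initCompleted) {
               update.run();
               return true;
           }
           if (pending.size() >= maxPendingUpdates) {
               // Once one update is lost the buffered ones are useless, so drop them.
               overflowed = true;
               pending.clear();
               return false;
           }
           pending.addLast(update);
           return true;
       }

       // Called after the initial success response has been written; replays the
       // buffered updates in arrival order.
       synchronized void markInitCompleted() {
           initCompleted = true;
           Runnable update;
           while ((update = pending.pollFirst()) != null) {
               update.run();
           }
       }

       synchronized boolean hasOverflowed() {
           return overflowed;
       }

       synchronized int pendingCount() {
           return pending.size();
       }
   }
   ```

   With this shape the caller checks the return value of `submit(...)`. On `false`, it closes the watcher or forces a full resync instead of buffering indefinitely. That policy is an assumption; the PR may prefer a different one.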



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -288,8 +405,67 @@ public void deleteTopicListWatcher(Long watcherId) {
      */
     public void sendTopicListUpdate(long watcherId, String topicsHash, List<String> deletedTopics,
                                     List<String> newTopics) {
-        connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list update", permitAcquireErrorHandler ->
+                () -> connection.getCommandSender()
+                        .sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash,
+                                permitAcquireErrorHandler)
+                        .whenComplete((__, t) -> {
+                            if (t != null) {
+                                // this is an unexpected case
+                                log.warn("[{}] Failed to send topic list update for watcherId={}. Watcher will be in "
+                                        + "inconsistent state.", connection, watcherId, t);
+                            }
+                        }));
+    }
+
+    // performs an operation with infinite permit acquiring retries.
+    // If acquiring permits fails, it will retry after a backoff period
+    private void performOperationWithPermitAcquiringRetries(long watcherId, String operationName,
+                                                            Function<Consumer<Throwable>,
+                                                                    Supplier<CompletableFuture<Void>>>
+                                                                    asyncOperationFactory) {
+        // holds a reference to the operation, this is to resolve a circular dependency between the error handler and
+        // the actual operation
+        AtomicReference<Runnable> operationRef = new AtomicReference<>();
+        // create the error handler for the operation
+        Consumer<Throwable> permitAcquireErrorHandler =
+                createPermitAcquireErrorHandler(watcherId, operationName, operationRef);
+        // create the async operation using the factory function. Pass the error handler to the factory function.
+        Supplier<CompletableFuture<Void>> asyncOperation = asyncOperationFactory.apply(permitAcquireErrorHandler);
+        // set the operation to run into the operation reference
+        operationRef.set(Runnables.catchingAndLoggingThrowables(() -> {
+            if (!connection.isActive() || !watchers.containsKey(watcherId)) {
+                // do nothing if the connection has already been closed or the watcher has been removed
+                return;
+            }
+            asyncOperation.get().thenRun(() -> retryBackoff.reset());
+        }));
+        // run the operation
+        operationRef.get().run();
     }
 
+    // retries acquiring permits until the connection is closed or the watcher is removed
+    private Consumer<Throwable> createPermitAcquireErrorHandler(long watcherId, String operationName,
+                                                                AtomicReference<Runnable> operationRef) {
+        ScheduledExecutorService scheduledExecutor = connection.ctx().channel().eventLoop();
+        AtomicInteger retryCount = new AtomicInteger(0);
+        return t -> {
+            Throwable unwrappedException = FutureUtil.unwrapCompletionException(t);
+            if (unwrappedException instanceof AsyncSemaphore.PermitAcquireCancelledException
+                    || unwrappedException instanceof AsyncSemaphore.PermitAcquireAlreadyClosedException
+                    || !connection.isActive()
+                    || !watchers.containsKey(watcherId)) {
+                return;
+            }
+            long retryDelay = retryBackoff.next();
+            log.info("[{}] Cannot acquire direct memory tokens for sending {}. Retry {} in {} ms. {}", connection,
+                    operationName, retryCount.get(), retryDelay, t.getMessage());

Review Comment:
   The `retryCount` variable is created but never incremented, so the `Retry {}` placeholder in the log message always prints 0.
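
   The following is a minimal sketch of the fix. It assumes the handler keeps the PR's overall shape: an `AtomicReference<Runnable>` holds the operation to re-run, and a scheduler delays it. `PermitRetryHandlers.create` is a hypothetical helper, the `LongSupplier` stands in for `retryBackoff.next()`, and `System.out` stands in for the broker's logger. The counter is passed in only so that it can be observed.

   ```java
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.Consumer;
   import java.util.function.LongSupplier;

   // Hypothetical sketch: the attempt counter is incremented every time the handler
   // schedules a retry, so the log line reports the real attempt number.
   final class PermitRetryHandlers {
       static Consumer<Throwable> create(ScheduledExecutorService scheduler,
                                         LongSupplier nextDelayMs,
                                         AtomicReference<Runnable> operationRef,
                                         AtomicInteger retryCount) {
           return t -> {
               // 1 for the first retry, 2 for the second, and so on
               int attempt = retryCount.incrementAndGet();
               long delayMs = nextDelayMs.getAsLong();
               System.out.printf("Cannot acquire permits. Retry %d in %d ms. %s%n",
                       attempt, delayMs, t.getMessage());
               // Re-run the operation currently stored in the reference after the delay.
               scheduler.schedule(() -> operationRef.get().run(), delayMs, TimeUnit.MILLISECONDS);
           };
       }
   }
   ```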



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PatternConsumerTopicWatcherBackPressureMultipleConsumersTest extends MockedPulsarServiceBaseTest {
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        isTcpLookup = useTcpLookup();
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSubscriptionPatternMaxLength(100);
+    }
+
+    protected boolean useTcpLookup() {
+        return true;
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 60 * 1000)
+    public void testPatternConsumerWithLargeAmountOfConcurrentClientConnections()
+            throws PulsarAdminException, InterruptedException, IOException, ExecutionException, TimeoutException {
+        // create a new namespace for this test
+        String namespace = BrokerTestUtil.newUniqueName("public/ns");
+        admin.namespaces().createNamespace(namespace);
+
+        // use multiple clients so that each client has a separate connection to the broker
+        final int numberOfClients = 100;
+
+        // create a long topic name to consume more memory per topic
+        final String topicNamePrefix = "persistent://" + namespace + "/" + StringUtils.repeat('a', 512) + "-";
+        // number of topics to create
+        final int topicCount = 300;
+
+        // create topics
+        createTopics(topicCount, topicNamePrefix, "_0");
+
+        {
+            @Cleanup
+            PulsarClientSharedResources sharedResources = PulsarClientSharedResources.builder()
+                    // limit number of threads so that the test behaves somewhat similarly in CI
+                    .configureEventLoop(eventLoopGroupConfig -> eventLoopGroupConfig.numberOfThreads(2))
+                    .configureThreadPool(PulsarClientSharedResources.SharedResource.InternalExecutor,
+                            threadPoolConfig -> threadPoolConfig.numberOfThreads(2))
+                    .build();
+            List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients);
+            @Cleanup
+            Closeable closeClients = () -> {
+                for (PulsarClient client : clients) {
+                    try {
+                        client.close();
+                    } catch (PulsarClientException e) {
+                        log.error("Failed to close client {}", client, e);
+                    }
+                }
+            };
+            for (int i = 0; i < numberOfClients; i++) {
+                PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+                        .serviceUrl(getClientServiceUrl())
+                        .statsInterval(0, TimeUnit.SECONDS)
+                        .operationTimeout(1, TimeUnit.MINUTES)
+                        .sharedResources(sharedResources)
+                        .build();
+                clients.add(client);
+            }
+
+            List<CompletableFuture<Consumer<String>>> consumerFutures = new ArrayList<>(numberOfClients);
+            for (int i = 0; i < topicCount; i++) {
+                String topicsPattern = namespace + "/a+-" + i + "_[01]$";
+                CompletableFuture<Consumer<String>> consumerFuture =
+                        clients.get(i % numberOfClients).newConsumer(Schema.STRING)
+                                .topicsPattern(topicsPattern).subscriptionName("sub" + i)
+                                .subscribeAsync();
+                consumerFutures.add(consumerFuture);
+                consumerFuture.exceptionally(throwable -> {
+                    log.error("Failed to subscribe to pattern {}", topicsPattern, throwable);
+                    return null;
+                });
+            }
+
+            FutureUtil.waitForAll(consumerFutures).get(60, TimeUnit.SECONDS);
+
+            List<Consumer<String>> consumers = consumerFutures.stream().map(CompletableFuture::join).toList();
+
+            PulsarClientImpl client = clients.get(0);
+            sendAndValidate(topicCount, client, consumers, topicNamePrefix, "_0");
+
+            // create additional topics
+            createTopics(topicCount, topicNamePrefix, "_1");
+
+            // send to additional topic
+            sendAndValidate(topicCount, client, consumers, topicNamePrefix, "_1");
+        }
+
+        validateThatTokensHaventLeakedOrIncreased();
+    }
+
+    protected void validateThatTokensHaventLeakedOrIncreased() {
+        AsyncDualMemoryLimiterImpl limiter =
+                pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+        assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
+                .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024 * 1024);

Review Comment:
   If `getMaxTopicListInFlightHeapMemSizeMB()` or 
`getMaxTopicListInFlightDirectMemSizeMB()` returns a value of 2048 or more, the 
expression `* 1024 * 1024` will overflow a 32-bit integer. To prevent this, use 
`1024L` to ensure the computation is performed with long integers.
   
   ```diff
   -        .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024 * 1024);
   +        .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024L * 1024);
   -        .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 1024 * 1024);
   +        .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 1024L * 1024);
   ```
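
   The following is a small illustration of the overflow. It assumes the getter returns `int`, as the comment implies. `MegabyteConversion` and its methods are hypothetical helpers. The `long` literal must appear in the first multiplication so that the whole product is computed in 64-bit arithmetic.

   ```java
   // Hypothetical helpers showing int vs long arithmetic for an MB-to-bytes conversion.
   final class MegabyteConversion {
       // 32-bit arithmetic: wraps once sizeMb >= 2048, because 2048 * 1024 * 1024 == 2^31.
       // bytesWithIntMath(2048) returns -2147483648.
       static long bytesWithIntMath(int sizeMb) {
           return sizeMb * 1024 * 1024;
       }

       // The long literal promotes the first multiplication, so the product never wraps
       // for any int input. bytesWithLongMath(2048) returns 2147483648.
       static long bytesWithLongMath(int sizeMb) {
           return sizeMb * 1024L * 1024;
       }
   }
   ```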



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -218,31 +263,100 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
                 });
     }
 
+    private void sendTopicListSuccessWithPermitAcquiringRetries(long watcherId, long requestId, List<String> topicList,
+                                                                String hash,
+                                                                Runnable successfulCompletionCallback) {
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list success", permitAcquireErrorHandler ->

Review Comment:
   Suppose an earlier update A is removed from the semaphore queue because of a timeout or a full queue and is then retried. Meanwhile, a later update B acquires permits first and is sent successfully. In that case, B can reach the client before A.
   
   For the pattern consumer, out-of-order updates can leave the final subscription set inconsistent. A typical example: A = create topic2, B = delete topic2. If B arrives first, the client may wrongly keep topic2.
   
   Perhaps we need to maintain a "send chain" (or queue) per watcher, so that only one update or success response is acquiring permits and writing at any moment? If it fails, the head of the queue would be retried, and subsequent updates could not overtake it.
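
   The following is a minimal sketch of such a per-watcher send chain. It assumes each send attempt (acquire permits, then write) can be wrapped as a `Supplier<CompletableFuture<Void>>` that completes exceptionally when permits cannot be acquired. `OrderedSendChain`, `submit` and `retryDelayMs` are hypothetical names, and the fixed retry delay stands in for the PR's `retryBackoff`.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.function.Supplier;

   // Hypothetical per-watcher send chain: sends start strictly in submission order,
   // at most one is in flight, and a failed head is retried before anything behind
   // it is attempted, so a later update can never overtake an earlier one.
   final class OrderedSendChain {
       private final ScheduledExecutorService executor;
       private final long retryDelayMs;
       private final Deque<Supplier<CompletableFuture<Void>>> pending = new ArrayDeque<>();
       private boolean sending = false;

       OrderedSendChain(ScheduledExecutorService executor, long retryDelayMs) {
           this.executor = executor;
           this.retryDelayMs = retryDelayMs;
       }

       // Each supplier performs one attempt and returns its result.
       void submit(Supplier<CompletableFuture<Void>> send) {
           synchronized (this) {
               pending.addLast(send);
               if (sending) {
                   return; // the in-flight send picks this one up when it completes
               }
               sending = true;
           }
           sendHead();
       }

       private void sendHead() {
           Supplier<CompletableFuture<Void>> head;
           synchronized (this) {
               head = pending.peekFirst();
               if (head == null) {
                   sending = false; // chain is idle; the next submit() restarts it
                   return;
               }
           }
           CompletableFuture<Void> attempt;
           try {
               attempt = head.get();
           } catch (RuntimeException e) {
               attempt = CompletableFuture.failedFuture(e);
           }
           attempt.whenComplete((ignored, error) -> {
               if (error == null) {
                   synchronized (this) {
                       pending.pollFirst(); // remove the head only after it succeeded
                   }
                   executor.execute(this::sendHead);
               } else {
                   // Keep the head in place and retry it after a delay
                   // (a real implementation would use a backoff here).
                   executor.schedule(this::sendHead, retryDelayMs, TimeUnit.MILLISECONDS);
               }
           });
       }
   }
   ```

   If the success response is submitted to the same chain before any update, updates queued behind it cannot be written before it. This might also remove the need for a separate "before init" buffer.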



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
