Denovo1998 commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2619248489
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -50,15 +68,21 @@ public static class TopicListWatcher implements BiConsumer<String, NotificationT
private final long id;
/** The regexp for the topic name(not contains partition suffix). **/
private final TopicsPattern topicsPattern;
+ private final Executor executor;
+ private volatile boolean closed = false;
+ private boolean sendTopicListSuccessCompleted = false;
+ private BlockingDeque<Runnable> sendTopicListUpdateTasksBeforeInit = new LinkedBlockingDeque<>();
Review Comment:
This queue has no capacity limit. Could that cause problems? The watcher
registers its listener early (so that it does not miss events), but if
sendWatchTopicListSuccess cannot acquire direct memory permits for a long time
(or repeatedly times out and retries), every update that arrives in the
meantime is appended to this queue, so it can grow without bound.
Additionally, since both sendTopicListUpdate(...) and
sendTopicListSuccessCompleted() are synchronized, the thread safety that
`LinkedBlockingDeque` provides seems redundant. Would a plain `ArrayDeque` be
enough?
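A minimal sketch of a bounded alternative, assuming the buffered updates are
only touched from `synchronized` methods (as noted above): a plain `ArrayDeque`
guarded by the object's monitor replaces `LinkedBlockingDeque`, and a capacity
check rejects updates once the buffer is full. The class `PreInitUpdateBuffer`,
its methods, and the overflow policy (reject and let the caller decide, for
example by closing the watcher so the client re-subscribes) are hypothetical and
not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded buffer for topic list updates that arrive before the
// initial "watch success" response has been written. Every method is
// synchronized, so a non-concurrent ArrayDeque is sufficient.
class PreInitUpdateBuffer {
    private final int maxPendingUpdates;
    private final Deque<Runnable> pending = new ArrayDeque<>();
    private boolean initialResponseSent = false;

    PreInitUpdateBuffer(int maxPendingUpdates) {
        this.maxPendingUpdates = maxPendingUpdates;
    }

    // Runs the update directly once the initial response is out; otherwise
    // buffers it. Returns false when the buffer is full so that the caller can
    // apply an overflow policy instead of letting the queue grow without bound.
    synchronized boolean offer(Runnable update) {
        if (initialResponseSent) {
            update.run();
            return true;
        }
        if (pending.size() >= maxPendingUpdates) {
            return false;
        }
        pending.addLast(update);
        return true;
    }

    // Marks the initial response as sent and flushes buffered updates in arrival order.
    synchronized void onInitialResponseSent() {
        initialResponseSent = true;
        Runnable update;
        while ((update = pending.pollFirst()) != null) {
            update.run();
        }
    }
}
```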
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -288,8 +405,67 @@ public void deleteTopicListWatcher(Long watcherId) {
*/
public void sendTopicListUpdate(long watcherId, String topicsHash,
List<String> deletedTopics,
List<String> newTopics) {
- connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
+ performOperationWithPermitAcquiringRetries(watcherId, "topic list update", permitAcquireErrorHandler ->
+ () -> connection.getCommandSender()
+ .sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash,
+ permitAcquireErrorHandler)
+ .whenComplete((__, t) -> {
+ if (t != null) {
+ // this is an unexpected case
+ log.warn("[{}] Failed to send topic list
update for watcherId={}. Watcher will be in "
+ + "inconsistent state.", connection,
watcherId, t);
+ }
+ }));
+ }
+
+ // performs an operation with infinite permit acquiring retries.
+ // If acquiring permits fails, it will retry after a backoff period
+ private void performOperationWithPermitAcquiringRetries(long watcherId, String operationName,
+ Function<Consumer<Throwable>,
+ Supplier<CompletableFuture<Void>>>
+ asyncOperationFactory) {
+ // holds a reference to the operation, this is to resolve a circular dependency between the error handler and
+ // the actual operation
+ AtomicReference<Runnable> operationRef = new AtomicReference<>();
+ // create the error handler for the operation
+ Consumer<Throwable> permitAcquireErrorHandler =
+ createPermitAcquireErrorHandler(watcherId, operationName, operationRef);
+ // create the async operation using the factory function. Pass the error handler to the factory function.
+ Supplier<CompletableFuture<Void>> asyncOperation =
+ asyncOperationFactory.apply(permitAcquireErrorHandler);
+ // set the operation to run into the operation reference
+ operationRef.set(Runnables.catchingAndLoggingThrowables(() -> {
+ if (!connection.isActive() || !watchers.containsKey(watcherId)) {
+ // do nothing if the connection has already been closed or the watcher has been removed
+ return;
+ }
+ asyncOperation.get().thenRun(() -> retryBackoff.reset());
+ }));
+ // run the operation
+ operationRef.get().run();
}
+ // retries acquiring permits until the connection is closed or the watcher is removed
+ private Consumer<Throwable> createPermitAcquireErrorHandler(long watcherId, String operationName,
+ AtomicReference<Runnable> operationRef) {
+ ScheduledExecutorService scheduledExecutor = connection.ctx().channel().eventLoop();
+ AtomicInteger retryCount = new AtomicInteger(0);
+ return t -> {
+ Throwable unwrappedException = FutureUtil.unwrapCompletionException(t);
+ if (unwrappedException instanceof AsyncSemaphore.PermitAcquireCancelledException
+ || unwrappedException instanceof AsyncSemaphore.PermitAcquireAlreadyClosedException
+ || !connection.isActive()
+ || !watchers.containsKey(watcherId)) {
+ return;
+ }
+ long retryDelay = retryBackoff.next();
+ log.info("[{}] Cannot acquire direct memory tokens for sending {}.
Retry {} in {} ms. {}", connection,
+ operationName, retryCount.get(), retryDelay,
t.getMessage());
Review Comment:
The `retryCount` variable is created but never incremented, so every log line
reports "Retry 0".
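A minimal sketch of the fix, assuming the handler is meant to report the attempt
number: call `incrementAndGet()` each time a retry is scheduled and log that
value. `SimpleBackoff` is a hypothetical stand-in for the `retryBackoff` field in
the diff, and the two recording lists replace the real logger and event-loop
scheduling so that the example runs on its own (they are not thread-safe and
are only for demonstration).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for the retryBackoff object: doubles the delay up to a cap.
class SimpleBackoff {
    private final long initialMs;
    private final long maxMs;
    private long nextMs;

    SimpleBackoff(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.nextMs = initialMs;
    }

    synchronized long next() {
        long current = nextMs;
        nextMs = Math.min(nextMs * 2, maxMs);
        return current;
    }

    synchronized void reset() {
        nextMs = initialMs;
    }
}

class RetryingErrorHandlerSketch {
    final List<String> logLines = new ArrayList<>();
    final List<Long> scheduledDelays = new ArrayList<>();

    Consumer<Throwable> create(String operationName, SimpleBackoff backoff) {
        AtomicInteger retryCount = new AtomicInteger(0);
        return t -> {
            long retryDelay = backoff.next();
            // incrementAndGet(): the first retry is logged as "Retry 1", the second as "Retry 2", ...
            int attempt = retryCount.incrementAndGet();
            logLines.add(String.format("Cannot acquire permits for sending %s. Retry %d in %d ms. %s",
                    operationName, attempt, retryDelay, t.getMessage()));
            // A real handler would schedule the operation again after retryDelay ms here.
            scheduledDelays.add(retryDelay);
        };
    }
}
```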
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PatternConsumerTopicWatcherBackPressureMultipleConsumersTest extends MockedPulsarServiceBaseTest {
+
+ @Override
+ @BeforeMethod
+ protected void setup() throws Exception {
+ isTcpLookup = useTcpLookup();
+ super.internalSetup();
+ setupDefaultTenantAndNamespace();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setSubscriptionPatternMaxLength(100);
+ }
+
+ protected boolean useTcpLookup() {
+ return true;
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 60 * 1000)
+ public void testPatternConsumerWithLargeAmountOfConcurrentClientConnections()
+ throws PulsarAdminException, InterruptedException, IOException, ExecutionException, TimeoutException {
+ // create a new namespace for this test
+ String namespace = BrokerTestUtil.newUniqueName("public/ns");
+ admin.namespaces().createNamespace(namespace);
+
+ // use multiple clients so that each client has a separate connection to the broker
+ final int numberOfClients = 100;
+
+ // create a long topic name to consume more memory per topic
+ final String topicNamePrefix = "persistent://" + namespace + "/" +
StringUtils.repeat('a', 512) + "-";
+ // number of topics to create
+ final int topicCount = 300;
+
+ // create topics
+ createTopics(topicCount, topicNamePrefix, "_0");
+
+ {
+ @Cleanup
+ PulsarClientSharedResources sharedResources = PulsarClientSharedResources.builder()
+ // limit number of threads so that the test behaves somewhat similarly in CI
+ .configureEventLoop(eventLoopGroupConfig -> eventLoopGroupConfig.numberOfThreads(2))
+ .configureThreadPool(PulsarClientSharedResources.SharedResource.InternalExecutor,
+ threadPoolConfig -> threadPoolConfig.numberOfThreads(2))
+ .build();
+ List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients);
+ @Cleanup
+ Closeable closeClients = () -> {
+ for (PulsarClient client : clients) {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.error("Failed to close client {}", client, e);
+ }
+ }
+ };
+ for (int i = 0; i < numberOfClients; i++) {
+ PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+ .serviceUrl(getClientServiceUrl())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .operationTimeout(1, TimeUnit.MINUTES)
+ .sharedResources(sharedResources)
+ .build();
+ clients.add(client);
+ }
+
+ List<CompletableFuture<Consumer<String>>> consumerFutures = new ArrayList<>(numberOfClients);
+ for (int i = 0; i < topicCount; i++) {
+ String topicsPattern = namespace + "/a+-" + i + "_[01]$";
+ CompletableFuture<Consumer<String>> consumerFuture =
+ clients.get(i % numberOfClients).newConsumer(Schema.STRING)
+ .topicsPattern(topicsPattern).subscriptionName("sub" + i)
+ .subscribeAsync();
+ consumerFutures.add(consumerFuture);
+ consumerFuture.exceptionally(throwable -> {
+ log.error("Failed to subscribe to pattern {}",
topicsPattern, throwable);
+ return null;
+ });
+ }
+
+ FutureUtil.waitForAll(consumerFutures).get(60, TimeUnit.SECONDS);
+
+ List<Consumer<String>> consumers = consumerFutures.stream().map(CompletableFuture::join).toList();
+
+ PulsarClientImpl client = clients.get(0);
+ sendAndValidate(topicCount, client, consumers, topicNamePrefix, "_0");
+
+ // create additional topics
+ createTopics(topicCount, topicNamePrefix, "_1");
+
+ // send to additional topic
+ sendAndValidate(topicCount, client, consumers, topicNamePrefix, "_1");
+ }
+
+ validateThatTokensHaventLeakedOrIncreased();
+ }
+
+ protected void validateThatTokensHaventLeakedOrIncreased() {
+ AsyncDualMemoryLimiterImpl limiter =
+ pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+ assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
+ .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024 * 1024);
Review Comment:
If `getMaxTopicListInFlightHeapMemSizeMB()` or
`getMaxTopicListInFlightDirectMemSizeMB()` returns a value of 2048 or more, the
expression `* 1024 * 1024` will overflow a 32-bit integer. To prevent this, use
`1024L` to ensure the computation is performed with long integers.
```diff
- .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024 * 1024);
+ .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024L * 1024);
- .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 1024 * 1024);
+ .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 1024L * 1024);
```
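A small standalone illustration of the overflow, assuming the configuration
getters return `int` (which is what makes the plain multiplication wrap): with
2048 MB the `int` product wraps to `Integer.MIN_VALUE`, while promoting the
first factor to `long` yields the expected byte count. The class and method
names are hypothetical.

```java
// Illustrates the int overflow described above; names are hypothetical.
class MbToBytesOverflowDemo {
    // Evaluated left to right entirely in int arithmetic, so the result wraps
    // once the product exceeds Integer.MAX_VALUE.
    static int bytesAsInt(int megabytes) {
        return megabytes * 1024 * 1024;
    }

    // The 1024L literal promotes the first multiplication to long, so the
    // whole expression is computed in long arithmetic.
    static long bytesAsLong(int megabytes) {
        return megabytes * 1024L * 1024;
    }

    public static void main(String[] args) {
        System.out.println(bytesAsInt(2048));  // prints -2147483648
        System.out.println(bytesAsLong(2048)); // prints 2147483648
    }
}
```

Placing the `L` on the first `1024`, as the suggestion does, matters:
`megabytes * 1024 * 1024L` still computes `megabytes * 1024` as `int`, which
wraps for values of 2,097,152 MB and above.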
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -218,31 +263,100 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
});
}
+ private void sendTopicListSuccessWithPermitAcquiringRetries(long watcherId, long requestId, List<String> topicList,
+ String hash,
+ Runnable successfulCompletionCallback) {
+ performOperationWithPermitAcquiringRetries(watcherId, "topic list success", permitAcquireErrorHandler ->
Review Comment:
If an earlier update A is removed from the semaphore queue because of a
timeout or a full queue and is then retried, while a later update B acquires
the permits first and is sent successfully, B can reach the client before A.
For the pattern consumer, out-of-order updates may lead to an inconsistent
final subscription set (a typical example: A = create topic2, B = delete
topic2; if B arrives first, the client may mistakenly retain topic2).
Perhaps we need to maintain a "send chain" or queue for each watcher, so that
only one update or success response is acquiring permits and writing at any
moment? If it fails, we retry this head of the queue, and subsequent updates
cannot bypass it.
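A minimal sketch of such a per-watcher send chain, assuming each update or
success response can be expressed as a `Supplier<CompletableFuture<Void>>` that
acquires permits and writes (like the operations built by
`asyncOperationFactory`), and that a failed permit acquisition surfaces as an
exceptionally completed future (in the diff it is reported through
`permitAcquireErrorHandler` instead). Operations are queued, only the head runs,
a failed head is retried in place, and the next entry starts only after the head
succeeds. `WatcherSendChain` is hypothetical; a real version would retry after
the backoff delay on the event loop and stop once the connection closes or the
watcher is removed, and it would avoid the recursion this sketch performs when
futures complete synchronously.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical per-watcher send chain: at most one operation (update or success
// response) is acquiring permits and writing at any moment, and later operations
// never overtake the head of the queue.
class WatcherSendChain {
    private final Deque<Supplier<CompletableFuture<Void>>> pending = new ArrayDeque<>();
    private boolean headInFlight = false;

    // Enqueues an operation and starts it only if nothing else is in flight.
    void submit(Supplier<CompletableFuture<Void>> operation) {
        synchronized (this) {
            pending.addLast(operation);
            if (headInFlight) {
                return;
            }
            headInFlight = true;
        }
        runHead();
    }

    private void runHead() {
        Supplier<CompletableFuture<Void>> head;
        synchronized (this) {
            head = pending.peekFirst();
        }
        head.get().whenComplete((ignored, failure) -> {
            if (failure != null) {
                // Retry the same head so that later operations cannot bypass it.
                // A real implementation would wait for a backoff delay first and
                // stop retrying once the connection or watcher is closed.
                runHead();
                return;
            }
            boolean hasNext;
            synchronized (this) {
                pending.pollFirst();
                hasNext = !pending.isEmpty();
                headInFlight = hasNext;
            }
            if (hasNext) {
                runHead();
            }
        });
    }
}
```

With this design, a create/delete pair for the same topic is always delivered
in submission order, at the cost of head-of-line blocking for that watcher
while permits are scarce.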
--
This is an automated message from the Apache Git Service.