This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5b2562dc86d [fix][broker] Fix incorrect topic loading latency metric
and timeout might not be respected (#24785)
5b2562dc86d is described below
commit 5b2562dc86df5e46410d9ebfce7b30a03a60baeb
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Sep 30 11:02:23 2025 +0800
[fix][broker] Fix incorrect topic loading latency metric and timeout might
not be respected (#24785)
---
.../pulsar/broker/service/BrokerService.java | 141 ++++++++++++---------
.../pulsar/broker/service/TopicLoadingContext.java | 65 ++++++++++
.../pulsar/broker/service/BrokerServiceTest.java | 7 +-
.../buffer/TopicTransactionBufferTest.java | 5 +-
4 files changed, 150 insertions(+), 68 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 044f44a644a..5d9cad30ae9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -80,7 +80,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -96,6 +95,7 @@ import
org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.ImmutablePair;
import
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.broker.PulsarServerException;
@@ -195,6 +195,7 @@ import
org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.jspecify.annotations.NonNull;
+import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1042,7 +1043,7 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<Optional<Topic>> getTopic(final String topic,
boolean createIfMissing,
- Map<String, String>
properties) {
+ @Nullable Map<String,
String> properties) {
return getTopic(TopicName.get(topic), createIfMissing, properties);
}
@@ -1126,7 +1127,7 @@ public class BrokerService implements Closeable {
* @return CompletableFuture with an Optional of the topic if found or
created, otherwise empty.
*/
public CompletableFuture<Optional<Topic>> getTopic(final TopicName
topicName, boolean createIfMissing,
- Map<String, String>
properties) {
+ @Nullable Map<String,
String> properties) {
try {
// If topic future exists in the cache returned directly
regardless of whether it fails or timeout.
CompletableFuture<Optional<Topic>> tp =
topics.get(topicName.toString());
@@ -1142,13 +1143,31 @@ public class BrokerService implements Closeable {
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
- return
checkNonPartitionedTopicExists(topicName).thenCompose(exists -> {
+ final CompletableFuture<Optional<Topic>> topicFuture =
FutureUtil.createFutureWithTimeout(
+
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()),
executor(),
+ () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
+ final var context = new TopicLoadingContext(topicName,
createIfMissing, topicFuture);
+ if (properties != null) {
+ context.setProperties(properties);
+ }
+ topicFuture.exceptionally(t -> {
+ final var now = System.nanoTime();
+ if (FutureUtil.unwrapCompletionException(t) instanceof
TimeoutException) {
+ log.warn("Failed to load {} after {} ms", topicName,
context.latencyMs(now));
+ } else {
+ log.warn("Failed to load {} after {} ms", topicName,
context.latencyString(now), t);
+ }
+ pulsarStats.recordTopicLoadFailed();
+ return Optional.empty();
+ });
+ checkNonPartitionedTopicExists(topicName).thenAccept(exists ->
{
if (!exists && !createIfMissing) {
- return
CompletableFuture.completedFuture(Optional.empty());
+ topicFuture.complete(Optional.empty());
+ return;
}
// The topic level policies are not needed now, but the
meaning of calling
// "getTopicPoliciesBypassSystemTopic" will wait for
system topic policies initialization.
- return getTopicPoliciesBypassSystemTopic(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY)
+ getTopicPoliciesBypassSystemTopic(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY)
.exceptionally(ex -> {
final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation
encountered an exception by initialize"
@@ -1156,11 +1175,29 @@ public class BrokerService implements Closeable {
rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
- }).thenCompose(optionalTopicPolicies -> {
- return topics.computeIfAbsent(topicName.toString(),
- (tpName) ->
loadOrCreatePersistentTopic(tpName, createIfMissing, properties));
+ }).thenRun(() -> {
+ final var inserted = new MutableBoolean(false);
+ final var cachedFuture =
topics.computeIfAbsent(topicName.toString(), ___ -> {
+ inserted.setTrue();
+ return loadOrCreatePersistentTopic(context);
+ });
+ if (inserted.isFalse()) {
+ // This case should happen rarely when the same
topic is loaded concurrently because we
+ // checked if the `topics` cache includes this
topic before, so the latency is not the
+ // actual loading latency that should not be
recorded in metrics.
+ log.info("[{}] Finished loading from other
concurrent loading task (latency: {})",
+ topicName,
context.latencyString(System.nanoTime()));
+ cachedFuture.whenComplete((optTopic, e) -> {
+ if (e == null) {
+ topicFuture.complete(optTopic);
+ } else {
+ topicFuture.completeExceptionally(e);
+ }
+ });
+ }
});
});
+ return topicFuture;
} else {
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
@@ -1631,29 +1668,16 @@ public class BrokerService implements Closeable {
/**
* It creates a topic async and returns CompletableFuture. It also
throttles down configured max-concurrent topic
* loading and puts them into queue once in-process topics are created.
- *
- * @param topic persistent-topic name
- * @return CompletableFuture<Topic>
- * @throws RuntimeException
*/
- protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(final String topic,
- boolean createIfMissing, Map<String, String> properties) {
- final CompletableFuture<Optional<Topic>> topicFuture =
FutureUtil.createFutureWithTimeout(
-
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()),
executor(),
- () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
-
- topicFuture.exceptionally(t -> {
- pulsarStats.recordTopicLoadFailed();
- return null;
- });
-
+ protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(TopicLoadingContext context) {
+ final var topic = context.getTopicName().toString();
+ final var topicFuture = context.getTopicFuture();
checkTopicNsOwnership(topic)
.thenRun(() -> {
final Semaphore topicLoadSemaphore =
topicLoadRequestSemaphore.get();
if (topicLoadSemaphore.tryAcquire()) {
- checkOwnershipAndCreatePersistentTopic(topic,
createIfMissing, topicFuture,
- properties);
+ checkOwnershipAndCreatePersistentTopic(context);
topicFuture.handle((persistentTopic, ex) -> {
// release permit and process pending topic
topicLoadSemaphore.release();
@@ -1668,8 +1692,7 @@ public class BrokerService implements Closeable {
return null;
});
} else {
- pendingTopicLoadingQueue.add(new
TopicLoadingContext(topic,
- createIfMissing, topicFuture, properties));
+ pendingTopicLoadingQueue.add(context);
if (log.isDebugEnabled()) {
log.debug("topic-loading for {} added into pending
queue", topic);
}
@@ -1712,23 +1735,23 @@ public class BrokerService implements Closeable {
}
}
- private void checkOwnershipAndCreatePersistentTopic(final String topic,
boolean createIfMissing,
- CompletableFuture<Optional<Topic>>
topicFuture,
- Map<String, String> properties) {
- TopicName topicName = TopicName.get(topic);
+ private void checkOwnershipAndCreatePersistentTopic(TopicLoadingContext
context) {
+ TopicName topicName = context.getTopicName();
+ final var topic = topicName.toString();
+ final var topicFuture = context.getTopicFuture();
checkTopicNsOwnership(topic).thenRun(() -> {
CompletableFuture<Map<String, String>> propertiesFuture;
- if (properties == null) {
+ if (context.getProperties() == null) {
//Read properties from storage when loading topic.
propertiesFuture = fetchTopicPropertiesAsync(topicName);
} else {
- propertiesFuture =
CompletableFuture.completedFuture(properties);
+ propertiesFuture =
CompletableFuture.completedFuture(context.getProperties());
}
- propertiesFuture.thenAccept(finalProperties ->
- //TODO add topicName in properties?
- createPersistentTopic0(topic, createIfMissing, topicFuture,
- finalProperties)
- ).exceptionally(throwable -> {
+ propertiesFuture.thenAccept(finalProperties -> {
+ context.setProperties(finalProperties);
+ //TODO add topicName in properties?
+ createPersistentTopic0(context);
+ }).exceptionally(throwable -> {
log.warn("[{}] Read topic property failed", topic, throwable);
pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
topicFuture.completeExceptionally(throwable);
@@ -1742,11 +1765,11 @@ public class BrokerService implements Closeable {
}
@VisibleForTesting
- public void createPersistentTopic0(final String topic, boolean
createIfMissing,
- CompletableFuture<Optional<Topic>>
topicFuture,
- Map<String, String> properties) {
- TopicName topicName = TopicName.get(topic);
- final long topicCreateTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ public void createPersistentTopic0(TopicLoadingContext context) {
+ TopicName topicName = context.getTopicName();
+ final var topic = topicName.toString();
+ final var topicFuture = context.getTopicFuture();
+ final var createIfMissing = context.isCreateIfMissing();
if (isTransactionInternalName(topicName)) {
String msg = String.format("Can not create transaction system
topic %s", topic);
@@ -1780,7 +1803,9 @@ public class BrokerService implements Closeable {
new ManagedLedgerInterceptorImpl(interceptors,
brokerEntryPayloadProcessors));
}
managedLedgerConfig.setCreateIfMissing(createIfMissing);
- managedLedgerConfig.setProperties(properties);
+ if (context.getProperties() != null) {
+ managedLedgerConfig.setProperties(context.getProperties());
+ }
String shadowSource = managedLedgerConfig.getShadowSource();
if (shadowSource != null) {
managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding());
@@ -1825,10 +1850,11 @@ public class BrokerService implements Closeable {
return
persistentTopic.checkDeduplicationStatus();
})
.thenRun(() -> {
- log.info("Created topic {} - dedup
is {}", topic,
-
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
- long topicLoadLatencyMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- -
topicCreateTimeMs;
+ long nowInNanos =
System.nanoTime();
+ long topicLoadLatencyMs =
context.latencyMs(nowInNanos);
+ log.info("Created topic {} - dedup
is {} (latency: {})", topic,
+
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled",
+
context.latencyString(nowInNanos));
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if
(!topicFuture.complete(Optional.of(persistentTopic))) {
// Check create persistent
topic timeout.
@@ -3244,15 +3270,13 @@ public class BrokerService implements Closeable {
return;
}
- final String topic = pendingTopic.getTopic();
+ pendingTopic.polledFromQueue();
+ final String topic = pendingTopic.getTopicName().toString();
checkTopicNsOwnership(topic).thenRun(() -> {
CompletableFuture<Optional<Topic>> pendingFuture =
pendingTopic.getTopicFuture();
final Semaphore topicLoadSemaphore =
topicLoadRequestSemaphore.get();
final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
- checkOwnershipAndCreatePersistentTopic(topic,
- pendingTopic.isCreateIfMissing(),
- pendingFuture,
- pendingTopic.getProperties());
+ checkOwnershipAndCreatePersistentTopic(pendingTopic);
pendingFuture.handle((persistentTopic, ex) -> {
// release permit and process next pending topic
if (acquiredPermit) {
@@ -3827,13 +3851,4 @@ public class BrokerService implements Closeable {
public void
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
}
-
- @AllArgsConstructor
- @Getter
- private static class TopicLoadingContext {
- private final String topic;
- private final boolean createIfMissing;
- private final CompletableFuture<Optional<Topic>> topicFuture;
- private final Map<String, String> properties;
- }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java
new file mode 100644
index 00000000000..9e3ed230cd2
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.pulsar.common.naming.TopicName;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Per-request state for loading (or creating) a persistent topic.
+ *
+ * <p>It carries the loading arguments together with the future through which the load result is delivered,
+ * and records when the request was created and how long it waited in the pending topic-loading queue (if it
+ * was queued at all), so that the total loading latency and the queueing delay can be reported.
+ */
+@RequiredArgsConstructor
+public class TopicLoadingContext {
+
+    private static final String EXAMPLE_LATENCY_OUTPUTS = "1234 ms (queued: 567)";
+
+    // Creation time; the origin of every latency reported by this context.
+    private final long startNs = System.nanoTime();
+    @Getter
+    private final TopicName topicName;
+    @Getter
+    private final boolean createIfMissing;
+    @Getter
+    private final CompletableFuture<Optional<Topic>> topicFuture;
+    @Getter
+    @Setter
+    @Nullable private Map<String, String> properties;
+    // Nanoseconds spent in the pending queue, or -1 if polledFromQueue() was never called.
+    // The duration is stored instead of the raw System.nanoTime() reading because nanoTime() values may be
+    // negative, so "-1 / >= 0" cannot act as an unset sentinel on a raw reading; a duration is never negative.
+    // Volatile: it may be read on a different thread than the writer (e.g. a load-timeout callback).
+    private volatile long queuedNs = -1L;
+
+    /**
+     * Records that this request has just been taken out of the pending topic-loading queue.
+     */
+    public void polledFromQueue() {
+        queuedNs = System.nanoTime() - startNs;
+    }
+
+    /**
+     * Returns the time elapsed since this context was created.
+     *
+     * @param nowInNanos a {@link System#nanoTime()} reading taken after this context was created
+     * @return the elapsed time in milliseconds, truncated
+     */
+    public long latencyMs(long nowInNanos) {
+        return TimeUnit.NANOSECONDS.toMillis(nowInNanos - startNs);
+    }
+
+    /**
+     * Formats the elapsed time for logging, e.g. {@code "1234 ms"}, or {@code "1234 ms (queued: 567)"} when
+     * the request waited in the pending queue; the queued part is in milliseconds too.
+     *
+     * @param nowInNanos a {@link System#nanoTime()} reading taken after this context was created
+     * @return a human-readable latency string
+     */
+    public String latencyString(long nowInNanos) {
+        final var builder = new StringBuilder(EXAMPLE_LATENCY_OUTPUTS.length());
+        builder.append(latencyMs(nowInNanos));
+        builder.append(" ms");
+        final long queued = queuedNs; // single volatile read
+        if (queued >= 0) {
+            builder.append(" (queued: ").append(TimeUnit.NANOSECONDS.toMillis(queued)).append(")");
+        }
+        return builder.toString();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 49f5d7c5c36..62dcc37f38e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1184,7 +1184,8 @@ public class BrokerServiceTest extends BrokerTestBase {
// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService()
- .loadOrCreatePersistentTopic(topicName, true, null);
+ .loadOrCreatePersistentTopic(new TopicLoadingContext(topic,
true,
+ new CompletableFuture<>()));
try {
futureResult.get();
@@ -1227,8 +1228,8 @@ public class BrokerServiceTest extends BrokerTestBase {
ArrayList<CompletableFuture<Optional<Topic>>> loadFutures = new
ArrayList<>();
for (int i = 0; i < 10; i++) {
// try to create topic which should fail as bundle is disable
- CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService()
- .loadOrCreatePersistentTopic(topicName + "_" + i,
false, null);
+ CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService().loadOrCreatePersistentTopic(
+ new TopicLoadingContext(TopicName.get(topicName + "_"
+ i), false, new CompletableFuture<>()));
loadFutures.add(futureResult);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 93654db2c99..5a54b37a637 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -28,7 +28,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.opentelemetry.api.common.Attributes;
import java.time.Duration;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -47,6 +46,7 @@ import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TopicLoadingContext;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
@@ -178,7 +178,8 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
.newTopic(Mockito.eq(topic), Mockito.any(),
Mockito.eq(brokerService),
Mockito.eq(PersistentTopic.class));
- brokerService.createPersistentTopic0(topic, true, new
CompletableFuture<>(), Collections.emptyMap());
+ brokerService.createPersistentTopic0(new
TopicLoadingContext(TopicName.get(topic), true,
+ new CompletableFuture<>()));
Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get()
!= null);
PersistentTopic persistentTopic = reference.get();