This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a84c0b0a9f2d5df660f009bef00e4a7264b9e509 Author: Yunze Xu <[email protected]> AuthorDate: Tue Sep 30 11:02:23 2025 +0800 [fix][broker] Fix incorrect topic loading latency metric and timeout might not be respected (#24785) (cherry picked from commit 5b2562dc86df5e46410d9ebfce7b30a03a60baeb) --- .../pulsar/broker/service/BrokerService.java | 141 ++++++++++++--------- .../pulsar/broker/service/TopicLoadingContext.java | 65 ++++++++++ .../pulsar/broker/service/BrokerServiceTest.java | 7 +- .../buffer/TopicTransactionBufferTest.java | 5 +- 4 files changed, 150 insertions(+), 68 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 044f44a644a..5d9cad30ae9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -80,7 +80,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Predicate; import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -96,6 +95,7 @@ import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.broker.PulsarServerException; @@ -195,6 +195,7 @@ import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1042,7 +1043,7 @@ public class BrokerService implements Closeable { } public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing, - Map<String, String> properties) { + @Nullable Map<String, String> properties) { return getTopic(TopicName.get(topic), createIfMissing, properties); } @@ -1126,7 +1127,7 @@ public class BrokerService implements Closeable { * @return CompletableFuture with an Optional of the topic if found or created, otherwise empty. */ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, boolean createIfMissing, - Map<String, String> properties) { + @Nullable Map<String, String> properties) { try { // If topic future exists in the cache returned directly regardless of whether it fails or timeout. CompletableFuture<Optional<Topic>> tp = topics.get(topicName.toString()); @@ -1142,13 +1143,31 @@ public class BrokerService implements Closeable { return FutureUtil.failedFuture(new NotAllowedException( "Broker is unable to load persistent topic")); } - return checkNonPartitionedTopicExists(topicName).thenCompose(exists -> { + final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout( + Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), + () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); + final var context = new TopicLoadingContext(topicName, createIfMissing, topicFuture); + if (properties != null) { + context.setProperties(properties); + } + topicFuture.exceptionally(t -> { + final var now = System.nanoTime(); + if (FutureUtil.unwrapCompletionException(t) instanceof TimeoutException) { + log.warn("Failed to load {} after {} ms", topicName, context.latencyMs(now)); + } else { + log.warn("Failed to load {} after {} ms", 
topicName, context.latencyString(now), t); + } + pulsarStats.recordTopicLoadFailed(); + return Optional.empty(); + }); + checkNonPartitionedTopicExists(topicName).thenAccept(exists -> { if (!exists && !createIfMissing) { - return CompletableFuture.completedFuture(Optional.empty()); + topicFuture.complete(Optional.empty()); + return; } // The topic level policies are not needed now, but the meaning of calling // "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization. - return getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY) + getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY) .exceptionally(ex -> { final Throwable rc = FutureUtil.unwrapCompletionException(ex); final String errorInfo = String.format("Topic creation encountered an exception by initialize" @@ -1156,11 +1175,29 @@ public class BrokerService implements Closeable { rc.getMessage()); log.error(errorInfo, rc); throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenCompose(optionalTopicPolicies -> { - return topics.computeIfAbsent(topicName.toString(), - (tpName) -> loadOrCreatePersistentTopic(tpName, createIfMissing, properties)); + }).thenRun(() -> { + final var inserted = new MutableBoolean(false); + final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> { + inserted.setTrue(); + return loadOrCreatePersistentTopic(context); + }); + if (inserted.isFalse()) { + // This case should happen rarely when the same topic is loaded concurrently because we + // checked if the `topics` cache includes this topic before, so the latency is not the + // actual loading latency that should not be recorded in metrics. 
+ log.info("[{}] Finished loading from other concurrent loading task (latency: {})", + topicName, context.latencyString(System.nanoTime())); + cachedFuture.whenComplete((optTopic, e) -> { + if (e == null) { + topicFuture.complete(optTopic); + } else { + topicFuture.completeExceptionally(e); + } + }); + } }); }); + return topicFuture; } else { if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { @@ -1631,29 +1668,16 @@ public class BrokerService implements Closeable { /** * It creates a topic async and returns CompletableFuture. It also throttles down configured max-concurrent topic * loading and puts them into queue once in-process topics are created. - * - * @param topic persistent-topic name - * @return CompletableFuture<Topic> - * @throws RuntimeException */ - protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic, - boolean createIfMissing, Map<String, String> properties) { - final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout( - Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), - () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); - - topicFuture.exceptionally(t -> { - pulsarStats.recordTopicLoadFailed(); - return null; - }); - + protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(TopicLoadingContext context) { + final var topic = context.getTopicName().toString(); + final var topicFuture = context.getTopicFuture(); checkTopicNsOwnership(topic) .thenRun(() -> { final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, - properties); + checkOwnershipAndCreatePersistentTopic(context); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -1668,8 +1692,7 @@ public class BrokerService implements Closeable 
{ return null; }); } else { - pendingTopicLoadingQueue.add(new TopicLoadingContext(topic, - createIfMissing, topicFuture, properties)); + pendingTopicLoadingQueue.add(context); if (log.isDebugEnabled()) { log.debug("topic-loading for {} added into pending queue", topic); } @@ -1712,23 +1735,23 @@ public class BrokerService implements Closeable { } } - private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, - CompletableFuture<Optional<Topic>> topicFuture, - Map<String, String> properties) { - TopicName topicName = TopicName.get(topic); + private void checkOwnershipAndCreatePersistentTopic(TopicLoadingContext context) { + TopicName topicName = context.getTopicName(); + final var topic = topicName.toString(); + final var topicFuture = context.getTopicFuture(); checkTopicNsOwnership(topic).thenRun(() -> { CompletableFuture<Map<String, String>> propertiesFuture; - if (properties == null) { + if (context.getProperties() == null) { //Read properties from storage when loading topic. propertiesFuture = fetchTopicPropertiesAsync(topicName); } else { - propertiesFuture = CompletableFuture.completedFuture(properties); + propertiesFuture = CompletableFuture.completedFuture(context.getProperties()); } - propertiesFuture.thenAccept(finalProperties -> - //TODO add topicName in properties? - createPersistentTopic0(topic, createIfMissing, topicFuture, - finalProperties) - ).exceptionally(throwable -> { + propertiesFuture.thenAccept(finalProperties -> { + context.setProperties(finalProperties); + //TODO add topicName in properties? 
+ createPersistentTopic0(context); + }).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(throwable); @@ -1742,11 +1765,11 @@ public class BrokerService implements Closeable { } @VisibleForTesting - public void createPersistentTopic0(final String topic, boolean createIfMissing, - CompletableFuture<Optional<Topic>> topicFuture, - Map<String, String> properties) { - TopicName topicName = TopicName.get(topic); - final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + public void createPersistentTopic0(TopicLoadingContext context) { + TopicName topicName = context.getTopicName(); + final var topic = topicName.toString(); + final var topicFuture = context.getTopicFuture(); + final var createIfMissing = context.isCreateIfMissing(); if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); @@ -1780,7 +1803,9 @@ public class BrokerService implements Closeable { new ManagedLedgerInterceptorImpl(interceptors, brokerEntryPayloadProcessors)); } managedLedgerConfig.setCreateIfMissing(createIfMissing); - managedLedgerConfig.setProperties(properties); + if (context.getProperties() != null) { + managedLedgerConfig.setProperties(context.getProperties()); + } String shadowSource = managedLedgerConfig.getShadowSource(); if (shadowSource != null) { managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding()); @@ -1825,10 +1850,11 @@ public class BrokerService implements Closeable { return persistentTopic.checkDeduplicationStatus(); }) .thenRun(() -> { - log.info("Created topic {} - dedup is {}", topic, - persistentTopic.isDeduplicationEnabled() ? 
"enabled" : "disabled"); - long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - - topicCreateTimeMs; + long nowInNanos = System.nanoTime(); + long topicLoadLatencyMs = context.latencyMs(nowInNanos); + log.info("Created topic {} - dedup is {} (latency: {})", topic, + persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled", + context.latencyString(nowInNanos)); pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); if (!topicFuture.complete(Optional.of(persistentTopic))) { // Check create persistent topic timeout. @@ -3244,15 +3270,13 @@ public class BrokerService implements Closeable { return; } - final String topic = pendingTopic.getTopic(); + pendingTopic.polledFromQueue(); + final String topic = pendingTopic.getTopicName().toString(); checkTopicNsOwnership(topic).thenRun(() -> { CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getTopicFuture(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - checkOwnershipAndCreatePersistentTopic(topic, - pendingTopic.isCreateIfMissing(), - pendingFuture, - pendingTopic.getProperties()); + checkOwnershipAndCreatePersistentTopic(pendingTopic); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { @@ -3827,13 +3851,4 @@ public class BrokerService implements Closeable { public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) { this.pulsarChannelInitFactory = factory; } - - @AllArgsConstructor - @Getter - private static class TopicLoadingContext { - private final String topic; - private final boolean createIfMissing; - private final CompletableFuture<Optional<Topic>> topicFuture; - private final Map<String, String> properties; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java new file mode 100644 index 00000000000..9e3ed230cd2 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import org.apache.pulsar.common.naming.TopicName; +import org.jspecify.annotations.Nullable; + +@RequiredArgsConstructor +public class TopicLoadingContext { + + private static final String EXAMPLE_LATENCY_OUTPUTS = "1234 ms (queued: 567)"; + + private final long startNs = System.nanoTime(); + @Getter + private final TopicName topicName; + @Getter + private final boolean createIfMissing; + @Getter + private final CompletableFuture<Optional<Topic>> topicFuture; + @Getter + @Setter + @Nullable private Map<String, String> properties; + private long polledFromQueueNs = -1L; + + public void polledFromQueue() { + polledFromQueueNs = System.nanoTime(); + } + + public long latencyMs(long nowInNanos) { + return TimeUnit.NANOSECONDS.toMillis(nowInNanos - startNs); + } + + public String latencyString(long nowInNanos) { + final var builder = new StringBuilder(EXAMPLE_LATENCY_OUTPUTS.length()); + builder.append(latencyMs(nowInNanos)); + builder.append(" ms"); + if (polledFromQueueNs >= 0) { + builder.append(" (queued: ").append(latencyMs(polledFromQueueNs)).append(")"); + } + return builder.toString(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 49f5d7c5c36..62dcc37f38e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1184,7 +1184,8 @@ public class BrokerServiceTest extends BrokerTestBase { // try to create topic which should fail as bundle is disable CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName, true, null); + .loadOrCreatePersistentTopic(new TopicLoadingContext(topic, true, + new CompletableFuture<>())); try { futureResult.get(); @@ -1227,8 +1228,8 @@ public class BrokerServiceTest extends BrokerTestBase { ArrayList<CompletableFuture<Optional<Topic>>> loadFutures = new ArrayList<>(); for (int i = 0; i < 10; i++) { // try to create topic which should fail as bundle is disable - CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName + "_" + i, false, null); + CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService().loadOrCreatePersistentTopic( + new TopicLoadingContext(TopicName.get(topicName + "_" + i), false, new CompletableFuture<>())); loadFutures.add(futureResult); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 93654db2c99..5a54b37a637 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -28,7 +28,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.opentelemetry.api.common.Attributes; import java.time.Duration; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -47,6 +46,7 @@ import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicLoadingContext; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import 
org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; @@ -178,7 +178,8 @@ public class TopicTransactionBufferTest extends TransactionTestBase { .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), Mockito.eq(PersistentTopic.class)); - brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); + brokerService.createPersistentTopic0(new TopicLoadingContext(TopicName.get(topic), true, + new CompletableFuture<>())); Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); PersistentTopic persistentTopic = reference.get();
