This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 39ea19e41d6 [fix][broker] Fix various error-prone detected errors
mainly in logging and String.format parameters (#25059)
39ea19e41d6 is described below
commit 39ea19e41d6cfaa3049b7beca4480775d20d097e
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 10 16:38:50 2025 +0200
[fix][broker] Fix various error-prone detected errors mainly in logging and
String.format parameters (#25059)
---
.../main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 2 +-
.../broker/loadbalance/extensions/scheduler/SplitScheduler.java | 2 +-
.../strategy/DefaultNamespaceBundleSplitStrategyImpl.java | 2 +-
.../pulsar/broker/service/PulsarMetadataEventSynchronizer.java | 8 +++++---
.../pulsar/broker/service/persistent/GeoPersistentReplicator.java | 2 +-
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 5 ++---
.../pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java | 3 +--
7 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 5632de07f47..04d16499081 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1166,7 +1166,7 @@ public class Namespaces extends NamespacesBase {
internalDeleteSubscriptionDispatchRateAsync()
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- log.error("Failed to delete the subscription dispatchRate
for cluster on namespace {}",
+ log.error("[{}] Failed to delete the subscription
dispatchRate for cluster on namespace {}",
clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
index 816fde0038a..70887ab2325 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
@@ -116,7 +116,7 @@ public class SplitScheduler implements LoadManagerScheduler {
synchronized (bundleSplitStrategy) {
final Set<SplitDecision> decisions =
bundleSplitStrategy.findBundlesToSplit(context, pulsar);
if (debugMode) {
- log.info("Split Decisions:", decisions);
+ log.info("Split Decisions: {}", decisions);
}
if (!decisions.isEmpty()) {
// currently following the unloading timeout
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
index 7875c07b122..6cc4293143e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
@@ -244,7 +244,7 @@ public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleS
.get(conf.getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
} catch (Throwable e) {
counter.update(Failure, Unknown);
- log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to
get split boundaries.", bundle, e));
+ log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to
get split boundaries.", bundle), e);
continue;
}
if (splitBoundary == null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
index a9564642c1a..a5fac333ae5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -228,11 +228,13 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize
log.info("successfully created consumer {}", topicName);
} else {
State stateTransient = state;
- log.info("[{}] Closing the new consumer because the
synchronizer state is {}", stateTransient);
+ log.info("[{}] Closing the new consumer because the
synchronizer state is {}", topicName,
+ stateTransient);
CompletableFuture closeConsumer = new CompletableFuture<>();
closeResource(() -> consumer.closeAsync(), closeConsumer);
closeConsumer.thenRun(() -> {
- log.info("[{}] Closed the new consumer because the
synchronizer state is {}", stateTransient);
+ log.info("[{}] Closed the new consumer because the
synchronizer state is {}", topicName,
+ stateTransient);
});
}
}).exceptionally(ex -> {
@@ -317,7 +319,7 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize
}
// Retry.
long waitTimeMs = backOff.next();
- log.warn("[{}] Exception: '{}' occurred while trying to close the
%s. Retrying again in {} s.",
+ log.warn("[{}] Exception: '{}' occurred while trying to close the
{}. Retrying again in {} s.",
topicName, ex.getMessage(),
asyncCloseable.getClass().getSimpleName(), waitTimeMs / 1000.0, ex);
brokerService.executor().schedule(() ->
closeResource(asyncCloseable, future), waitTimeMs,
TimeUnit.MILLISECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 437067edb69..46f8a27d580 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -65,7 +65,7 @@ public class GeoPersistentReplicator extends PersistentReplicator {
if (metadata.partitions == 0) {
topicCheckFuture.complete(null);
} else {
- String errorMsg = String.format("{} Can not create the
replicator due to the partitions in the"
+ String errorMsg = String.format("%s Can not create the
replicator due to the partitions in the"
+ " remote cluster is not 0, but is
%s",
replicatorId, metadata.partitions);
log.error(errorMsg);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 86ac74e8f17..422916602c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2913,13 +2913,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.topicCreationTimeStamp = getTopicCreationTimeStamp();
stats.compaction.reset();
- mxBean.flatMap(bean ->
bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> {
+ mxBean.flatMap(bean ->
bean.getCompactionRecordForTopic(topic)).ifPresent(compactionRecord -> {
stats.compaction.lastCompactionRemovedEventCount =
compactionRecord.getLastCompactionRemovedEventCount();
stats.compaction.lastCompactionSucceedTimestamp =
compactionRecord.getLastCompactionSucceedTimestamp();
stats.compaction.lastCompactionFailedTimestamp =
compactionRecord.getLastCompactionFailedTimestamp();
stats.compaction.lastCompactionDurationTimeInMills =
compactionRecord.getLastCompactionDurationTimeInMills();
- return compactionRecord;
});
Map<String, CompletableFuture<SubscriptionStatsImpl>>
subscriptionFutures = new HashMap<>();
@@ -2967,7 +2966,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
if (getStatsOptions.isGetEarliestTimeInBacklog() &&
stats.backlogSize != 0) {
- CompletableFuture finalRes =
ledger.getEarliestMessagePublishTimeInBacklog()
+ CompletableFuture<TopicStatsImpl> finalRes =
ledger.getEarliestMessagePublishTimeInBacklog()
.thenApply((earliestTime) -> {
stats.earliestMsgPublishTimeInBacklogs = earliestTime;
return stats;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 1736cd3840d..1531672bef1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -326,7 +326,7 @@ public class NamespaceStatsAggregator {
compactorMXBean
.flatMap(mxBean ->
mxBean.getCompactionRecordForTopic(topic.getName()))
- .map(compactionRecord -> {
+ .ifPresent(compactionRecord -> {
stats.compactionRemovedEventCount =
compactionRecord.getCompactionRemovedEventCount();
stats.compactionSucceedCount =
compactionRecord.getCompactionSucceedCount();
stats.compactionFailedCount =
compactionRecord.getCompactionFailedCount();
@@ -346,7 +346,6 @@ public class NamespaceStatsAggregator {
stats.compactionCompactedEntriesCount = entries;
stats.compactionCompactedEntriesSize = size;
}
- return compactionRecord;
});
}