This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0e5c0ccfdb550160072a343c06ec2a149450ff16 Author: Zike Yang <[email protected]> AuthorDate: Thu Feb 6 21:03:10 2025 +0800 [improve][broker] Avoid logging errors when there is a connection issue during subscription. (#23939) (cherry picked from commit 5e5d514174fdbc1b400df51fafaa18110f1c31a9) --- .../org/apache/pulsar/broker/service/BrokerServiceException.java | 6 ++++++ .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java | 3 ++- .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 5 ++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 6abe40f811d..f54746180e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -224,6 +224,12 @@ public class BrokerServiceException extends Exception { } } + public static class ConnectionClosedException extends BrokerServiceException { + public ConnectionClosedException(String msg) { + super(msg); + } + } + public static class TopicBacklogQuotaExceededException extends BrokerServiceException { @Getter private final BacklogQuota.RetentionPolicy retentionPolicy; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 95df15956a4..5b2bf94e328 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -362,7 +362,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol consumer.consumerName(), currentUsageCount()); } future.completeExceptionally( - new BrokerServiceException("Connection was closed while the opening the cursor ")); + new BrokerServiceException.ConnectionClosedException( + "Connection was closed while the opening the cursor ")); } else { log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); future.complete(consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3cd200356c8..c978ffa3ff8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1064,7 +1064,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal decrementUsageCount(); return FutureUtil.failedFuture( - new BrokerServiceException("Connection was closed while the opening the cursor ")); + new BrokerServiceException.ConnectionClosedException( + "Connection was closed while the opening the cursor ")); } else { checkReplicatedSubscriptionControllerState(); if (log.isDebugEnabled()) { @@ -1101,6 +1102,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal log.warn("[{}][{}] has been fenced. closing the topic {}", topic, subscriptionName, ex.getMessage()); close(); + } else if (ex.getCause() instanceof BrokerServiceException.ConnectionClosedException) { + log.warn("[{}][{}] Connection was closed while the opening the cursor", topic, subscriptionName); } else { log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex); }
