This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a8c4549639 [improve][broker] Add logging to leader election (#22645)
7a8c4549639 is described below

commit 7a8c4549639d67182049bca9f714c0f4b3061236
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 3 20:16:08 2024 +0300

    [improve][broker] Add logging to leader election (#22645)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  6 +++---
 .../coordination/impl/LeaderElectionImpl.java      | 22 ++++++++++++++++++----
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 8c910fb91e1..559ca1e9e69 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1181,7 +1181,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 new LeaderElectionService(coordinationService, getBrokerId(), 
getSafeWebServiceAddress(),
                 state -> {
                     if (state == LeaderElectionState.Leading) {
-                        LOG.info("This broker was elected leader");
+                        LOG.info("This broker {} was elected leader", 
getBrokerId());
                         if (getConfiguration().isLoadBalancerEnabled()) {
                             long resourceQuotaUpdateInterval = TimeUnit.MINUTES
                                     
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
@@ -1202,10 +1202,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                         if (leaderElectionService != null) {
                             final Optional<LeaderBroker> currentLeader = 
leaderElectionService.getCurrentLeader();
                             if (currentLeader.isPresent()) {
-                                LOG.info("This broker is a follower. Current 
leader is {}",
+                                LOG.info("This broker {} is a follower. 
Current leader is {}", getBrokerId(),
                                         currentLeader);
                             } else {
-                                LOG.info("This broker is a follower. No leader 
has been elected yet");
+                                LOG.info("This broker {} is a follower. No 
leader has been elected yet", getBrokerId());
                             }
 
                         }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 9e6a9b94c42..aa606084173 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -129,19 +129,26 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
             return FutureUtils.exception(t);
         }
 
-        if (existingValue.equals(proposedValue.orElse(null))) {
+        T value = proposedValue.orElse(null);
+        if (existingValue.equals(value)) {
             // If the value is the same as our proposed value, it means this 
instance was the leader at some
             // point before. The existing value can either be for this same 
session or for a previous one.
             if (res.getStat().isCreatedBySelf()) {
                 // The value is still valid because it was created in the same 
session
                 changeState(LeaderElectionState.Leading);
             } else {
+                log.info("Conditionally deleting existing equals value {} for 
{} because it's not created in the "
+                        + "current session. stat={}", existingValue, path, 
res.getStat());
                 // Since the value was created in a different session, it 
might be expiring. We need to delete it
                 // and try the election again.
                 return store.delete(path, 
Optional.of(res.getStat().getVersion()))
                         .thenCompose(__ -> tryToBecomeLeader());
             }
         } else if (res.getStat().isCreatedBySelf()) {
+            log.warn("Conditionally deleting existing value {} for {} because 
it's different from the proposed value "
+                            + "({}). This is unexpected since it was created 
within the same session. "
+                            + "In tests this could happen because of an 
invalid shared session id when using mocks.",
+                    existingValue, path, value);
             // The existing value is different but was created from the same 
session
             return store.delete(path, Optional.of(res.getStat().getVersion()))
                     .thenCompose(__ -> tryToBecomeLeader());
@@ -165,9 +172,10 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
     }
 
     private synchronized CompletableFuture<LeaderElectionState> 
tryToBecomeLeader() {
+        T value = proposedValue.get();
         byte[] payload;
         try {
-            payload = serde.serialize(path, proposedValue.get());
+            payload = serde.serialize(path, value);
         } catch (Throwable t) {
             return FutureUtils.exception(t);
         }
@@ -181,7 +189,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                             cache.get(path)
                                     .thenRun(() -> {
                                         synchronized (LeaderElectionImpl.this) 
{
-                                            log.info("Acquired leadership on 
{}", path);
+                                            log.info("Acquired leadership on 
{} with {}", path, value);
                                             internalState = 
InternalState.LeaderIsPresent;
                                             if (leaderElectionState != 
LeaderElectionState.Leading) {
                                                 leaderElectionState = 
LeaderElectionState.Leading;
@@ -196,6 +204,8 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                                     }).exceptionally(ex -> {
                                         // We fail to do the get(), so clean 
up the leader election fail the whole
                                         // operation
+                                        log.warn("Failed to get the current 
state after acquiring leadership on {}. "
+                                                + " Conditionally deleting 
current entry.", path, ex);
                                         store.delete(path, 
Optional.of(stat.getVersion()))
                                                 .thenRun(() -> 
result.completeExceptionally(ex))
                                                 .exceptionally(ex2 -> {
@@ -205,6 +215,8 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                                         return null;
                                     });
                         } else {
+                            log.info("Leadership on {} with value {} was lost. 
"
+                                            + "Conditionally deleting entry 
with stat={}.", path, value, stat);
                             // LeaderElection was closed in between. Release 
the lock asynchronously
                             store.delete(path, Optional.of(stat.getVersion()))
                                     .thenRun(() -> 
result.completeExceptionally(
@@ -219,7 +231,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                     if (ex.getCause() instanceof BadVersionException) {
                         // There was a conflict between 2 participants trying 
to become leaders at same time. Retry
                         // to fetch info on new leader.
-
+                        log.info("There was a conflict between 2 participants 
trying to become leaders at the same "
+                                        + "time on {}. Attempted with value 
{}. Retrying.",
+                                path, value);
                         elect()
                             .thenAccept(lse -> result.complete(lse))
                             .exceptionally(ex2 -> {

Reply via email to