This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a8c4549639 [improve][broker] Add logging to leader election (#22645)
7a8c4549639 is described below
commit 7a8c4549639d67182049bca9f714c0f4b3061236
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 3 20:16:08 2024 +0300
[improve][broker] Add logging to leader election (#22645)
---
.../org/apache/pulsar/broker/PulsarService.java | 6 +++---
.../coordination/impl/LeaderElectionImpl.java | 22 ++++++++++++++++++----
2 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 8c910fb91e1..559ca1e9e69 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1181,7 +1181,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
new LeaderElectionService(coordinationService, getBrokerId(),
getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
- LOG.info("This broker was elected leader");
+ LOG.info("This broker {} was elected leader",
getBrokerId());
if (getConfiguration().isLoadBalancerEnabled()) {
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
@@ -1202,10 +1202,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
if (leaderElectionService != null) {
final Optional<LeaderBroker> currentLeader =
leaderElectionService.getCurrentLeader();
if (currentLeader.isPresent()) {
- LOG.info("This broker is a follower. Current
leader is {}",
+ LOG.info("This broker {} is a follower.
Current leader is {}", getBrokerId(),
currentLeader);
} else {
- LOG.info("This broker is a follower. No leader
has been elected yet");
+ LOG.info("This broker {} is a follower. No
leader has been elected yet", getBrokerId());
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 9e6a9b94c42..aa606084173 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -129,19 +129,26 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
return FutureUtils.exception(t);
}
- if (existingValue.equals(proposedValue.orElse(null))) {
+ T value = proposedValue.orElse(null);
+ if (existingValue.equals(value)) {
// If the value is the same as our proposed value, it means this
instance was the leader at some
// point before. The existing value can either be for this same
session or for a previous one.
if (res.getStat().isCreatedBySelf()) {
// The value is still valid because it was created in the same
session
changeState(LeaderElectionState.Leading);
} else {
+ log.info("Conditionally deleting existing equals value {} for
{} because it's not created in the "
+ + "current session. stat={}", existingValue, path,
res.getStat());
// Since the value was created in a different session, it
might be expiring. We need to delete it
// and try the election again.
return store.delete(path,
Optional.of(res.getStat().getVersion()))
.thenCompose(__ -> tryToBecomeLeader());
}
} else if (res.getStat().isCreatedBySelf()) {
+ log.warn("Conditionally deleting existing value {} for {} because
it's different from the proposed value "
+ + "({}). This is unexpected since it was created
within the same session. "
+ + "In tests this could happen because of an
invalid shared session id when using mocks.",
+ existingValue, path, value);
// The existing value is different but was created from the same
session
return store.delete(path, Optional.of(res.getStat().getVersion()))
.thenCompose(__ -> tryToBecomeLeader());
@@ -165,9 +172,10 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
}
private synchronized CompletableFuture<LeaderElectionState>
tryToBecomeLeader() {
+ T value = proposedValue.get();
byte[] payload;
try {
- payload = serde.serialize(path, proposedValue.get());
+ payload = serde.serialize(path, value);
} catch (Throwable t) {
return FutureUtils.exception(t);
}
@@ -181,7 +189,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
cache.get(path)
.thenRun(() -> {
synchronized (LeaderElectionImpl.this)
{
- log.info("Acquired leadership on
{}", path);
+ log.info("Acquired leadership on
{} with {}", path, value);
internalState =
InternalState.LeaderIsPresent;
if (leaderElectionState !=
LeaderElectionState.Leading) {
leaderElectionState =
LeaderElectionState.Leading;
@@ -196,6 +204,8 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
}).exceptionally(ex -> {
// We fail to do the get(), so clean
up the leader election fail the whole
// operation
+ log.warn("Failed to get the current
state after acquiring leadership on {}. "
+ + " Conditionally deleting
current entry.", path, ex);
store.delete(path,
Optional.of(stat.getVersion()))
.thenRun(() ->
result.completeExceptionally(ex))
.exceptionally(ex2 -> {
@@ -205,6 +215,8 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
return null;
});
} else {
+ log.info("Leadership on {} with value {} was lost.
"
+ + "Conditionally deleting entry
with stat={}.", path, value, stat);
// LeaderElection was closed in between. Release
the lock asynchronously
store.delete(path, Optional.of(stat.getVersion()))
.thenRun(() ->
result.completeExceptionally(
@@ -219,7 +231,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
if (ex.getCause() instanceof BadVersionException) {
// There was a conflict between 2 participants trying
to become leaders at same time. Retry
// to fetch info on new leader.
-
+ log.info("There was a conflict between 2 participants
trying to become leaders at the same "
+ + "time on {}. Attempted with value
{}. Retrying.",
+ path, value);
elect()
.thenAccept(lse -> result.complete(lse))
.exceptionally(ex2 -> {