kfaraz commented on code in PR #16528:
URL: https://github.com/apache/druid/pull/16528#discussion_r1622740278
##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -65,11 +63,10 @@ public CuratorDruidLeaderSelector(CuratorFramework curator,
@Self DruidNode self
this.curator = curator;
this.self = self;
this.latchPath = latchPath;
-
- // Creating a LeaderLatch here allows us to query for the current leader.
We will not be considered for leadership
- // election until LeaderLatch.start() is called in registerListener().
This allows clients to observe the current
- // leader without being involved in the election.
this.leaderLatch.set(createNewLeaderLatch());
+
+ // Adding ConnectionStateListener to handle session changes using a method
reference
Review Comment:
This comment is not needed; the code is self-explanatory.
##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch()
private LeaderLatch createNewLeaderLatchWithListener()
{
final LeaderLatch newLeaderLatch = createNewLeaderLatch();
+ newLeaderLatch.addListener(new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ try {
+ if (leader) {
+ log.warn("I'm being asked to become leader. But I am already the
leader. Ignored event.");
+ return;
+ }
- newLeaderLatch.addListener(
- new LeaderLatchListener()
- {
- @Override
- public void isLeader()
- {
- try {
- if (leader) {
- log.warn("I'm being asked to become leader. But I am already
the leader. Ignored event.");
- return;
- }
-
- leader = true;
- term++;
- listener.becomeLeader();
- }
- catch (Exception ex) {
- log.makeAlert(ex, "listener becomeLeader() failed. Unable to
become leader").emit();
-
- // give others a chance to become leader.
- CloseableUtils.closeAndSuppressExceptions(
- createNewLeaderLatchWithListener(),
- e -> log.warn("Could not close old leader latch; continuing
with new one anyway.")
- );
-
- leader = false;
- try {
- //Small delay before starting the latch so that others waiting
are chosen to become leader.
- Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
- leaderLatch.get().start();
- }
- catch (Exception e) {
- // If an exception gets thrown out here, then the node will
zombie out 'cause it won't be looking for
- // the latch anymore. I don't believe it's actually possible
for an Exception to throw out here, but
- // Curator likes to have "throws Exception" on methods so it
might happen...
- log.makeAlert(e, "I am a zombie").emit();
- }
- }
+ leader = true;
+ term++;
+ listener.becomeLeader();
+ }
+ catch (Exception ex) {
+ log.makeAlert(ex, "listener becomeLeader() failed. Unable to become
leader").emit();
+
+ // give others a chance to become leader.
+ CloseableUtils.closeAndSuppressExceptions(
+ createNewLeaderLatchWithListener(),
+ e -> log.warn("Could not close old leader latch; continuing with
new one anyway.")
+ );
+
+ leader = false;
+ try {
+ // Small delay before starting the latch so that others waiting
are chosen to become leader.
+ Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
+ leaderLatch.get().start();
+ }
+ catch (Exception e) {
+ // If an exception gets thrown out here, then the node will zombie
out 'cause it won't be looking for
+ // the latch anymore. I don't believe it's actually possible for
an Exception to throw out here, but
+ // Curator likes to have "throws Exception" on methods so it might
happen...
+ log.makeAlert(e, "I am a zombie").emit();
}
+ }
+ }
- @Override
- public void notLeader()
- {
- try {
- if (!leader) {
- log.warn("I'm being asked to stop being leader. But I am not
the leader. Ignored event.");
- return;
- }
-
- leader = false;
- listener.stopBeingLeader();
- }
- catch (Exception ex) {
- log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to
stopBeingLeader").emit();
- }
+ @Override
+ public void notLeader()
+ {
+ try {
+ if (!leader) {
+ log.warn("I'm being asked to stop being leader. But I am not the
leader. Ignored event.");
+ return;
}
- },
- listenerExecutor
- );
+
+ leader = false;
+ listener.stopBeingLeader();
+ }
+ catch (Exception ex) {
+ log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to
stopBeingLeader").emit();
+ }
+ }
+ }, listenerExecutor);
Review Comment:
This seems to be only a formatting change; please revert it.
##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch()
private LeaderLatch createNewLeaderLatchWithListener()
{
final LeaderLatch newLeaderLatch = createNewLeaderLatch();
+ newLeaderLatch.addListener(new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ try {
+ if (leader) {
+ log.warn("I'm being asked to become leader. But I am already the
leader. Ignored event.");
+ return;
+ }
- newLeaderLatch.addListener(
- new LeaderLatchListener()
- {
- @Override
- public void isLeader()
- {
- try {
- if (leader) {
- log.warn("I'm being asked to become leader. But I am already
the leader. Ignored event.");
- return;
- }
-
- leader = true;
- term++;
- listener.becomeLeader();
- }
- catch (Exception ex) {
- log.makeAlert(ex, "listener becomeLeader() failed. Unable to
become leader").emit();
-
- // give others a chance to become leader.
- CloseableUtils.closeAndSuppressExceptions(
- createNewLeaderLatchWithListener(),
- e -> log.warn("Could not close old leader latch; continuing
with new one anyway.")
- );
-
- leader = false;
- try {
- //Small delay before starting the latch so that others waiting
are chosen to become leader.
- Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
- leaderLatch.get().start();
- }
- catch (Exception e) {
- // If an exception gets thrown out here, then the node will
zombie out 'cause it won't be looking for
- // the latch anymore. I don't believe it's actually possible
for an Exception to throw out here, but
- // Curator likes to have "throws Exception" on methods so it
might happen...
- log.makeAlert(e, "I am a zombie").emit();
- }
- }
+ leader = true;
+ term++;
+ listener.becomeLeader();
+ }
+ catch (Exception ex) {
+ log.makeAlert(ex, "listener becomeLeader() failed. Unable to become
leader").emit();
+
+ // give others a chance to become leader.
+ CloseableUtils.closeAndSuppressExceptions(
+ createNewLeaderLatchWithListener(),
+ e -> log.warn("Could not close old leader latch; continuing with
new one anyway.")
+ );
+
+ leader = false;
+ try {
+ // Small delay before starting the latch so that others waiting
are chosen to become leader.
+ Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
+ leaderLatch.get().start();
+ }
+ catch (Exception e) {
+ // If an exception gets thrown out here, then the node will zombie
out 'cause it won't be looking for
+ // the latch anymore. I don't believe it's actually possible for
an Exception to throw out here, but
+ // Curator likes to have "throws Exception" on methods so it might
happen...
+ log.makeAlert(e, "I am a zombie").emit();
}
+ }
+ }
- @Override
- public void notLeader()
- {
- try {
- if (!leader) {
- log.warn("I'm being asked to stop being leader. But I am not
the leader. Ignored event.");
- return;
- }
-
- leader = false;
- listener.stopBeingLeader();
- }
- catch (Exception ex) {
- log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to
stopBeingLeader").emit();
- }
+ @Override
+ public void notLeader()
+ {
+ try {
+ if (!leader) {
Review Comment:
When the connection is lost, the `notLeader()` method is also called, so I
wonder whether we even need a state change listener or whether we should just
recreate the latch in this method.
##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch()
private LeaderLatch createNewLeaderLatchWithListener()
{
final LeaderLatch newLeaderLatch = createNewLeaderLatch();
+ newLeaderLatch.addListener(new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ try {
+ if (leader) {
Review Comment:
We need to handle the case where this method is called with
`newLeaderLatch.getState() == CLOSED`.
##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -215,4 +208,37 @@ public void unregisterListener()
CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e ->
log.warn(e, "Failed to close LeaderLatch."));
listenerExecutor.shutdownNow();
}
+
+ // Method to handle connection state changes
+ private void handleConnectionStateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ switch (newState) {
+ case SUSPENDED:
+ case LOST:
+ recreateLeaderLatch();
+ break;
+ case RECONNECTED:
+ // Connection reestablished, no action needed here
+ break;
+ default:
+ // Do nothing for other states
+ break;
+ }
+ }
+
+ private void recreateLeaderLatch()
+ {
+ // Close existing leader latch
+ CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e ->
log.warn(e, "Failed to close LeaderLatch."));
+
+ // Create and start a new leader latch
+ LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener();
+ try {
+ newLeaderLatch.start();
+ }
+ catch (Exception ex) {
+ throw new RuntimeException("Failed to start new LeaderLatch after
session change", ex);
+ }
+ leaderLatch.set(newLeaderLatch);
Review Comment:
Please take a look at these lines in the code:
https://github.com/apache/druid/blob/b53d75758fd5b8d30cbd3836bfa0a954f01ced84/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java#L103-L120
The new method `recreateLeaderLatch()` needs to do exactly what those lines do.
##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -215,4 +208,37 @@ public void unregisterListener()
CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e ->
log.warn(e, "Failed to close LeaderLatch."));
listenerExecutor.shutdownNow();
}
+
+ // Method to handle connection state changes
+ private void handleConnectionStateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ switch (newState) {
+ case SUSPENDED:
+ case LOST:
+ recreateLeaderLatch();
+ break;
+ case RECONNECTED:
+ // Connection reestablished, no action needed here
+ break;
+ default:
+ // Do nothing for other states
+ break;
+ }
+ }
+
+ private void recreateLeaderLatch()
+ {
+ // Close existing leader latch
+ CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e ->
log.warn(e, "Failed to close LeaderLatch."));
+
+ // Create and start a new leader latch
+ LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener();
Review Comment:
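The latch returned by `createNewLeaderLatchWithListener()` is actually the
old latch (the existing code closes its return value for that reason), so
this ends up starting and storing the old latch instead of the new one.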
##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -215,4 +208,37 @@ public void unregisterListener()
CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e ->
log.warn(e, "Failed to close LeaderLatch."));
listenerExecutor.shutdownNow();
}
+
+ // Method to handle connection state changes
Review Comment:
```suggestion
/**
* Handles connection state changes. Recreates the leader latch if
connection to zookeeper is lost.
*/
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]