szetszwo commented on code in PR #808:
URL: https://github.com/apache/ratis/pull/808#discussion_r1112301101
##########
ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java:
##########
@@ -55,54 +56,74 @@ public int run(CommandLine cl) throws IOException {
super.run(cl);
String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
+ // TODO: Default timeout should be set to 0, which means let server decide
(based on election timeout).
+ // However, occasionally the request could timeout too fast while
the transfer is in progress.
+ // i.e. request timeout doesn't mean transfer leadership has failed.
+ // Currently, Ratis shell returns merely based on the result of the
request.
+ // So we set a larger default timeout here (3s).
+ final long timeoutDefault = 3_000L;
+ // Default timeout for legacy mode matches with the legacy command
(version 2.4.x and older).
+ final long timeoutLegacy = 60_000L;
+ final Optional<Long> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ?
Optional.empty() :
+ Optional.of(Long.parseLong(cl.getOptionValue(TIMEOUT_OPTION_NAME)) *
1000L);
Review Comment:
Use `TimeDuration` to support different units such as 100ms, 1min.
```java
final TimeDuration timeoutDefault = TimeDuration.valueOf(3,
TimeUnit.SECONDS);
// Default timeout for legacy mode matches with the legacy command
(version 2.4.x and older).
final TimeDuration timeoutLegacy = TimeDuration.valueOf(60,
TimeUnit.SECONDS);
final Optional<TimeDuration> timeout =
!cl.hasOption(TIMEOUT_OPTION_NAME) ? Optional.empty() :
Optional.of(TimeDuration.valueOf(cl.getOptionValue(TIMEOUT_OPTION_NAME),
TimeUnit.SECONDS));
```
##########
ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java:
##########
@@ -55,54 +56,74 @@ public int run(CommandLine cl) throws IOException {
super.run(cl);
String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
+ // TODO: Default timeout should be set to 0, which means let server decide
(based on election timeout).
+ // However, occasionally the request could timeout too fast while
the transfer is in progress.
+ // i.e. request timeout doesn't mean transfer leadership has failed.
+ // Currently, Ratis shell returns merely based on the result of the
request.
+ // So we set a larger default timeout here (3s).
+ final long timeoutDefault = 3_000L;
+ // Default timeout for legacy mode matches with the legacy command
(version 2.4.x and older).
+ final long timeoutLegacy = 60_000L;
+ final Optional<Long> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ?
Optional.empty() :
+ Optional.of(Long.parseLong(cl.getOptionValue(TIMEOUT_OPTION_NAME)) *
1000L);
- RaftPeerId newLeaderId = null;
- // update priorities to enable transfer
- List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
- for (RaftPeer peer : getRaftGroup().getPeers()) {
- peersWithNewPriorities.add(
- RaftPeer.newBuilder(peer)
- .setPriority(peer.getAddress().equals(strAddr) ? 2 : 1)
- .build()
- );
- if (peer.getAddress().equals(strAddr)) {
- newLeaderId = peer.getId();
- }
- }
- if (newLeaderId == null) {
+ final int highestPriority = getRaftGroup().getPeers().stream()
+ .mapToInt(RaftPeer::getPriority).max().orElse(0);
+ RaftPeer newLeader = getRaftGroup().getPeers().stream()
+ .filter(peer ->
peer.getAddress().equals(strAddr)).findAny().orElse(null);
+ if (newLeader == null) {
Review Comment:
Print an error message.
```java
printf("Peer with address %s not found.", strAddr);
```
##########
ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java:
##########
@@ -55,54 +56,74 @@ public int run(CommandLine cl) throws IOException {
super.run(cl);
String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
+ // TODO: Default timeout should be set to 0, which means let server decide
(based on election timeout).
+ // However, occasionally the request could timeout too fast while
the transfer is in progress.
+ // i.e. request timeout doesn't mean transfer leadership has failed.
+ // Currently, Ratis shell returns merely based on the result of the
request.
+ // So we set a larger default timeout here (3s).
+ final long timeoutDefault = 3_000L;
+ // Default timeout for legacy mode matches with the legacy command
(version 2.4.x and older).
+ final long timeoutLegacy = 60_000L;
+ final Optional<Long> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ?
Optional.empty() :
+ Optional.of(Long.parseLong(cl.getOptionValue(TIMEOUT_OPTION_NAME)) *
1000L);
- RaftPeerId newLeaderId = null;
- // update priorities to enable transfer
- List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
- for (RaftPeer peer : getRaftGroup().getPeers()) {
- peersWithNewPriorities.add(
- RaftPeer.newBuilder(peer)
- .setPriority(peer.getAddress().equals(strAddr) ? 2 : 1)
- .build()
- );
- if (peer.getAddress().equals(strAddr)) {
- newLeaderId = peer.getId();
- }
- }
- if (newLeaderId == null) {
+ final int highestPriority = getRaftGroup().getPeers().stream()
+ .mapToInt(RaftPeer::getPriority).max().orElse(0);
+ RaftPeer newLeader = getRaftGroup().getPeers().stream()
+ .filter(peer ->
peer.getAddress().equals(strAddr)).findAny().orElse(null);
+ if (newLeader == null) {
return -2;
}
try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
- String stringPeers = "[" +
peersWithNewPriorities.stream().map(RaftPeer::toString)
- .collect(Collectors.joining(", ")) + "]";
- printf("Applying new peer state before transferring leadership: %n%s%n",
stringPeers);
- RaftClientReply setConfigurationReply =
- client.admin().setConfiguration(peersWithNewPriorities);
- processReply(setConfigurationReply,
- () -> "failed to set priorities before initiating election");
// transfer leadership
- printf("Transferring leadership to server with address <%s> %n",
strAddr);
- try {
- Thread.sleep(3_000);
- RaftClientReply transferLeadershipReply =
- client.admin().transferLeadership(newLeaderId, 60_000);
- processReply(transferLeadershipReply, () -> "election failed");
- } catch (Throwable t) {
- printf("caught an error when executing transfer: %s%n",
t.getMessage());
+ Throwable err = tryTransfer(client, newLeader, highestPriority,
timeout.orElse(timeoutDefault));
+ if (err instanceof TransferLeadershipException
+ && err.getMessage().contains("it does not has highest priority")) {
+ // legacy mode, transfer leadership by setting priority.
+ err = tryTransfer(client, newLeader, highestPriority + 1,
timeout.orElse(timeoutLegacy));
+ }
+ if (err != null) {
return -1;
}
Review Comment:
After changing `tryTransfer` to return a boolean, we could make the
corresponding change as below:
```java
try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
// transfer leadership
if (!tryTransfer(client, newLeader, highestPriority,
timeout.orElse(timeoutDefault))) {
// legacy mode, transfer leadership by setting priority.
tryTransfer(client, newLeader, highestPriority + 1,
timeout.orElse(timeoutLegacy));
}
} catch(Throwable t) {
printf("Failed to transfer peer %s with address %s: ",
newLeader.getId(), newLeader.getAddress());
t.printStackTrace(getPrintStream());
return -1;
}
return 0;
```
##########
ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java:
##########
@@ -55,54 +56,74 @@ public int run(CommandLine cl) throws IOException {
super.run(cl);
String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
+ // TODO: Default timeout should be set to 0, which means let server decide
(based on election timeout).
+ // However, occasionally the request could timeout too fast while
the transfer is in progress.
+ // i.e. request timeout doesn't mean transfer leadership has failed.
+ // Currently, Ratis shell returns merely based on the result of the
request.
+ // So we set a larger default timeout here (3s).
+ final long timeoutDefault = 3_000L;
+ // Default timeout for legacy mode matches with the legacy command
(version 2.4.x and older).
+ final long timeoutLegacy = 60_000L;
+ final Optional<Long> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ?
Optional.empty() :
+ Optional.of(Long.parseLong(cl.getOptionValue(TIMEOUT_OPTION_NAME)) *
1000L);
- RaftPeerId newLeaderId = null;
- // update priorities to enable transfer
- List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
- for (RaftPeer peer : getRaftGroup().getPeers()) {
- peersWithNewPriorities.add(
- RaftPeer.newBuilder(peer)
- .setPriority(peer.getAddress().equals(strAddr) ? 2 : 1)
- .build()
- );
- if (peer.getAddress().equals(strAddr)) {
- newLeaderId = peer.getId();
- }
- }
- if (newLeaderId == null) {
+ final int highestPriority = getRaftGroup().getPeers().stream()
+ .mapToInt(RaftPeer::getPriority).max().orElse(0);
+ RaftPeer newLeader = getRaftGroup().getPeers().stream()
+ .filter(peer ->
peer.getAddress().equals(strAddr)).findAny().orElse(null);
+ if (newLeader == null) {
return -2;
}
try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
- String stringPeers = "[" +
peersWithNewPriorities.stream().map(RaftPeer::toString)
- .collect(Collectors.joining(", ")) + "]";
- printf("Applying new peer state before transferring leadership: %n%s%n",
stringPeers);
- RaftClientReply setConfigurationReply =
- client.admin().setConfiguration(peersWithNewPriorities);
- processReply(setConfigurationReply,
- () -> "failed to set priorities before initiating election");
// transfer leadership
- printf("Transferring leadership to server with address <%s> %n",
strAddr);
- try {
- Thread.sleep(3_000);
- RaftClientReply transferLeadershipReply =
- client.admin().transferLeadership(newLeaderId, 60_000);
- processReply(transferLeadershipReply, () -> "election failed");
- } catch (Throwable t) {
- printf("caught an error when executing transfer: %s%n",
t.getMessage());
+ Throwable err = tryTransfer(client, newLeader, highestPriority,
timeout.orElse(timeoutDefault));
+ if (err instanceof TransferLeadershipException
+ && err.getMessage().contains("it does not has highest priority")) {
+ // legacy mode, transfer leadership by setting priority.
+ err = tryTransfer(client, newLeader, highestPriority + 1,
timeout.orElse(timeoutLegacy));
+ }
+ if (err != null) {
return -1;
}
- println("Transferring leadership initiated");
}
return 0;
}
+ private Throwable tryTransfer(RaftClient client, RaftPeer newLeader, int
highestPriority, long timeout) {
+ printf("Transferring leadership to server with address <%s> %n",
newLeader.getAddress());
+ try {
+ // lift the current leader to the highest priority,
+ if (newLeader.getPriority() < highestPriority) {
+ setPriority(client, newLeader.getAddress(), highestPriority);
+ }
+ RaftClientReply transferLeadershipReply =
+ client.admin().transferLeadership(newLeader.getId(), timeout);
+ processReply(transferLeadershipReply, () -> "election failed");
+ } catch (Throwable t) {
+ printf("caught an error when executing transfer: %s%n", t.getMessage());
+ return t;
+ }
+ println("Transferring leadership initiated");
+ return null;
+ }
Review Comment:
Let's return a boolean to indicate whether the transfer succeeded.
```java
private boolean tryTransfer(RaftClient client, RaftPeer newLeader, int
highestPriority,
TimeDuration timeout) throws IOException {
printf("Transferring leadership to server with address <%s> %n",
newLeader.getAddress());
try {
// lift the current leader to the highest priority,
if (newLeader.getPriority() < highestPriority) {
setPriority(client, newLeader.getAddress(), highestPriority);
}
RaftClientReply transferLeadershipReply =
client.admin().transferLeadership(newLeader.getId(),
timeout.toLong(TimeUnit.MILLISECONDS));
processReply(transferLeadershipReply, () -> "election failed");
} catch (TransferLeadershipException tle) {
if (tle.getMessage().contains("it does not has highest priority")) {
return false;
}
throw tle;
}
println("Transferring leadership initiated");
return true;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]