This is an automated email from the ASF dual-hosted git repository.
nkalmar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new cc900a3 ZOOKEEPER-3459: Add admin command to display synced state of peer
cc900a3 is described below
commit cc900a3b05bc31a237753680c8b00dc5866df4b2
Author: Brian Nixon <[email protected]>
AuthorDate: Mon Jul 15 14:15:03 2019 +0200
ZOOKEEPER-3459: Add admin command to display synced state of peer
Author: Brian Nixon <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Norbert Kalmar <[email protected]>
Closes #1012 from enixon/cmd-sync-state
---
.../apache/zookeeper/server/admin/Commands.java | 35 +++++++++++++++
.../apache/zookeeper/server/quorum/Follower.java | 3 ++
.../org/apache/zookeeper/server/quorum/Leader.java | 3 ++
.../apache/zookeeper/server/quorum/Learner.java | 4 ++
.../apache/zookeeper/server/quorum/Observer.java | 3 ++
.../apache/zookeeper/server/quorum/QuorumPeer.java | 52 ++++++++++++++++++++++
.../server/quorum/QuorumZooKeeperServer.java | 6 +++
7 files changed, 106 insertions(+)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 428910f..097d9b7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -44,6 +44,7 @@ import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,6 +139,7 @@ public class Commands {
registerCommand(new WatchCommand());
registerCommand(new WatchesByPathCommand());
registerCommand(new WatchSummaryCommand());
+ registerCommand(new ZabStateCommand());
registerCommand(new SystemPropertiesCommand());
registerCommand(new InitialConfigurationCommand());
}
@@ -617,6 +619,39 @@ public class Commands {
}
/**
+     * Reports the Zab protocol phase this peer is currently in, together with
+     * whether the peer is a voting member of the current quorum configuration.
+     * The {@code zabstate} value is the lower-cased name of a
+     * {@link QuorumPeer.ZabState} constant: {@code election},
+     * {@code discovery}, {@code synchronization} or {@code broadcast}.
+     * When {@code zkServer} is not a {@link QuorumZooKeeperServer}, the command
+     * reports {@code voting=false} and an empty {@code zabstate}.
+     */
+    public static class ZabStateCommand extends CommandBase {
+        public ZabStateCommand() {
+            super(Arrays.asList("zabstate"), false);
+        }
+
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            CommandResponse response = initializeResponse();
+            if (zkServer instanceof QuorumZooKeeperServer) {
+                QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
+                QuorumPeer.ZabState zabState = peer.getZabState();
+                QuorumVerifier qv = peer.getQuorumVerifier();
+
+                // The peer only counts as voting if the current verifier lists it
+                // as a voting member with the same quorum and election addresses
+                // the peer is actually using.
+                QuorumPeer.QuorumServer voter = qv.getVotingMembers().get(peer.getId());
+                boolean voting = voter != null
+                        && voter.addr.equals(peer.getQuorumAddress())
+                        && voter.electionAddr.equals(peer.getElectionAddress());
+                response.put("voting", voting);
+                // Locale.ROOT keeps the output independent of the JVM default
+                // locale (under tr_TR, 'I' would lower-case to dotless 'ı').
+                response.put("zabstate", zabState.name().toLowerCase(java.util.Locale.ROOT));
+            } else {
+                response.put("voting", false);
+                response.put("zabstate", "");
+            }
+            return response;
+        }
+    }
+
+ /**
* All defined system properties.
*/
public static class SystemPropertiesCommand extends CommandBase {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index f3a8ffe..0eeac83 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -78,6 +78,7 @@ public class Follower extends Learner{
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
+ self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
@@ -95,7 +96,9 @@ public class Follower extends Learner{
long startTime = Time.currentElapsedTime();
try {
self.setLeaderAddressAndId(leaderServer.addr,
leaderServer.getId());
+ self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newEpochZxid);
+ self.setZabState(QuorumPeer.ZabState.BROADCAST);
} finally {
long syncTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 052438b..a5a16f4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -546,6 +546,7 @@ public class Leader implements LearnerMaster {
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
+ self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);
zk.loadData();
@@ -616,6 +617,7 @@ public class Leader implements LearnerMaster {
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
+ self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
@@ -667,6 +669,7 @@ public class Leader implements LearnerMaster {
self.setZooKeeperServer(zk);
}
+ self.setZabState(QuorumPeer.ZabState.BROADCAST);
self.adminServer.setZooKeeperServer(zk);
// Everything is a go, simply start counting the ticks
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 168c44b..63a5454 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -409,9 +409,11 @@ public class Learner {
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}",
Long.toHexString(qp.getZxid()));
+ self.setSyncMode(QuorumPeer.SyncMode.DIFF);
snapshotNeeded = false;
}
else if (qp.getType() == Leader.SNAP) {
+ self.setSyncMode(QuorumPeer.SyncMode.SNAP);
LOG.info("Getting a snapshot from leader 0x" +
Long.toHexString(qp.getZxid()));
// The leader is going to dump the database
// db is clear as part of deserializeSnapshot()
@@ -434,6 +436,7 @@ public class Learner {
syncSnapshot = true;
} else if (qp.getType() == Leader.TRUNC) {
//we need to truncate the log to the lastzxid of the leader
+ self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
LOG.warn("Truncating log to get in sync with the leader 0x"
+ Long.toHexString(qp.getZxid()));
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
@@ -592,6 +595,7 @@ public class Learner {
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
+ self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startup();
/*
* Update the election vote here to ensure that all members of the
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index 907aba8..3832592 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -105,6 +105,7 @@ public class Observer extends Learner{
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
try {
+ self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer master = findLearnerMaster();
try {
connectToLeader(master.addr, master.hostname);
@@ -114,7 +115,9 @@ public class Observer extends Learner{
}
self.setLeaderAddressAndId(master.addr, master.getId());
+ self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newLeaderZxid);
+ self.setZabState(QuorumPeer.ZabState.BROADCAST);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning() && nextLearnerMaster.get() == null) {
readPacket(qp);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 062f259..46f144b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -417,6 +417,22 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
LOOKING, FOLLOWING, LEADING, OBSERVING;
}
+    /**
+     * Phase of the Zab protocol this peer is currently in. It is tracked for
+     * monitoring purposes only. The constants are declared in the order a peer
+     * moves through them after leaving leader election.
+     */
+    public enum ZabState {
+        ELECTION,
+        DISCOVERY,
+        SYNCHRONIZATION,
+        BROADCAST
+    }
+
+    /**
+     * Synchronization mechanism in use while a learner is in the
+     * synchronization phase, tracked for monitoring purposes only.
+     * {@code DIFF}, {@code SNAP} and {@code TRUNC} mirror the kind of sync
+     * packet received from the leader; {@code NONE} is the value outside of
+     * an active sync.
+     */
+    public enum SyncMode {
+        NONE,
+        DIFF,
+        SNAP,
+        TRUNC
+    }
+
/*
* A peer can either be participating, which implies that it is willing to
* both vote in instances of consensus and to elect or become a Leader, or
@@ -754,6 +770,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
private ServerState state = ServerState.LOOKING;
+    // Current Zab phase and sync mode of this peer, kept for monitoring.
+    // The holders are never reassigned (only their contents change via
+    // setZabState/setSyncMode), so they are declared final.
+    private final AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
+    private final AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
private AtomicReference<String> leaderAddress = new
AtomicReference<String>("");
private AtomicLong leaderId = new AtomicLong(-1);
@@ -763,9 +781,30 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
state = newState;
if (newState == ServerState.LOOKING) {
setLeaderAddressAndId(null, -1);
+ setZabState(ZabState.ELECTION);
+ } else {
+ LOG.info("Peer state changed: {}", getDetailedPeerState());
}
}
+    /**
+     * Records the Zab phase this peer has entered and logs the resulting
+     * detailed peer state.
+     *
+     * @param zabState the new phase; must not be {@code null}
+     * @throws NullPointerException if {@code zabState} is {@code null}. The
+     *         check runs before the value is stored, so a bad call cannot leave
+     *         a {@code null} behind that would make every later
+     *         {@link #getDetailedPeerState()} call fail.
+     */
+    public void setZabState(ZabState zabState) {
+        this.zabState.set(java.util.Objects.requireNonNull(zabState, "zabState"));
+        LOG.info("Peer state changed: {}", getDetailedPeerState());
+    }
+
+    /**
+     * Records the synchronization mechanism currently in use and logs the
+     * resulting detailed peer state.
+     *
+     * @param syncMode the new sync mode; must not be {@code null}
+     * @throws NullPointerException if {@code syncMode} is {@code null}. The
+     *         check runs before the value is stored, so a bad call cannot leave
+     *         a {@code null} behind that would make every later
+     *         {@link #getDetailedPeerState()} call fail.
+     */
+    public void setSyncMode(SyncMode syncMode) {
+        this.syncMode.set(java.util.Objects.requireNonNull(syncMode, "syncMode"));
+        LOG.info("Peer state changed: {}", getDetailedPeerState());
+    }
+
+    /**
+     * @return the Zab phase currently recorded for this peer
+     */
+    public ZabState getZabState() {
+        final ZabState current = this.zabState.get();
+        return current;
+    }
+
+    /**
+     * @return the synchronization mechanism currently recorded for this peer
+     */
+    public SyncMode getSyncMode() {
+        final SyncMode current = this.syncMode.get();
+        return current;
+    }
+
public void setLeaderAddressAndId(InetSocketAddress addr, long newId) {
if (addr != null) {
leaderAddress.set(addr.getHostString());
@@ -783,6 +822,19 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
return leaderId.get();
}
+    /**
+     * Builds a human-readable summary of this peer's state, used in the
+     * "Peer state changed" log lines and the {@code peer_state} monitor value.
+     * The summary is the lower-cased {@link #getPeerState()} value, then the
+     * Zab phase unless it is {@code ELECTION}, then the sync mode unless it is
+     * {@code NONE}, joined with {@code " - "}.
+     *
+     * @return the detailed peer state string
+     */
+    public String getDetailedPeerState() {
+        // Locale.ROOT: the enum names contain 'I', which some default locales
+        // (e.g. tr_TR) would lower-case to dotless 'ı'.
+        final String peerState = getPeerState().toString().toLowerCase(java.util.Locale.ROOT);
+        final StringBuilder sb = new StringBuilder(peerState);
+        final ZabState zabState = getZabState();
+        if (zabState != ZabState.ELECTION) {
+            sb.append(" - ").append(zabState.toString().toLowerCase(java.util.Locale.ROOT));
+        }
+        final SyncMode syncMode = getSyncMode();
+        if (syncMode != SyncMode.NONE) {
+            sb.append(" - ").append(syncMode.toString().toLowerCase(java.util.Locale.ROOT));
+        }
+        return sb.toString();
+    }
+
public synchronized void reconfigFlagSet(){
reconfigFlag = true;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
index acd9181..7759a2c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
+import java.util.function.BiConsumer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -215,4 +216,9 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
rootContext.unregisterGauge("quorum_size");
}
+    /**
+     * Emits the monitor values of the parent server, then adds
+     * {@code peer_state}: the detailed state string of the owning quorum peer.
+     */
+    @Override
+    public void dumpMonitorValues(BiConsumer<String, Object> response) {
+        super.dumpMonitorValues(response);
+        final String detailedState = self.getDetailedPeerState();
+        response.accept("peer_state", detailedState);
+    }
}