This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git

commit f68019ab9c2e0ee362b890e1a7a0a449afda3a7c
Author: Brian Nixon <[email protected]>
AuthorDate: Mon Jul 15 14:15:03 2019 +0200

    ZOOKEEPER-3459: Add admin command to display synced state of peer
    
    Author: Brian Nixon <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Norbert Kalmar <[email protected]>
    
    Closes #1012 from enixon/cmd-sync-state
    
    (cherry picked from commit cc900a3b05bc31a237753680c8b00dc5866df4b2)
---
 .../apache/zookeeper/server/admin/Commands.java    | 40 +++++++++++++++--
 .../apache/zookeeper/server/quorum/Follower.java   | 15 ++++---
 .../org/apache/zookeeper/server/quorum/Leader.java |  3 ++
 .../apache/zookeeper/server/quorum/Learner.java    | 10 +++--
 .../apache/zookeeper/server/quorum/Observer.java   |  5 ++-
 .../apache/zookeeper/server/quorum/QuorumPeer.java | 50 ++++++++++++++++++++++
 6 files changed, 110 insertions(+), 13 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 7f338091c..fc10f9da8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -37,9 +37,8 @@ import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.persistence.SnapshotInfo;
-import org.apache.zookeeper.server.quorum.Leader;
-import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
-import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.server.quorum.*;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,6 +129,7 @@ public class Commands {
         registerCommand(new WatchCommand());
         registerCommand(new WatchesByPathCommand());
         registerCommand(new WatchSummaryCommand());
+        registerCommand(new ZabStateCommand());
     }
 
     /**
@@ -576,5 +576,39 @@ public class Commands {
         }
     }
 
+
+    /**
+     * Returns the current phase of Zab protocol that peer is running.
+     * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, 
BROADCAST
+     */
+    public static class ZabStateCommand extends CommandBase {
+        public ZabStateCommand() {
+            super(Arrays.asList("zabstate"));
+        }
+
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, 
String> kwargs) {
+            CommandResponse response = initializeResponse();
+            if (zkServer instanceof QuorumZooKeeperServer) {
+                QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
+                QuorumPeer.ZabState zabState = peer.getZabState();
+                QuorumVerifier qv = peer.getQuorumVerifier();
+
+                QuorumPeer.QuorumServer voter = 
qv.getVotingMembers().get(peer.getId());
+                boolean voting = (
+                        voter != null
+                                && voter.addr.equals(peer.getQuorumAddress())
+                                && 
voter.electionAddr.equals(peer.getElectionAddress())
+                );
+                response.put("voting", voting);
+                response.put("zabstate", zabState.name().toLowerCase());
+            } else {
+                response.put("voting", false);
+                response.put("zabstate", "");
+            }
+            return response ;
+        }
+    }
+
     private Commands() {}
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index b79f5702c..80b41ef5c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -40,7 +40,7 @@ public class Follower extends Learner{
     private long lastQueued;
     // This is the same object as this.zk, but we cache the downcast op
     final FollowerZooKeeperServer fzk;
-    
+
     Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
         this.self = self;
         this.zk=zk;
@@ -72,7 +72,8 @@ public class Follower extends Learner{
         self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
         try {
-            QuorumServer leaderServer = findLeader();            
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
+            QuorumServer leaderServer = findLeader();
             try {
                 connectToLeader(leaderServer.addr, leaderServer.hostname);
                 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
@@ -86,7 +87,9 @@ public class Follower extends Learner{
                             + " is less than our accepted epoch " + 
ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                     throw new IOException("Error: Epoch of leader is lower");
                 }
-                syncWithLeader(newEpochZxid);                
+                self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
+                syncWithLeader(newEpochZxid);
+                self.setZabState(QuorumPeer.ZabState.BROADCAST);
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning()) {
                     readPacket(qp);
@@ -114,7 +117,7 @@ public class Follower extends Learner{
         case Leader.PING:            
             ping(qp);            
             break;
-        case Leader.PROPOSAL:           
+        case Leader.PROPOSAL:
             TxnHeader hdr = new TxnHeader();
             Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
             if (hdr.getZxid() != lastQueued + 1) {
@@ -146,9 +149,9 @@ public class Follower extends Learner{
            // get new designated leader from (current) leader's message
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());    
            long suggestedLeaderId = buffer.getLong();
-            boolean majorChange = 
+            boolean majorChange =
                    self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), 
true);
-           // commit (writes the new config to ZK tree (/zookeeper/config)     
                
+           // commit (writes the new config to ZK tree (/zookeeper/config)
            fzk.commit(qp.getZxid());
             if (majorChange) {
                throw new Exception("changes proposed in reconfig");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index b3c4b6c5d..11039bfd9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -469,6 +469,7 @@ public class Leader {
         zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
 
         try {
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             self.tick.set(0);
             zk.loadData();
 
@@ -539,6 +540,7 @@ public class Leader {
 
              waitForEpochAck(self.getId(), leaderStateSummary);
              self.setCurrentEpoch(epoch);
+             self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
 
              try {
                  waitForNewLeaderAck(self.getId(), zk.getZxid());
@@ -590,6 +592,7 @@ public class Leader {
                 self.setZooKeeperServer(zk);
             }
 
+            self.setZabState(QuorumPeer.ZabState.BROADCAST);
             self.adminServer.setZooKeeperServer(zk);
 
             // Everything is a go, simply start counting the ticks
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index a8d89b28d..74f9f4ff1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -235,7 +235,7 @@ public class Learner {
 
     /**
      * Establish a connection with the Leader found by findLeader. Retries
-     * until either initLimit time has elapsed or 5 tries have happened. 
+     * until either initLimit time has elapsed or 5 tries have happened.
      * @param addr - the address of the Leader to connect to.
      * @throws IOException - if the socket connection fails on the 5th attempt
      * <li>if there is an authentication failure while connecting to 
leader</li>
@@ -309,7 +309,7 @@ public class Learner {
 
     /**
      * Once connected to the leader, perform the handshake protocol to
-     * establish a following / observing connection. 
+     * establish a following / observing connection.
      * @param pktType
      * @return the zxid the Leader sends for synchronization purposes.
      * @throws IOException
@@ -368,7 +368,7 @@ public class Learner {
     } 
     
     /**
-     * Finally, synchronize our history with the Leader. 
+     * Finally, synchronize our history with the Leader.
      * @param newLeaderZxid
      * @throws IOException
      * @throws InterruptedException
@@ -390,6 +390,7 @@ public class Learner {
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
                 LOG.info("Getting a diff from the leader 0x{}", 
Long.toHexString(qp.getZxid()));
+                self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                 if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
                     LOG.info("Forcing a snapshot write as part of upgrading 
from an older Zookeeper. This should only happen while upgrading.");
                     snapshotNeeded = true;
@@ -399,6 +400,7 @@ public class Learner {
                 }
             }
             else if (qp.getType() == Leader.SNAP) {
+                self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                 LOG.info("Getting a snapshot from leader 0x" + 
Long.toHexString(qp.getZxid()));
                 // The leader is going to dump the database
                 // db is clear as part of deserializeSnapshot()
@@ -421,6 +423,7 @@ public class Learner {
                 syncSnapshot = true;
             } else if (qp.getType() == Leader.TRUNC) {
                 //we need to truncate the log to the lastzxid of the leader
+                self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                 LOG.warn("Truncating log to get in sync with the leader 0x"
                         + Long.toHexString(qp.getZxid()));
                 boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
@@ -591,6 +594,7 @@ public class Learner {
         }
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
+        self.setSyncMode(QuorumPeer.SyncMode.NONE);
         zk.startServing();
         /*
          * Update the election vote here to ensure that all members of the
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index 050582d62..6e1d9c150 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -63,6 +63,7 @@ public class Observer extends Learner{
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
 
         try {
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             QuorumServer leaderServer = findLeader();
             LOG.info("Observing " + leaderServer.addr);
             try {
@@ -70,8 +71,10 @@ public class Observer extends Learner{
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                 if (self.isReconfigStateChange())
                    throw new Exception("learned about role change");
- 
+
+                self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                 syncWithLeader(newLeaderZxid);
+                self.setZabState(QuorumPeer.ZabState.BROADCAST);
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning()) {
                     readPacket(qp);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 474f70c44..daf605cab 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -400,6 +400,22 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         LOOKING, FOLLOWING, LEADING, OBSERVING;
     }
 
+    /**
+     * (Used for monitoring) shows the current phase of
+     * Zab protocol that peer is running.
+     */
+    public enum ZabState {
+        ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST;
+    }
+
+    /**
+     * (Used for monitoring) When peer is in synchronization phase, this shows
+     * which synchronization mechanism is being used
+     */
+    public enum SyncMode {
+        NONE, DIFF, SNAP, TRUNC;
+    }
+
     /*
      * A peer can either be participating, which implies that it is willing to
      * both vote in instances of consensus and to elect or become a Leader, or
@@ -715,11 +731,45 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     private ServerState state = ServerState.LOOKING;
 
+    private AtomicReference<ZabState> zabState = new 
AtomicReference<>(ZabState.ELECTION);
+    private AtomicReference<SyncMode> syncMode = new 
AtomicReference<>(SyncMode.NONE);
+
     private boolean reconfigFlag = false; // indicates that a reconfig just 
committed
 
     public synchronized void setPeerState(ServerState newState){
         state=newState;
     }
+
+    public void setZabState(ZabState zabState) {
+        this.zabState.set(zabState);
+        LOG.info("Peer state changed: {}", getDetailedPeerState());
+    }
+
+    public void setSyncMode(SyncMode syncMode) {
+        this.syncMode.set(syncMode);
+        LOG.info("Peer state changed: {}", getDetailedPeerState());
+    }
+
+    public ZabState getZabState() {
+        return zabState.get();
+    }
+
+    public SyncMode getSyncMode() {
+        return syncMode.get();
+    }
+    public String getDetailedPeerState() {
+        final StringBuilder sb = new 
StringBuilder(getPeerState().toString().toLowerCase());
+        final ZabState zabState = getZabState();
+        if (!ZabState.ELECTION.equals(zabState)) {
+            sb.append(" - ").append(zabState.toString().toLowerCase());
+        }
+        final SyncMode syncMode = getSyncMode();
+        if (!SyncMode.NONE.equals(syncMode)) {
+            sb.append(" - ").append(syncMode.toString().toLowerCase());
+        }
+        return sb.toString();
+    }
+
     public synchronized void reconfigFlagSet(){
        reconfigFlag = true;
     }

Reply via email to