Repository: zookeeper Updated Branches: refs/heads/branch-3.4 2019d29f9 -> 3a5381499
ZOOKEEPER-2762: Cleanup findbug warnings in branch-3.4: Multithreaded correctness Warnings Author: Abraham Fine <[email protected]> Reviewers: Rakesh Radhakrishnan <[email protected]> Closes #239 from afine/ZOOKEEPER-2762 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/3a538149 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/3a538149 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/3a538149 Branch: refs/heads/branch-3.4 Commit: 3a5381499761992e1023b27748b18e280d7ebac2 Parents: 2019d29 Author: Abraham Fine <[email protected]> Authored: Wed May 24 14:33:40 2017 -0700 Committer: Rakesh Radhakrishnan <[email protected]> Committed: Wed May 24 14:33:40 2017 -0700 ---------------------------------------------------------------------- .../server/quorum/FastLeaderElection.java | 35 ++++++++++---------- .../apache/zookeeper/server/quorum/Leader.java | 6 ++-- .../zookeeper/server/quorum/LearnerHandler.java | 6 ++-- .../server/quorum/QuorumCnxManager.java | 28 ++++------------ .../zookeeper/server/quorum/QuorumPeer.java | 5 +-- src/java/test/config/findbugsExcludeFile.xml | 3 ++ .../server/quorum/FLEDontCareTest.java | 2 +- 7 files changed, 38 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3a538149/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java index 066f385..67e9267 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.jmx.MBeanRegistry; @@ -261,7 +262,7 @@ public class FastLeaderElection implements Election { ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), - logicalclock, + logicalclock.get(), self.getPeerState(), response.sid, current.getPeerEpoch()); @@ -348,12 +349,12 @@ public class FastLeaderElection implements Election { * lagging behind. */ if((ackstate == QuorumPeer.ServerState.LOOKING) - && (n.electionEpoch < logicalclock)){ + && (n.electionEpoch < logicalclock.get())){ Vote v = getVote(); ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), - logicalclock, + logicalclock.get(), self.getPeerState(), response.sid, v.getPeerEpoch()); @@ -499,7 +500,7 @@ public class FastLeaderElection implements Election { QuorumPeer self; Messenger messenger; - volatile long logicalclock; /* Election instance */ + AtomicLong logicalclock = new AtomicLong(); /* Election instance */ long proposedLeader; long proposedZxid; long proposedEpoch; @@ -509,7 +510,7 @@ public class FastLeaderElection implements Election { * Returns the current vlue of the logical clock counter */ public long getLogicalClock(){ - return logicalclock; + return logicalclock.get(); } /** @@ -582,13 +583,13 @@ public class FastLeaderElection implements Election { ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, - logicalclock, + logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + - Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) + + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } @@ -684,7 +685,7 @@ public class FastLeaderElection implements Election { if(leader != self.getId()){ if(votes.get(leader) == null) predicate = false; else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false; - } else if(logicalclock != electionEpoch) { + } else if(logicalclock.get() != electionEpoch) { predicate = false; } @@ -812,7 +813,7 @@ public class FastLeaderElection implements Election { int notTimeout = finalizeWait; synchronized(this){ - logicalclock++; + logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } @@ -860,8 +861,8 @@ public class FastLeaderElection implements Election { switch (n.state) { case LOOKING: // If notification > current, replace and send messages out - if (n.electionEpoch > logicalclock) { - logicalclock = n.electionEpoch; + if (n.electionEpoch > logicalclock.get()) { + logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { @@ -872,11 +873,11 @@ public class FastLeaderElection implements Election { getPeerEpoch()); } sendNotifications(); - } else if (n.electionEpoch < logicalclock) { + } else if (n.electionEpoch < logicalclock.get()) { if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) - + ", logicalclock=0x" + Long.toHexString(logicalclock)); + + ", logicalclock=0x" + Long.toHexString(logicalclock.get())); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, @@ -896,7 +897,7 @@ public class FastLeaderElection implements Election { if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, - logicalclock, proposedEpoch))) { + logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, @@ -918,7 +919,7 @@ public class FastLeaderElection implements Election { Vote endVote = new Vote(proposedLeader, proposedZxid, - logicalclock, + logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; @@ -934,7 +935,7 @@ public class FastLeaderElection implements Election { * Consider all notifications from the same epoch * together. */ - if(n.electionEpoch == logicalclock){ + if(n.electionEpoch == logicalclock.get()){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, @@ -966,7 +967,7 @@ public class FastLeaderElection implements Election { if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ - logicalclock = n.electionEpoch; + logicalclock.set(n.electionEpoch); self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3a538149/src/java/main/org/apache/zookeeper/server/quorum/Leader.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java index bd7bf35..a9fd8d0 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java @@ -376,7 +376,7 @@ public class Leader { zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean); try { - self.tick = 0; + self.tick.set(0); zk.loadData(); leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid()); @@ -424,7 +424,7 @@ public class Leader { + "Perhaps the initTicks need to be increased."); } Thread.sleep(self.tickTime); - self.tick++; + self.tick.incrementAndGet(); return; } @@ -465,7 +465,7 @@ public class Leader { while (true) { Thread.sleep(self.tickTime / 2); if (!tickSkip) { - self.tick++; + self.tick.incrementAndGet(); } HashSet<Long> syncedSet = new HashSet<Long>(); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3a538149/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java index 4820490..884cc63 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -314,7 +314,7 @@ public class LearnerHandler extends ZooKeeperThread { public void run() { try { leader.addLearnerHandler(this); - tickOfNextAckDeadline = leader.self.tick + tickOfNextAckDeadline = leader.self.tick.get() + leader.self.initLimit + leader.self.syncLimit; ia = BinaryInputArchive.getArchive(bufferedInput); @@ -565,7 +565,7 @@ public class LearnerHandler extends ZooKeeperThread { if (LOG.isTraceEnabled()) { ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp); } - tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit; + tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit; ByteBuffer bb; @@ -710,6 +710,6 @@ public class LearnerHandler extends ZooKeeperThread { public boolean synced() { return isAlive() - && leader.self.tick <= tickOfNextAckDeadline; + && leader.self.tick.get() <= tickOfNextAckDeadline; } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3a538149/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 2b131c4..cc45562 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -363,10 +363,7 @@ public class QuorumCnxManager { vsw.finish(); senderWorkerMap.put(sid, sw); - if (!queueSendMap.containsKey(sid)) { - queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( - SEND_CAPACITY)); - } + queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY)); sw.start(); rw.start(); @@ -504,11 +501,7 @@ public class QuorumCnxManager { vsw.finish(); senderWorkerMap.put(sid, sw); - - if (!queueSendMap.containsKey(sid)) { - queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( - SEND_CAPACITY)); - } + queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY)); sw.start(); rw.start(); @@ -535,19 +528,12 @@ public class QuorumCnxManager { /* * Start a new connection if doesn't have one already. */ - if (!queueSendMap.containsKey(sid)) { - ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>( - SEND_CAPACITY); - queueSendMap.put(sid, bq); - addToSendQueue(bq, b); - + ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY); + ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq); + if (bqExisting != null) { + addToSendQueue(bqExisting, b); } else { - ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); - if(bq != null){ - addToSendQueue(bq, b); - } else { - LOG.error("No queue for server " + sid); - } + addToSendQueue(bq, b); } connectOne(sid); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3a538149/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 9eeeb5d..05d0a1b 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.sasl.SaslException; @@ -347,7 +348,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider /** * The current tick */ - protected volatile int tick; + protected AtomicInteger tick = new AtomicInteger(); /** * Whether or not to listen on all IPs for the two quorum ports @@ -1165,7 +1166,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider * Get the current tick */ public int getTick() { - return tick; + return tick.get(); } /** http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3a538149/src/java/test/config/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml index b1a4f3d..f19bec2 100644 --- a/src/java/test/config/findbugsExcludeFile.xml +++ b/src/java/test/config/findbugsExcludeFile.xml @@ -111,6 +111,9 @@ <!-- these are old classes just for upgrading and should go away --> <Match> + <Class name="org.apache.zookeeper.server.quorum.AuthFastLeaderElection"/> + </Match> + <Match> <Class name="org.apache.zookeeper.server.upgrade.DataNodeV1"/> </Match> http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3a538149/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java index 3d4a02c..ffc7ab1 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/FLEDontCareTest.java @@ -236,7 +236,7 @@ public class FLEDontCareTest { * fle represents the FLE instance of server 3.Here we set * its logical clock to 1. */ - fle.logicalclock = 0x1; + fle.logicalclock.set(0x1); /*
