This is an automated email from the ASF dual-hosted git repository.
breed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b4f9aab ZOOKEEPER-3922: The introduction of the oracle, a failure detector.
b4f9aab is described below
commit b4f9aab099880ba8ef08eaff697debe6cdeae057
Author: Ching-Chan Lee
AuthorDate: Mon Mar 1 20:51:05 2021 -0800
ZOOKEEPER-3922: The introduction of the oracle, a failure detector.
The introduction of the oracle makes ZooKeeper fault-tolerant in two-node
systems.
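Why two nodes need help: under a strict-majority rule, a 2-member ensemble needs both members for a quorum. After a single failure, the survivor can therefore never form a quorum on its own. The Java sketch below illustrates that arithmetic and shows where an Oracle answer could break the tie. The class and method names are hypothetical and do not reproduce QuorumMaj or QuorumOracleMaj.

```java
import java.util.Set;

// Hypothetical sketch: a strict-majority rule plus an Oracle override for 2-member ensembles.
// This is not ZooKeeper's QuorumMaj/QuorumOracleMaj; names and signatures are illustrative.
class OracleQuorumSketch {

    // Strict majority: more than half of the voting members must have acknowledged.
    static boolean hasMajority(int votingMembers, Set<Long> ackSet) {
        int half = votingMembers / 2;
        return ackSet.size() > half;
    }

    // Assumed override: in a 2-member ensemble, a lone survivor may proceed
    // only when its Oracle authorizes it.
    static boolean hasQuorumWithOracle(int votingMembers, Set<Long> ackSet, boolean oracleAuthorizes) {
        if (hasMajority(votingMembers, ackSet)) {
            return true;
        }
        return votingMembers == 2 && ackSet.size() == 1 && oracleAuthorizes;
    }

    public static void main(String[] args) {
        System.out.println(hasMajority(2, Set.of(1L)));                // false: 1 is not > 1
        System.out.println(hasMajority(3, Set.of(1L, 2L)));            // true: 2 > 1
        System.out.println(hasQuorumWithOracle(2, Set.of(1L), true));  // true
        System.out.println(hasQuorumWithOracle(2, Set.of(1L), false)); // false
    }
}
```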
The major changes are:
1. QuorumPeerConfig.java
- The changes allow users to enable the oracle and provide essential
information.
- Create QuorumOracleMaj if configured.
2. FastLeaderElection.java
- A re-check mechanism checks the currently received votes once the timeout
expires.
- Add another case when receiving a LEADING notification so that a node can
locate the existing leader in two-node systems.
3. Leader.java
- Add a mechanism that re-validates outstanding proposals after the only
follower goes away.
- Add another handling case for when the quorum cannot be maintained. It
queries the Oracle so that the Leader can maintain the quorum alone.
4. QuorumVerifier.java
- Add methods for QuorumOracleMaj.java
5. QuorumOracleMaj.java (This is a new file.)
- A sub-class of QuorumMaj
- By default, it reads a file that contains a binary value (0 or 1) and
behaves as an Oracle.
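The following is a minimal sketch of the file-backed oracle described in item 5. It assumes that the file holds a single `0` or `1`, and that a missing or unreadable file counts as "not authorized". The documentation added in this commit says the oracle returns `FALSE` when the file is unavailable. `FileOracleSketch.askOracle(Path)` is a hypothetical stand-in, not the signature of `QuorumOracleMaj.askOracle()`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical file-backed oracle following the 0/1 file contract from the docs.
class FileOracleSketch {

    // Returns true only when the oracle file exists and its trimmed content is "1".
    static boolean askOracle(Path oracleFile) {
        try {
            String value = new String(Files.readAllBytes(oracleFile), StandardCharsets.UTF_8).trim();
            return "1".equals(value);
        } catch (IOException e) {
            // Missing or unreadable file (e.g. the USB device was removed): not authorized.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("oracle", ".txt");
        Files.write(f, "1\n".getBytes(StandardCharsets.UTF_8)); // same as: echo 1 > file
        System.out.println(askOracle(f));                        // true
        Files.write(f, "0\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(askOracle(f));                        // false
        Files.delete(f);
        System.out.println(askOracle(f));                        // false: file is gone
    }
}
```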
Author: Ching-Chan Lee
Reviewers: Benjamin Reed, Michael Han
Closes #1444 from chingchan1996/ZOOKEEPER-3922
---
.../resources/markdown/zookeeperOracleQuorums.md | 202 ++++++++++++
zookeeper-server/pom.xml | 6 +
.../org/apache/zookeeper/cli/ReconfigCommand.java | 2 +-
.../zookeeper/server/PrepRequestProcessor.java | 13 +-
.../server/quorum/FastLeaderElection.java | 162 ++++++++--
.../org/apache/zookeeper/server/quorum/Leader.java | 30 +-
.../apache/zookeeper/server/quorum/QuorumPeer.java | 19 +-
.../zookeeper/server/quorum/QuorumPeerConfig.java | 20 +-
.../server/quorum/flexible/QuorumHierarchical.java | 1 +
.../server/quorum/flexible/QuorumMaj.java | 6 +-
.../server/quorum/flexible/QuorumOracleMaj.java | 204 ++++++++++++
.../server/quorum/flexible/QuorumVerifier.java | 36 +++
.../server/quorum/EagerACLFilterTest.java | 2 +-
.../server/quorum/QuorumPeerConfigTest.java | 1 +
.../server/quorum/QuorumRequestPipelineTest.java | 2 +-
.../org/apache/zookeeper/test/AsyncHammerTest.java | 2 +-
.../org/apache/zookeeper/test/ObserverLETest.java | 2 +-
.../zookeeper/test/ObserverQuorumHammerTest.java | 2 +-
.../java/org/apache/zookeeper/test/QuorumBase.java | 124 ++++++--
.../zookeeper/test/QuorumBaseOracle_2Nodes.java | 350 +++++++++++++++++++++
.../apache/zookeeper/test/QuorumMajorityTest.java | 4 +-
...mMajorityTest.java => QuorumOracleMajTest.java} | 80 +++--
.../zookeeper/test/ThrottledOpObserverTest.java | 2 +-
.../data/invalidsnap/version-2/snapshot.83f | Bin 4824 -> 189686 bytes
24 files changed, 1156 insertions(+), 116 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md b/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md
new file mode 100644
index 0000000..adc5477
--- /dev/null
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperOracleQuorums.md
@@ -0,0 +1,202 @@
+<!--
+Copyright 2002-2004 The Apache Software Foundation
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+//-->
+
+# Introduction to Oracle Quorum
+Oracle Quorum increases the availability of a cluster of 2 ZooKeeper instances by adding a failure detector known as the Oracle.
+The Oracle is designed to grant permission to the only remaining instance
+in a 2-instance configuration when the failure detector, the Oracle, identifies the other instance as faulty.
+
+## The implementation of the Oracle
+Every instance has access to a file that contains either 0 or 1, indicating whether the Oracle authorizes this instance.
+However, this design can be changed, because failure detector algorithms vary. One can override the _askOracle()_ method in _QuorumOracleMaj_ to interpret the Oracle's message in the preferred way.
+
+## The deployment contexts
+The Oracle is designed to increase the availability of a cluster of 2 ZooKeeper instances; thus, the number of voting members is **2**.
+In other words, the Oracle solves the consensus problem posed by a possibly faulty instance in a two-instance ensemble.
+
+When the number of voting members exceeds 2, the expected way to make the Oracle work correctly is to reconfigure the size of the cluster when a faulty machine is identified.
+For example, in a configuration of 5 instances, when a faulty machine loses its connection to the Leader, a _reconfig_ client request is expected to re-form the cluster as a configuration of 4 instances.
+Once the number of voting members drops to 2, the configuration falls into the problem domain that the Oracle is designed to address.
+
+## How to deploy the Oracle in _zoo.cfg_
+Regardless of the size of the cluster, _oraclePath_ must be configured at initialization time, like other static parameters.
+The following line specifies the oracle file and enables the Oracle:
+
+ oraclePath=/to/some/file
+
+#### An example of zoo.cfg:
+
+ dataDir=/data
+ dataLogDir=/datalog
+ tickTime=2000
+ initLimit=5
+ syncLimit=2
+ autopurge.snapRetainCount=3
+ autopurge.purgeInterval=0
+ maxClientCnxns=60
+ standaloneEnabled=true
+ admin.enableServer=true
+ oraclePath=/chassis/mastership
+ server.1=0.0.0.0:2888:3888;2181
+ server.2=hw1:2888:3888;2181
+
+_QuorumOracleMaj_ reads the result of a failure detector from a text file, the oracle file.
+The path is configured in _zoo.cfg_ as follows:
+
+ oraclePath=/to/some/file
+
+Suppose the failure detector writes its result to /some/path/result.txt; the configuration is then:
+
+ oraclePath=/some/path/result.txt
+
+What should the file contain? A file that authorizes the instance can be created from the terminal with the following command:
+
+ $ echo 1 > /some/path/result.txt
+
+Any equivalent file is suitable for the current implementation of QuorumOracleMaj.
+The number of oracle files should equal the number of ZooKeeper instances configured to enable the Oracle.
+In other words, each ZooKeeper instance should have its own oracle file, and the files must not be shared; otherwise, the issues described in the Safety Issue section will arise.
+
+## What changes once the Oracle is enabled
+When _zoo.cfg_ contains _oraclePath_, _QuorumPeerConfig_ creates an instance of _QuorumOracleMaj_ instead of the default QuorumVerifier, _QuorumMaj_.
+QuorumOracleMaj inherits from QuorumMaj and differs from its superclass by overriding the _containsQuorum()_ method.
+QuorumOracleMaj executes its own version of _containsQuorum_ when the Leader loses all of its followers and fails to maintain the quorum.
+In all other cases, _QuorumOracleMaj_ behaves like _QuorumMaj_.
+
+## What to watch out for with the Oracle
+We consider an asynchronous distributed system that consists of **2** ZooKeeper instances and an Oracle.
+
+### Liveness Issue:
+Suppose the Oracle satisfies the following property introduced by [CT]:
+
+ Strong Completeness: There is a time after which every process that crashes is permanently suspected by every correct process.
+
+Under this property, the Oracle ensures the liveness of the system.
+However, when the Oracle fails to maintain this property, the system can lose liveness, as the following example shows.
+
+Suppose a Leader and a Follower are running in the broadcasting state.
+The system will lose its liveness when:
+
+ 1. The Leader fails, but the Oracle does not detect the faulty Leader, which means the Oracle will not authorize the Follower to become a new Leader.
+ 2. The Follower fails, but the Oracle does not detect the faulty Follower, which means the Oracle will not authorize the Leader to move the system forward.
+
+### Safety Issue:
+#### Loss of Progress
+Progress can be lost when multiple failures occur in the system at different times, as in the following example.
+
+Suppose we have a Leader (Ben) and a Follower (John) in the broadcasting state:
+
+ At T1 with zxid(0x1_1): L-Ben fails, and F-John takes over the system under the Oracle's authorization.
+ At T2 with zxid(0x2_1): F-John becomes the new Leader, L-John, and starts a new epoch.
+ At T3 with zxid(0x2_A): L-John fails.
+ At T4 with zxid(0x2_A): Ben recovers and starts its leader election.
+ At T5 with zxid(0x3_1): Ben becomes the new leader, L-Ben, under the Oracle's authorization.
+
+In this case, the system loses the progress made after L-Ben failed: the transactions from zxid 0x2_1 through 0x2_A, made while Ben was down, never reached Ben.
+
+
+However, the loss of progress can be prevented by making the Oracle capable of referring to the latest zxid.
+When the Oracle can refer to the latest zxid:
+
+ At T5 with zxid(0x2_A): Ben does not finish his leader election, because the Oracle does not authorize him even though John is down.
+
+In this case, liveness is traded for safety.
+#### Split Brain Issue
+Suppose the Oracle satisfies the following desired property introduced by [CT]:
+
+ Accuracy: There is a time after which some correct process is never suspected by any correct process.
+
+In addition, the decisions that the Oracle gives out must be mutually exclusive.
+
+In other words,
+
+Suppose we have a Leader (Ben) and a Follower (John) in the broadcasting state:
+
+ - At any time, the Oracle will not authorize both Ben and John, even if each failure detector considers the other instance faulty.
+ Or
+ - At any time, for any two oracle files, the values they contain are not both equal to 1.
+
+A split brain is expected when the Oracle fails to maintain this property during the leader election phase in the following cases:
+
+ 1. Start of the system.
+ 2. Recovery of a failed instance.
+
+## Examples of Concepts for Implementation of a Failure Detector
+Keep in mind that the failure detector's outcome tells the querying ZooKeeper instance whether it has the right to move the system forward without waiting for the instance that the failure detector has identified as faulty.
+
+### A Hardware Implementation
+Suppose two dedicated machines, hw1 and hw2, host ZooKeeper instances zk1 and zk2, respectively, and form a cluster.
+A hardware device is attached to both machines and can determine whether each of them is powered on.
+So, when hw1 is not powered on, zk1 is undoubtedly faulty.
+The device therefore sets the oracle file on hw2 to 1, which indicates that zk1 is faulty and authorizes zk2 to move the system forward.
+
+### A Software Implementation
+Suppose two dedicated machines, hw1 and hw2, host ZooKeeper instances zk1 and zk2, respectively, and form a cluster.
+One can run two more services, o1 and o2, on hw1 and hw2, respectively. The job of o1 and o2 is to detect whether the other machine is alive.
+For example, o1 can constantly ping hw2 to determine whether hw2 is powered on.
+When o1 cannot ping hw2, o1 concludes that hw2 is faulty and updates the oracle file of zk1 to 1, which indicates that zk2 is faulty and authorizes zk1 to move the system forward.
+
+### Using a USB Device as the Oracle to Maintain Progress
+On macOS 10.15.7 (19H2), external storage devices are mounted under `/Volumes`.
+Thus, we can insert a USB device that contains the required information and use it as the oracle.
+When the device is connected, the oracle authorizes the leader to move the system forward, which also means that the other instance is considered failed.
+There are **SIX** steps to reproduce this simulation.
+
+* Firstly, insert a USB device named `Oracle`; `/Volumes/Oracle` should then be accessible.
+* Secondly, create a file named `mastership` that contains `1` under `/Volumes/Oracle`.
+Now `/Volumes/Oracle/mastership` is accessible, and the ZooKeeper instance can read it to see whether it has the right to move the system forward.
+The file can easily be generated with the following command:
+The file can easily be generated by the following command:
+
+
+ $ echo 1 > mastership
+
+* Thirdly, prepare a `zoo.cfg` like the example below:
+
+
+ dataDir=/data
+ dataLogDir=/datalog
+ tickTime=2000
+ initLimit=5
+ syncLimit=2
+ autopurge.snapRetainCount=3
+ autopurge.purgeInterval=0
+ maxClientCnxns=60
+ standaloneEnabled=true
+ admin.enableServer=true
+ oraclePath=/Volumes/Oracle/mastership
+ server.1=0.0.0.0:2888:3888;2181
+ server.2=hw1:2888:3888;2181
+
+_(NOTE) Split brain will not occur because there is only a SINGLE USB device in this simulation._
+_Additionally, `mastership` should not be shared by multiple instances._
+_Thus, only one ZooKeeper instance is configured with the Oracle._
+_For more, please refer to the Safety Issue section._
+
+* Fourthly, start the cluster; it is expected to form a quorum normally.
+* Fifthly, terminate the instance that either has no USB device attached or whose `mastership` file contains 0.
+There are two scenarios to expect:
+ 1. If the terminated instance was the leader, a leader failure occurs, and the remaining instance finishes the leader election on its own thanks to the oracle.
+ 2. If it was a follower, the quorum is still maintained thanks to the oracle.
+
+* Lastly, when the USB device is removed, `/Volumes/Oracle/mastership` becomes unavailable.
+Therefore, in the current implementation, whenever the Leader queries the oracle, the oracle throws an exception and returns `FALSE`.
+Repeat the fifth step; it is then expected that either the system cannot recover from a leader failure, or the leader loses the quorum.
+In either case, the service is interrupted.
+
+With these steps, we can easily demonstrate and practice how the oracle works with two-instance systems.
+
+## REFERENCE
+[CT] Tushar Deepak Chandra and Sam Toueg. 1991. Unreliable failure detectors for asynchronous systems (preliminary version). In _Proceedings of the tenth annual ACM symposium on Principles of distributed computing_ (_PODC '91_). Association for Computing Machinery, New York, NY, USA, 325–340. DOI: https://doi.org/10.1145/112600.112627
\ No newline at end of file
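The "Software Implementation" section of the documentation above describes a service, o1, on hw1. It pings hw2 and writes 1 to zk1's oracle file when hw2 is unreachable. Below is a minimal Java sketch of one detection round. The class name, the injectable probe, and writing 0 when the peer answers are all assumptions made for illustration. `InetAddress.isReachable` is the standard JDK reachability check.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BooleanSupplier;

// Hypothetical software failure detector playing the role of "o1" from the docs.
class PingOracleSketch {
    private final BooleanSupplier peerAlive; // probe for the other machine (hw2)
    private final Path localOracleFile;      // oracle file read by the local instance (zk1)

    PingOracleSketch(BooleanSupplier peerAlive, Path localOracleFile) {
        this.peerAlive = peerAlive;
        this.localOracleFile = localOracleFile;
    }

    // Default probe: JDK reachability check; any I/O failure counts as "not alive".
    static BooleanSupplier reachability(String host, int timeoutMs) {
        return () -> {
            try {
                return InetAddress.getByName(host).isReachable(timeoutMs);
            } catch (IOException e) {
                return false;
            }
        };
    }

    // One detection round: write 1 (authorize local instance) if the peer looks faulty, else 0 (assumed).
    void checkOnce() throws IOException {
        String verdict = peerAlive.getAsBoolean() ? "0" : "1";
        Files.write(localOracleFile, (verdict + "\n").getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        Path oracleFile = Files.createTempFile("zk1-oracle", ".txt");
        // Simulate an unreachable peer instead of pinging a real host.
        new PingOracleSketch(() -> false, oracleFile).checkOnce();
        System.out.println(new String(Files.readAllBytes(oracleFile), StandardCharsets.UTF_8).trim()); // 1
        Files.delete(oracleFile);
    }
}
```

A real service would call `checkOnce()` periodically, for example from a `ScheduledExecutorService`. As the Split Brain section warns, a ping-based detector alone does not guarantee mutually exclusive decisions. During a network partition, o1 and o2 could both write 1.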
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index 90ee827..2636f5c 100755
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -172,6 +172,12 @@
<artifactId>snappy-java</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.6</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java
index 8afc14b..ce490ad 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ReconfigCommand.java
@@ -130,7 +130,7 @@ public class ReconfigCommand extends CliCommand {
//check that membership makes sense; leader will make these checks again
//don't check for leader election ports since
//client doesn't know what leader election alg is used
- members = QuorumPeerConfig.parseDynamicConfig(dynamicCfg, 0, true, false).toString();
+ members = QuorumPeerConfig.parseDynamicConfig(dynamicCfg, 0, true, false, null).toString();
} catch (Exception e) {
throw new CliParseException("Error processing " + cl.getOptionValue("file") + e.getMessage());
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index e71828d..11b5ccb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -65,6 +65,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.txn.CheckVersionTxn;
import org.apache.zookeeper.txn.CloseSessionTxn;
@@ -452,7 +453,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
try {
Properties props = new Properties();
props.load(new StringReader(newMembers));
- request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false);
+ request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false, lastSeenQV.getOraclePath());
request.qv.setVersion(request.getHdr().getZxid());
} catch (IOException | ConfigException e) {
throw new KeeperException.BadArgumentsException(e.getMessage());
@@ -472,7 +473,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
leavingServers = StringUtils.split(leavingServersString, ",");
}
- if (!(lastSeenQV instanceof QuorumMaj)) {
+ if (!(lastSeenQV instanceof QuorumMaj) && !(lastSeenQV instanceof QuorumOracleMaj)) {
String msg = "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system";
LOG.warn(msg);
throw new KeeperException.BadArgumentsException(msg);
@@ -514,7 +515,13 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
} catch (ConfigException e) {
throw new KeeperException.BadArgumentsException("Reconfiguration failed");
}
- request.qv = new QuorumMaj(nextServers);
+
+ if (lastSeenQV instanceof QuorumMaj) {
+ request.qv = new QuorumMaj(nextServers);
+ } else {
+ request.qv = new QuorumOracleMaj(nextServers, lastSeenQV.getOraclePath());
+ }
+
request.qv.setVersion(request.getHdr().getZxid());
}
if (QuorumPeerConfig.isStandaloneEnabled() && request.qv.getVotingMembers().size() < 2) {
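The commit message describes QuorumOracleMaj as a sub-class of QuorumMaj. Under that relationship, Java's `instanceof QuorumMaj` is also true for a QuorumOracleMaj. In the `if (lastSeenQV instanceof QuorumMaj)` branch above, the `else` arm that builds a QuorumOracleMaj would therefore not be reached for a non-null verifier. For the same reason, the extra `instanceof QuorumOracleMaj` test in the non-majority guard adds nothing. The sketch below uses hypothetical stand-in classes (not the real ZooKeeper types) to show the effect. It also shows the usual remedy: test the most specific type first.

```java
// Hypothetical stand-ins for QuorumMaj and QuorumOracleMaj; only the subclass relationship matters here.
class MajVerifier {
    final int members;

    MajVerifier(int members) {
        this.members = members;
    }
}

class OracleMajVerifier extends MajVerifier {
    final String oraclePath;

    OracleMajVerifier(int members, String oraclePath) {
        super(members);
        this.oraclePath = oraclePath;
    }
}

class ReconfigDispatchSketch {
    // Superclass tested first: an OracleMajVerifier also matches, so the oracle path is dropped.
    static MajVerifier superclassFirst(MajVerifier last, int nextMembers) {
        if (last instanceof MajVerifier) {
            return new MajVerifier(nextMembers);
        } else {
            // Only reached when last is null.
            return new OracleMajVerifier(nextMembers, null);
        }
    }

    // Most specific type tested first: the oracle path is carried over to the new verifier.
    static MajVerifier subclassFirst(MajVerifier last, int nextMembers) {
        if (last instanceof OracleMajVerifier) {
            return new OracleMajVerifier(nextMembers, ((OracleMajVerifier) last).oraclePath);
        }
        return new MajVerifier(nextMembers);
    }

    public static void main(String[] args) {
        MajVerifier last = new OracleMajVerifier(2, "/chassis/mastership");
        System.out.println(superclassFirst(last, 2) instanceof OracleMajVerifier); // false
        System.out.println(subclassFirst(last, 2) instanceof OracleMajVerifier);   // true
    }
}
```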
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
index 0950c6d..9fc9d14 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
@@ -948,7 +949,7 @@ public class FastLeaderElection implements Election {
Long.toHexString(proposedZxid));
sendNotifications();
- SyncedLearnerTracker voteSet;
+ SyncedLearnerTracker voteSet = null;
/*
* Loop in which we exchange notifications until we find a leader
@@ -977,7 +978,24 @@ public class FastLeaderElection implements Election {
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
+
+ /*
+ * When a leader failure happens on the master, the backup is supposed to receive the permission from
+ * the Oracle and become the leader, but the permission is likely to be delayed. We re-check once the timeout happens.
+ *
+ * The leader election algorithm does not provide the ability to elect a leader from a single instance
+ * in a configuration of 2 instances.
+ * */
+ self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval);
+ if (self.getQuorumVerifier() instanceof QuorumOracleMaj && voteSet != null && voteSet.hasAllQuorums() && notTimeout != minNotificationInterval) {
+ setPeerState(proposedLeader, voteSet);
+ Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
+ leaveInstance(endVote);
+ return endVote;
+ }
+
LOG.info("Notification time out: {} ms", notTimeout);
+
} else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
@@ -1051,43 +1069,53 @@ public class FastLeaderElection implements Election {
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
+
+ /*
+ * In ZOOKEEPER-3922, we separate the behaviors of FOLLOWING and LEADING.
+ * To avoid duplicating code, we create a method called receivedFollowingNotification() that is
+ * shared by FOLLOWING and LEADING. This method returns a Vote. When the returned Vote is null, we follow
+ * the original idea and break out of the switch statement; otherwise, a valid returned Vote indicates that a leader
+ * has been elected.
+ *
+ * We need to separate these behaviors to make the algorithm work in a 2-node
+ * setting, where an extra condition for electing a leader is needed. Due to the majority rule, a leader is elected only when
+ * there is a majority in the voteset. However, in a configuration of 2 nodes,
+ * the number needed for a majority remains 2, which means a recovered node cannot elect the
+ * existing leader. Therefore, we need the Oracle to step in. In a two-node configuration, the Oracle
+ * grants the permission to make progress to only one node. Either the Oracle grants the permission to the
+ * remaining node and makes it the new leader when there is a faulty machine, which keeps the system making progress,
+ * or the Oracle does not grant the permission to the remaining node, which leads to a service outage.
+ *
+ * In the former case, when a failed server recovers and participates in the leader election, it cannot locate the
+ * new leader because there is no majority in the voteset. It fails on hasAllQuorums() indefinitely for
+ * two reasons. First, it does not have a majority in the voteset. Second,
+ * the Oracle will not give it the permission, since the Oracle already gave the permission to the existing leader, the healthy machine.
+ * Logically, a negative reply from the Oracle implies that the existing leader, from which the LEADING notification came, is a valid
+ * leader.
+ * Treating this negative reply as permission to follow that leader is the purpose of separating these two behaviors.
+ *
+ *
+ * */
case FOLLOWING:
- case LEADING:
/*
- * Consider all notifications from the same epoch
- * together.
- */
- if (n.electionEpoch == logicalclock.get()) {
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
- voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
- if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
- setPeerState(n.leader, voteSet);
- Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
- leaveInstance(endVote);
- return endVote;
- }
+ * To avoid duplicate code
+ * */
+ Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
+ if (resultFN == null) {
+ break;
+ } else {
+ return resultFN;
}
-
+ case LEADING:
/*
- * Before joining an established ensemble, verify that
- * a majority are following the same leader.
- *
- * Note that the outofelection map also stores votes from the current leader election.
- * See ZOOKEEPER-1732 for more information.
- */
- outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
- voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
-
- if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
- synchronized (this) {
- logicalclock.set(n.electionEpoch);
- setPeerState(n.leader, voteSet);
- }
- Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
- leaveInstance(endVote);
- return endVote;
+ * receivedLeadingNotification() first performs receivedFollowingNotification(). When that returns
+ * null, it asks the Oracle whether to follow this leader.
+ * */
+ Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
+ if (resultLN == null) {
+ break;
+ } else {
+ return resultLN;
}
- break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
@@ -1115,6 +1143,74 @@ public class FastLeaderElection implements Election {
}
}
+ private Vote receivedFollowingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {
+ /*
+ * Consider all notifications from the same epoch
+ * together.
+ */
+ if (n.electionEpoch == logicalclock.get()) {
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
+ voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
+ if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
+ setPeerState(n.leader, voteSet);
+ Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
+ leaveInstance(endVote);
+ return endVote;
+ }
+ }
+
+ /*
+ * Before joining an established ensemble, verify that
+ * a majority are following the same leader.
+ *
+ * Note that the outofelection map also stores votes from the current leader election.
+ * See ZOOKEEPER-1732 for more information.
+ */
+ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
+ voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
+
+ if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
+ synchronized (this) {
+ logicalclock.set(n.electionEpoch);
+ setPeerState(n.leader, voteSet);
+ }
+ Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
+ leaveInstance(endVote);
+ return endVote;
+ }
+
+ return null;
+ }
+
+ private Vote receivedLeadingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {
+ /*
+ *
+ * In a two-node configuration, a recovering node cannot locate a leader because there is no majority in the voteset.
+ * Therefore, it is time for the Oracle to step in as a tie breaker.
+ *
+ * */
+ Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
+ if (result == null) {
+ /*
+ * Ask the Oracle whether it is okay to follow this leader.
+ *
+ * We don't need checkLeader() because this instance itself cannot be a leader candidate.
+ * */
+ if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
+ LOG.info("Oracle indicates to follow");
+ setPeerState(n.leader, voteSet);
+ Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
+ leaveInstance(endVote);
+ return endVote;
+ } else {
+ LOG.info("Oracle indicates not to follow");
+ return null;
+ }
+ } else {
+ return result;
+ }
+ }
+
/**
* Check if a given sid is represented in either the current or
* the next voting view
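The decision made by the new `receivedLeadingNotification()` can be condensed into a pure function. The sketch below is a simplification that uses hypothetical names. It drops vote bookkeeping, epochs, and `checkLeader()`, and it reduces the verifier calls (`getNeedOracle()`, `askOracle()`) to boolean inputs.

```java
// Hypothetical condensation of the LEADING-notification branch; not the real FastLeaderElection code.
class LeadingNotificationSketch {
    enum Outcome { FOLLOW_LEADER, KEEP_WAITING }

    // majorityFollowsLeader: result of the ordinary FOLLOWING-style majority check
    // needOracle:            whether the verifier currently relies on the Oracle
    // oracleAuthorizesMe:    the local Oracle's answer for this instance
    static Outcome onLeadingNotification(boolean majorityFollowsLeader, boolean needOracle, boolean oracleAuthorizesMe) {
        if (majorityFollowsLeader) {
            return Outcome.FOLLOW_LEADER; // ordinary majority path
        }
        if (needOracle && !oracleAuthorizesMe) {
            // The Oracle already gave its permission to the other (existing) leader,
            // so a "no" for this instance is read as "follow that leader".
            return Outcome.FOLLOW_LEADER;
        }
        return Outcome.KEEP_WAITING;
    }

    public static void main(String[] args) {
        System.out.println(onLeadingNotification(false, true, false));  // FOLLOW_LEADER
        System.out.println(onLeadingNotification(false, true, true));   // KEEP_WAITING
        System.out.println(onLeadingNotification(false, false, false)); // KEEP_WAITING
    }
}
```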
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 2de2cee..ce8f799 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -174,6 +174,10 @@ public class Leader extends LearnerMaster {
void addForwardingFollower(LearnerHandler lh) {
synchronized (forwardingFollowers) {
forwardingFollowers.add(lh);
+ /*
+ * Any change to forwardingFollowers could affect whether the Oracle is needed.
+ * */
+ self.getQuorumVerifier().updateNeedOracle(new ArrayList<>(forwardingFollowers));
}
}
@@ -757,7 +761,27 @@ public class Leader extends LearnerMaster {
break;
}
- if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
+ /*
+ *
+ * We need to re-validate the outstandingProposals to maintain the progress of ZooKeeper.
+ * A proposal is likely waiting for enough ACKs to be committed. The proposals are sent out, but the
+ * only follower goes away, so the proposals will not be committed until the follower recovers.
+ * An earlier proposal that is not committed blocks any further proposals. So we need to re-validate those
+ * outstanding proposals with help from the Oracle. A key point in the re-validation process is that the proposals
+ * need to be processed in order.
+ *
+ * We make the whole method blocking to avoid any possible race condition on outstandingProposals and lastCommitted,
+ * as well as to avoid nested synchronization.
+ *
+ * As a more generic approach, we pass the forwardingFollowers object to QuorumOracleMaj to determine whether we need
+ * help from the Oracle.
+ *
+ *
+ * The size of outstandingProposals can be 1. The only outstanding proposal is then the one waiting for the ACK from
+ * the leader itself.
+ * */
+ if (!tickSkip && !syncedAckSet.hasAllQuorums()
+ && !(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers()) && self.getQuorumVerifier().revalidateOutstandingProp(this, new ArrayList<>(outstandingProposals.values()), lastCommitted))) {
// Lost quorum of last committed and/or last proposed
// config, set shutdown flag
shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
@@ -909,10 +933,10 @@ public class Leader extends LearnerMaster {
// commit proposals in order
if (zxid != lastCommitted + 1) {
LOG.warn(
- "Commiting zxid 0x{} from {} noy first!",
+ "Commiting zxid 0x{} from {} not first!",
Long.toHexString(zxid),
followerAddr);
- LOG.warn("First is {}", (lastCommitted + 1));
+ LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
outstandingProposals.remove(zxid);
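The Leader comment above requires outstanding proposals to be re-validated strictly in zxid order, because an uncommitted earlier proposal blocks later ones. The following is a minimal sketch of that ordering rule. It assumes that proposals are keyed by zxid and that a boolean `oracleOverride` stands in for `overrideQuorumDecision()`. It is not the body of `revalidateOutstandingProp()`, which this diff does not show. Starting from `lastCommitted + 1`, it commits consecutive zxids and stops at the first gap, mirroring the "commit proposals in order" check in the same hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of in-order re-validation; not the real Leader/QuorumOracleMaj code.
class RevalidateSketch {

    // Commits outstanding proposals in zxid order when the Oracle override applies.
    // Returns the committed zxids in order and stops at the first non-consecutive zxid.
    static List<Long> revalidate(SortedMap<Long, String> outstanding, long lastCommitted, boolean oracleOverride) {
        List<Long> committed = new ArrayList<>();
        if (!oracleOverride) {
            return committed; // fall back to the normal quorum rules
        }
        long next = lastCommitted + 1;
        while (outstanding.containsKey(next)) {
            outstanding.remove(next); // "commit" the proposal
            committed.add(next);
            next++;
        }
        return committed;
    }

    public static void main(String[] args) {
        SortedMap<Long, String> pending = new TreeMap<>();
        pending.put(0x200000005L, "setData");
        pending.put(0x200000006L, "create");
        pending.put(0x200000008L, "delete"); // gap at 0x200000007
        for (long z : revalidate(pending, 0x200000004L, true)) {
            System.out.println("0x" + Long.toHexString(z));
        }
        // prints 0x200000005 and 0x200000006; 0x200000008 stays outstanding
    }
}
```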
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 3102c63..19aae08 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -78,6 +78,7 @@ import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
@@ -1254,6 +1255,22 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
new QuorumMaj(quorumPeers));
}
+ public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException {
+ this(
+ quorumPeers,
+ snapDir,
+ logDir,
+ electionAlg,
+ myid,
+ tickTime,
+ initLimit,
+ syncLimit,
+ connectToLearnerMasterLimit,
+ false,
+ ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
+ new QuorumOracleMaj(quorumPeers, oraclePath));
+ }
+
/**
* This constructor is only used by the existing unit test code.
* It defaults to FileLogProvider persistence provider.
@@ -1808,7 +1825,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
public QuorumVerifier configFromString(String s) throws IOException, ConfigException {
Properties props = new Properties();
props.load(new StringReader(s));
- return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false);
+ return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath());
}
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index c56204f..1b37f29 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -52,6 +52,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.VerifyingFileFactory;
@@ -130,6 +131,8 @@ public class QuorumPeerConfig {
Integer.parseInt(System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS, String.valueOf(MultipleAddresses.DEFAULT_TIMEOUT.toMillis())));
+ protected String oraclePath;
+
/**
* Minimum snapshot retain count.
* @see org.apache.zookeeper.server.PurgeTxnLog#purge(File, File, int)
@@ -380,6 +383,8 @@ public class QuorumPeerConfig {
multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(value);
} else if (key.equals("multiAddress.reachabilityCheckEnabled")) {
multiAddressReachabilityCheckEnabled = parseBoolean(key, value);
+ } else if (key.equals("oraclePath")) {
+ oraclePath = value;
} else {
System.setProperty("zookeeper." + key, value);
}
@@ -629,6 +634,15 @@ public class QuorumPeerConfig {
}
}
+
+ private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical, String oraclePath) throws ConfigException {
+ if (oraclePath == null) {
+ return createQuorumVerifier(dynamicConfigProp, isHierarchical);
+ } else {
+ return new QuorumOracleMaj(dynamicConfigProp, oraclePath);
+ }
+ }
+
private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
if (isHierarchical) {
return new QuorumHierarchical(dynamicConfigProp);
@@ -642,7 +656,7 @@ public class QuorumPeerConfig {
}
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
- quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
+ quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath);
setupMyId();
setupClientPort();
setupPeerType();
@@ -656,7 +670,7 @@ public class QuorumPeerConfig {
* @throws IOException
* @throws ConfigException
*/
- public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
+ public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode, String oraclePath) throws IOException, ConfigException {
boolean isHierarchical = false;
for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
String key = entry.getKey().toString().trim();
@@ -668,7 +682,7 @@ public class QuorumPeerConfig {
}
}
- QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical);
+ QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical, oraclePath);
int numParticipators = qv.getVotingMembers().size();
int numObservers = qv.getObservingMembers().size();
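The new `oraclePath` key changes how `createQuorumVerifier` picks a verifier. When a path is configured, it returns `QuorumOracleMaj`; otherwise it falls back to the existing majority/hierarchical choice. The selection can be sketched with plain `java.util.Properties`. The `VerifierKind` enum and the sample server lines are hypothetical, and hierarchical quorums are left out of the sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class OracleConfigSketch {

    /** Hypothetical labels for the two verifier flavours chosen in createQuorumVerifier(). */
    enum VerifierKind { MAJORITY, ORACLE_MAJORITY }

    /**
     * Mirrors the selection added to QuorumPeerConfig: if "oraclePath" is set,
     * an oracle-backed majority verifier is used; otherwise the plain majority one.
     * (The real code may also pick a hierarchical verifier; that is omitted here.)
     */
    static VerifierKind chooseVerifier(Properties cfg) {
        String oraclePath = cfg.getProperty("oraclePath");
        return oraclePath == null ? VerifierKind.MAJORITY : VerifierKind.ORACLE_MAJORITY;
    }

    public static void main(String[] args) throws IOException {
        Properties cfg = new Properties();
        cfg.load(new StringReader(
                "server.1=127.0.0.1:2888:3888\n"
                        + "server.2=127.0.0.1:2889:3889\n"
                        + "oraclePath=/tmp/oracle/mastership/value\n"));
        System.out.println(chooseVerifier(cfg)); // prints ORACLE_MAJORITY
    }
}
```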
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
index ced966f..5376ae6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
@@ -120,6 +120,7 @@ public class QuorumHierarchical implements QuorumVerifier {
}
return true;
}
+
/**
* This constructor requires the quorum configuration
* to be declared in a separate file, and it takes the
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
index 6e6f1c2..ed38533 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
@@ -26,6 +26,8 @@ import java.util.Set;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class implements a validator for majority quorums. The implementation is
@@ -34,11 +36,13 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
*/
public class QuorumMaj implements QuorumVerifier {
+ private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);
+
private Map<Long, QuorumServer> allMembers = new HashMap<Long, QuorumServer>();
private Map<Long, QuorumServer> votingMembers = new HashMap<Long, QuorumServer>();
private Map<Long, QuorumServer> observingMembers = new HashMap<Long, QuorumServer>();
private long version = 0;
- private int half;
+ protected int half;
public int hashCode() {
assert false : "hashCode not designed";
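The 2-node problem comes straight from the majority rule in `QuorumMaj`, whose `half` field this patch makes protected. `half` is the number of voting members divided by 2, and a quorum requires `ackSet.size() > half`. With two voters, a lone survivor has one ack, and one is not more than one. Below is a simplified sketch of how `QuorumOracleMaj.containsQuorum()` layers the oracle on top of that rule. The class name and the `BooleanSupplier` standing in for `askOracle()` are assumptions for illustration.

```java
import java.util.Set;
import java.util.function.BooleanSupplier;

public class TwoNodeQuorumSketch {

    /** QuorumMaj-style majority test: more than half of the voting members acked. */
    static boolean majority(int votingMembers, Set<Long> ackSet) {
        int half = votingMembers / 2;
        return ackSet.size() > half;
    }

    /**
     * Simplified containsQuorum(): with no oracle configured, or more than two voters,
     * the plain majority rule applies. In a two-voter ensemble that lost its majority,
     * the oracle decides, but only when the oracle is needed (no forwarding followers).
     */
    static boolean containsQuorum(int votingMembers, Set<Long> ackSet, boolean oracleConfigured,
                                  boolean needOracle, BooleanSupplier askOracle) {
        if (!oracleConfigured || votingMembers > 2) {
            return majority(votingMembers, ackSet);
        }
        if (majority(votingMembers, ackSet)) {
            return true;
        }
        return needOracle && askOracle.getAsBoolean();
    }

    public static void main(String[] args) {
        Set<Long> onlyMe = Set.of(1L);
        System.out.println(majority(2, onlyMe));                               // false: 1 > 1 fails
        System.out.println(containsQuorum(2, onlyMe, true, true, () -> true)); // true: oracle grants
        System.out.println(containsQuorum(3, onlyMe, true, true, () -> true)); // false: oracle ignored
    }
}
```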
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
new file mode 100644
index 0000000..0845b80
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum.flexible;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.SyncedLearnerTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ *
+ * QuorumOracleMaj is a subclass of QuorumMaj.
+ *
+ * QuorumOracleMaj is designed to keep a 2-node configuration functional. Its key override of the
+ * superclass is containsQuorum(): besides consulting the oracle, it also checks the number of voting
+ * members. Whenever the number of voting members is greater than 2, QuorumOracleMaj simply delegates
+ * to its superclass. It also implements the oracle hooks declared in QuorumVerifier.
+ * */
+public class QuorumOracleMaj extends QuorumMaj {
+ private static final Logger LOG = LoggerFactory.getLogger(QuorumOracleMaj.class);
+
+ private String oracle = null;
+
+ private final AtomicBoolean needOracle = new AtomicBoolean(true);
+
+ public QuorumOracleMaj(Map<Long, QuorumPeer.QuorumServer> allMembers, String oraclePath) {
+ super(allMembers);
+ setOracle(oraclePath);
+ }
+
+ public QuorumOracleMaj(Properties props, String oraclePath) throws QuorumPeerConfig.ConfigException {
+ super(props);
+ setOracle(oraclePath);
+ }
+
+ private void setOracle(String path) {
+ if (oracle == null) {
+ oracle = path;
+ LOG.info("Oracle is set to {}", path);
+ } else {
+ LOG.warn("Oracle is already set. Ignore:{}", path);
+ }
+ }
+
+ @Override
+ public boolean updateNeedOracle(List<LearnerHandler> forwardingFollowers) {
+ // Consult the oracle only when a 2-voter ensemble has no forwarding followers left
+ needOracle.set(forwardingFollowers.isEmpty() && super.getVotingMembers().size() == 2);
+ return needOracle.get();
+ }
+
+ @Override
+ public boolean askOracle() {
+ if (oracle == null) {
+ LOG.error("Oracle is not set, return false");
+ return false;
+ }
+ // The oracle file holds a single character; '1' grants this node the quorum.
+ // try-with-resources closes the reader exactly once, on every path.
+ try (FileReader fr = new FileReader(FilenameUtils.getFullPath(oracle) + FilenameUtils.getName(oracle))) {
+ int read = fr.read();
+ LOG.debug("Oracle says:{}", (char) read);
+ return (char) read == '1';
+ } catch (IOException e) {
+ LOG.error("Failed to read the oracle file {}", oracle, e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean getNeedOracle() {
+ return needOracle.get();
+ }
+
+ @Override
+ public String getOraclePath() {
+ return oracle;
+ }
+
+ @Override
+ public boolean overrideQuorumDecision(List<LearnerHandler> forwardingFollowers) {
+ return updateNeedOracle(forwardingFollowers) && askOracle();
+ }
+
+ @Override
+ public boolean revalidateOutstandingProp(Leader self, ArrayList<Leader.Proposal> outstandingProposal, long lastCommitted) {
+ LOG.debug("Start Revalidation outstandingProposals");
+ try {
+ // Commit in zxid order; Long.compare avoids the overflow of casting a long difference to int.
+ outstandingProposal.sort((o1, o2) -> Long.compare(o1.packet.getZxid(), o2.packet.getZxid()));
+ for (Leader.Proposal p : outstandingProposal) {
+ if (p.request.zxid > lastCommitted) {
+ LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:0x{}", Long.toHexString(p.request.zxid), outstandingProposal.size(), Long.toHexString(lastCommitted));
+ if (!self.tryToCommit(p, p.request.zxid, null)) {
+ // Later proposals cannot commit before this one; stop instead of looping forever
+ break;
+ }
+ lastCommitted = p.request.zxid;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to revalidate outstanding proposals", e);
+ return false;
+ }
+
+ LOG.debug("Finish Revalidation outstandingProposals");
+ return true;
+ }
+
+ @Override
+ public boolean revalidateVoteset(SyncedLearnerTracker voteSet, boolean timeout) {
+ return voteSet != null && voteSet.hasAllQuorums() && timeout;
+ }
+
+ @Override
+ public boolean containsQuorum(Set<Long> ackSet) {
+ if (oracle == null || getVotingMembers().size() > 2) {
+ return super.containsQuorum(ackSet);
+ } else if (!super.containsQuorum(ackSet)) {
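+ if (getNeedOracle()) {
+ // Read the oracle once so the logged and returned decisions agree
+ boolean oracleDecision = askOracle();
+ LOG.debug("We lost the quorum and have no valid followers; Oracle:{}", oracleDecision);
+ return oracleDecision;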
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ QuorumOracleMaj qm = (QuorumOracleMaj) o;
+ if (qm.getVersion() == super.getVersion()) {
+ return true;
+ }
+ if (super.getAllMembers().size() != qm.getAllMembers().size()) {
+ return false;
+ }
+ for (QuorumPeer.QuorumServer qs : super.getAllMembers().values()) {
+ QuorumPeer.QuorumServer qso = qm.getAllMembers().get(qs.id);
+ if (qso == null || !qs.equals(qso)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 43; // any arbitrary constant will do
+ }
+}
+
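`askOracle()` treats the oracle as a file whose first character is the answer. `'1'` lets this node keep the quorum; anything else does not, including a missing or unreadable file. Below is a self-contained sketch of the same contract using `java.nio.file`, assuming UTF-8 content. `FileOracleSketch` is a hypothetical name, not the class in the patch.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class FileOracleSketch {

    /**
     * Reads the first character of the oracle file. '1' means "this node may keep
     * the quorum"; anything else, an empty or missing file, or an unset path means "no".
     */
    static boolean askOracle(Path oracle) {
        if (oracle == null) {
            return false;
        }
        try (Reader r = Files.newBufferedReader(oracle, StandardCharsets.UTF_8)) {
            return r.read() == '1'; // read() returns -1 for an empty file
        } catch (IOException e) {
            return false; // treat an unreadable oracle as a negative answer
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("oracle");
        Path value = dir.resolve("value");
        Files.write(value, "1".getBytes(StandardCharsets.UTF_8));
        System.out.println(askOracle(value));                  // true
        Files.write(value, "0".getBytes(StandardCharsets.UTF_8));
        System.out.println(askOracle(value));                  // false
        System.out.println(askOracle(dir.resolve("missing"))); // false
    }
}
```

Failing closed, so that any error means "no quorum", matches the patch's behaviour and avoids a node wrongly believing it may continue alone.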
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
index 12d8489..7362313 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
@@ -18,9 +18,14 @@
package org.apache.zookeeper.server.quorum.flexible;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.SyncedLearnerTracker;
/**
* All quorum validators have to implement a method called
@@ -39,6 +44,37 @@ public interface QuorumVerifier {
Map<Long, QuorumServer> getVotingMembers();
Map<Long, QuorumServer> getObservingMembers();
boolean equals(Object o);
+ /*
+ * Only QuorumOracleMaj implements these methods. Other implementations inherit
+ * these defaults, which return false (getOraclePath() returns null).
+ * */
+ default boolean updateNeedOracle(List<LearnerHandler> forwardingFollowers) {
+ return false;
+ }
+ default boolean getNeedOracle() {
+ return false;
+ }
+
+ default boolean askOracle() {
+ return false;
+ }
+
+ default boolean overrideQuorumDecision(List<LearnerHandler> forwardingFollowers) {
+ return false;
+ }
+
+ default boolean revalidateOutstandingProp(Leader self, ArrayList<Leader.Proposal> outstandingProposal, long lastCommitted) {
+ return false;
+ }
+
+ default boolean revalidateVoteset(SyncedLearnerTracker voteSet, boolean timeout) {
+ return false;
+ }
+
+ default String getOraclePath() {
+ return null;
+ }
+
String toString();
}
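The `QuorumVerifier` changes rely on Java default methods. As a result, `QuorumMaj` and `QuorumHierarchical` compile unchanged and simply report "no oracle". Below is a trimmed-down sketch of the pattern. The `Verifier`, `Majority`, and `OracleMajority` types are hypothetical stand-ins, and a fixed `oracleAnswer` replaces the file read.

```java
import java.util.Set;

public class DefaultMethodSketch {

    /** Hypothetical, trimmed-down stand-in for QuorumVerifier. */
    interface Verifier {
        boolean containsQuorum(Set<Long> ackSet);

        // Oracle hooks: verifiers without an oracle inherit these no-op defaults.
        default boolean askOracle() {
            return false;
        }

        default String getOraclePath() {
            return null;
        }
    }

    /** Plain majority verifier: does not override the oracle hooks. */
    static class Majority implements Verifier {
        private final int half;

        Majority(int voters) {
            this.half = voters / 2;
        }

        @Override
        public boolean containsQuorum(Set<Long> ackSet) {
            return ackSet.size() > half;
        }
    }

    /** Oracle-backed verifier: overrides only the hooks it needs. */
    static class OracleMajority extends Majority {
        private final String path;
        private final boolean oracleAnswer; // stands in for reading the oracle file

        OracleMajority(int voters, String path, boolean oracleAnswer) {
            super(voters);
            this.path = path;
            this.oracleAnswer = oracleAnswer;
        }

        @Override
        public boolean askOracle() {
            return oracleAnswer;
        }

        @Override
        public String getOraclePath() {
            return path;
        }
    }

    public static void main(String[] args) {
        Verifier plain = new Majority(2);
        Verifier oracle = new OracleMajority(2, "/tmp/oracle/value", true);
        System.out.println(plain.askOracle() + " " + plain.getOraclePath());   // false null
        System.out.println(oracle.askOracle() + " " + oracle.getOraclePath()); // true /tmp/oracle/value
    }
}
```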
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
index d37516b..a27d5cf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
@@ -72,7 +72,7 @@ public class EagerACLFilterTest extends QuorumBase {
ensureCheck(checkEnabled);
CountdownWatcher clientWatch = new CountdownWatcher();
CountdownWatcher clientWatchB = new CountdownWatcher();
- super.setUp(true);
+ super.setUp(true, true);
String hostPort = getPeersMatching(serverState).split(",")[0];
int clientPort = Integer.parseInt(hostPort.split(":")[1]);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerConfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerConfigTest.java
index b2c350e..407a9d1 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerConfigTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerConfigTest.java
@@ -203,6 +203,7 @@ public class QuorumPeerConfigTest {
private Properties getDefaultZKProperties() {
Properties zkProp = new Properties();
zkProp.setProperty("dataDir", new File("myDataDir").getAbsolutePath());
+ zkProp.setProperty("oraclePath", new File("mastership").getAbsolutePath());
return zkProp;
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
index 0888d6f..c395a62 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
@@ -69,7 +69,7 @@ public class QuorumRequestPipelineTest extends QuorumBase {
public void setUp(ServerState serverState) throws Exception {
CountdownWatcher clientWatch = new CountdownWatcher();
- super.setUp(true);
+ super.setUp(true, true);
zkClient = createClient(clientWatch, getPeersMatching(serverState));
zkClient.addAuthInfo(AUTH_PROVIDER, AUTH);
clientWatch.waitForConnected(CONNECTION_TIMEOUT);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/AsyncHammerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/AsyncHammerTest.java
index 2276354..108b21c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/AsyncHammerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/AsyncHammerTest.java
@@ -47,7 +47,7 @@ public class AsyncHammerTest extends ZKTestCase implements StringCallback, VoidC
private volatile boolean bang;
public void setUp(boolean withObservers) throws Exception {
- qb.setUp(withObservers);
+ qb.setUp(withObservers, false);
}
protected void restart() throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverLETest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverLETest.java
index 014ccb4..67aa595 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverLETest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverLETest.java
@@ -35,7 +35,7 @@ public class ObserverLETest extends ZKTestCase {
@BeforeEach
public void establishThreeParticipantOneObserverEnsemble() throws Exception {
- qb.setUp(true);
+ qb.setUp(true, false);
ct.hostPort = qb.hostPort;
ct.setUpAll();
qb.s5.shutdown();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverQuorumHammerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverQuorumHammerTest.java
index 00953f7..45e37f2 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverQuorumHammerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverQuorumHammerTest.java
@@ -28,7 +28,7 @@ public class ObserverQuorumHammerTest extends QuorumHammerTest {
@BeforeEach
@Override
public void setUp() throws Exception {
- qb.setUp(true);
+ qb.setUp(true, false);
cht.hostPort = qb.hostPort;
cht.setUpAll();
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
index 02e5e0a..c2396a6 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -48,6 +49,13 @@ public class QuorumBase extends ClientBase {
private static final String LOCALADDR = "127.0.0.1";
+ private static final String oraclePath_0 = "./tmp/oraclePath/0/mastership/";
+ private static final String oraclePath_1 = "./tmp/oraclePath/1/mastership/";
+ private static final String oraclePath_2 = "./tmp/oraclePath/0/mastership/";
+ private static final String oraclePath_3 = "./tmp/oraclePath/1/mastership/";
+ private static final String oraclePath_4 = "./tmp/oraclePath/0/mastership/";
+ private static final String mastership = "value";
+
File s1dir, s2dir, s3dir, s4dir, s5dir;
QuorumPeer s1, s2, s3, s4, s5;
protected int port1;
@@ -71,13 +79,14 @@ public class QuorumBase extends ClientBase {
protected boolean localSessionsEnabled = false;
protected boolean localSessionsUpgradingEnabled = false;
+
@BeforeEach
@Override
public void setUp() throws Exception {
- setUp(false);
+ setUp(false, true);
}
- protected void setUp(boolean withObservers) throws Exception {
+ protected void setUp(boolean withObservers, boolean withOracle) throws Exception {
LOG.info("QuorumBase.setup {}", getTestName());
setupTestEnv();
@@ -121,21 +130,54 @@ public class QuorumBase extends ClientBase {
s4dir = ClientBase.createTmpDir();
s5dir = ClientBase.createTmpDir();
- startServers(withObservers);
+ startServers(withObservers, withOracle);
OSMXBean osMbean = new OSMXBean();
if (osMbean.getUnix()) {
LOG.info("Initial fdcount is: {}", osMbean.getOpenFileDescriptorCount());
}
+ if (withOracle) {
+ File directory = new File(oraclePath_0);
+ directory.mkdirs();
+ FileWriter fw = new FileWriter(oraclePath_0 + mastership);
+ fw.write("1");
+ fw.close();
+
+ directory = new File(oraclePath_1);
+ directory.mkdirs();
+ fw = new FileWriter(oraclePath_1 + mastership);
+ fw.write("0");
+ fw.close();
+
+ directory = new File(oraclePath_2);
+ directory.mkdirs();
+ fw = new FileWriter(oraclePath_2 + mastership);
+ fw.write("0");
+ fw.close();
+
+ directory = new File(oraclePath_3);
+ directory.mkdirs();
+ fw = new FileWriter(oraclePath_3 + mastership);
+ fw.write("1");
+ fw.close();
+
+ directory = new File(oraclePath_4);
+ directory.mkdirs();
+ fw = new FileWriter(oraclePath_4 + mastership);
+ fw.write("0");
+ fw.close();
+ }
+
+
LOG.info("Setup finished");
}
void startServers() throws Exception {
- startServers(false);
+ startServers(false, true);
}
- void startServers(boolean withObservers) throws Exception {
+ void startServers(boolean withObservers, boolean withOracle) throws Exception {
int tickTime = 2000;
int initLimit = 3;
int syncLimit = 3;
@@ -152,21 +194,39 @@ public class QuorumBase extends ClientBase {
peers.get(Long.valueOf(5)).type = LearnerType.OBSERVER;
}
- LOG.info("creating QuorumPeer 1 port {}", portClient1);
- s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
- assertEquals(portClient1, s1.getClientPort());
- LOG.info("creating QuorumPeer 2 port {}", portClient2);
- s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
- assertEquals(portClient2, s2.getClientPort());
- LOG.info("creating QuorumPeer 3 port {}", portClient3);
- s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
- assertEquals(portClient3, s3.getClientPort());
- LOG.info("creating QuorumPeer 4 port {}", portClient4);
- s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
- assertEquals(portClient4, s4.getClientPort());
- LOG.info("creating QuorumPeer 5 port {}", portClient5);
- s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
- assertEquals(portClient5, s5.getClientPort());
+ if (!withOracle) {
+ LOG.info("creating QuorumPeer 1 port {}", portClient1);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+ assertEquals(portClient1, s1.getClientPort());
+ LOG.info("creating QuorumPeer 2 port {}", portClient2);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+ assertEquals(portClient2, s2.getClientPort());
+ LOG.info("creating QuorumPeer 3 port {}", portClient3);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+ assertEquals(portClient3, s3.getClientPort());
+ LOG.info("creating QuorumPeer 4 port {}", portClient4);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+ assertEquals(portClient4, s4.getClientPort());
+ LOG.info("creating QuorumPeer 5 port {}", portClient5);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+ assertEquals(portClient5, s5.getClientPort());
+ } else {
+ LOG.info("creating QuorumPeer 1 port {}", portClient1);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_0 + mastership);
+ assertEquals(portClient1, s1.getClientPort());
+ LOG.info("creating QuorumPeer 2 port {}", portClient2);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_1 + mastership);
+ assertEquals(portClient2, s2.getClientPort());
+ LOG.info("creating QuorumPeer 3 port {}", portClient3);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_2 + mastership);
+ assertEquals(portClient3, s3.getClientPort());
+ LOG.info("creating QuorumPeer 4 port {}", portClient4);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_3 + mastership);
+ assertEquals(portClient4, s4.getClientPort());
+ LOG.info("creating QuorumPeer 5 port {}", portClient5);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_4 + mastership);
+ assertEquals(portClient5, s5.getClientPort());
+ }
if (withObservers) {
s4.setLearnerType(LearnerType.OBSERVER);
@@ -230,18 +290,18 @@ public class QuorumBase extends ClientBase {
}
public int getLeaderIndex() {
- if (s1.getPeerState() == ServerState.LEADING) {
- return 0;
- } else if (s2.getPeerState() == ServerState.LEADING) {
- return 1;
- } else if (s3.getPeerState() == ServerState.LEADING) {
- return 2;
- } else if (s4.getPeerState() == ServerState.LEADING) {
- return 3;
- } else if (s5.getPeerState() == ServerState.LEADING) {
- return 4;
- }
- return -1;
+ if (s1.getPeerState() == ServerState.LEADING) {
+ return 0;
+ } else if (s2.getPeerState() == ServerState.LEADING) {
+ return 1;
+ } else if (s3.getPeerState() == ServerState.LEADING) {
+ return 2;
+ } else if (s4.getPeerState() == ServerState.LEADING) {
+ return 3;
+ } else if (s5.getPeerState() == ServerState.LEADING) {
+ return 4;
+ }
+ return -1;
}
public int getLeaderClientPort() {
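In `QuorumBase.setUp`, each oracle value file is written by its own mkdirs/FileWriter block. `oraclePath_2` and `oraclePath_4` point at the same directory as `oraclePath_0`, and `oraclePath_3` at the same one as `oraclePath_1`. Later writes therefore overwrite earlier ones, leaving `0` in directory 0 and `1` in directory 1. A helper like the one below could collapse those blocks. `writeOracleValue` is a hypothetical name and is not part of the patch.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class OracleFileHelperSketch {

    /**
     * Hypothetical helper: creates the oracle directory if needed and writes a
     * single '1' (grant) or '0' (deny) into its value file. Returns the file path.
     */
    static Path writeOracleValue(String oracleDir, String fileName, boolean grant) throws IOException {
        Path dir = Paths.get(oracleDir);
        Files.createDirectories(dir);
        Path value = dir.resolve(fileName);
        // Files.write truncates an existing file, so repeated calls simply overwrite.
        Files.write(value, (grant ? "1" : "0").getBytes(StandardCharsets.US_ASCII));
        return value;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("oraclePath");
        Path v0 = writeOracleValue(base.resolve("0/mastership").toString(), "value", true);
        Path v1 = writeOracleValue(base.resolve("1/mastership").toString(), "value", false);
        System.out.println(new String(Files.readAllBytes(v0), StandardCharsets.US_ASCII)); // 1
        System.out.println(new String(Files.readAllBytes(v1), StandardCharsets.US_ASCII)); // 0
    }
}
```

Giving each server its own directory would make the intended per-server value explicit instead of depending on write order.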
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBaseOracle_2Nodes.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBaseOracle_2Nodes.java
new file mode 100644
index 0000000..482027d
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBaseOracle_2Nodes.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.util.OSMXBean;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class QuorumBaseOracle_2Nodes extends ClientBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QuorumBaseOracle_2Nodes.class);
+
+ private static final String LOCALADDR = "127.0.0.1";
+
+ private static final String oraclePath_0 = "./tmp/oraclePath/0/mastership/";
+ private static final String oraclePath_1 = "./tmp/oraclePath/1/mastership/";
+
+ private static final String mastership = "value";
+
+ File s1dir, s2dir;
+ QuorumPeer s1, s2;
+ protected int port1;
+ protected int port2;
+
+ protected int portLE1;
+ protected int portLE2;
+
+ protected int portClient1;
+ protected int portClient2;
+
+ protected boolean localSessionsEnabled = false;
+ protected boolean localSessionsUpgradingEnabled = false;
+
+
+
+ @BeforeEach
+ @Override
+ public void setUp() throws Exception {
+ LOG.info("QuorumBase.setup {}", getTestName());
+ setupTestEnv();
+
+ JMXEnv.setUp();
+
+ setUpAll();
+
+ port1 = PortAssignment.unique();
+ port2 = PortAssignment.unique();
+
+ portLE1 = PortAssignment.unique();
+ portLE2 = PortAssignment.unique();
+
+ portClient1 = PortAssignment.unique();
+ portClient2 = PortAssignment.unique();
+
+ hostPort = "127.0.0.1:"
+ + portClient1
+ + ",127.0.0.1:"
+ + portClient2;
+ LOG.info("Ports are: {}", hostPort);
+
+ s1dir = ClientBase.createTmpDir();
+ s2dir = ClientBase.createTmpDir();
+
+ startServers();
+
+ OSMXBean osMbean = new OSMXBean();
+ if (osMbean.getUnix()) {
+ LOG.info("Initial fdcount is: {}", osMbean.getOpenFileDescriptorCount());
+ }
+
+ File directory = new File(oraclePath_0);
+ directory.mkdirs();
+ FileWriter fw = new FileWriter(oraclePath_0 + mastership);
+ fw.write("0");
+ fw.close();
+
+ directory = new File(oraclePath_1);
+ directory.mkdirs();
+ fw = new FileWriter(oraclePath_1 + mastership);
+ fw.write("1");
+ fw.close();
+
+
+ LOG.info("Setup finished");
+ }
+
+ void startServers() throws Exception {
+ int tickTime = 2000;
+ int initLimit = 3;
+ int syncLimit = 3;
+ int connectToLearnerMasterLimit = 3;
+ Map<Long, QuorumPeer.QuorumServer> peers = new HashMap<Long, QuorumPeer.QuorumServer>();
+ peers.put(Long.valueOf(1), new QuorumPeer.QuorumServer(1, new InetSocketAddress(LOCALADDR, port1), new InetSocketAddress(LOCALADDR, portLE1), new InetSocketAddress(LOCALADDR, portClient1), QuorumPeer.LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(2), new QuorumPeer.QuorumServer(2, new InetSocketAddress(LOCALADDR, port2), new InetSocketAddress(LOCALADDR, portLE2), new InetSocketAddress(LOCALADDR, portClient2), QuorumPeer.LearnerType.PARTICIPANT));
+
+ LOG.info("creating QuorumPeer 1 port {}", portClient1);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_0 + mastership);
+ assertEquals(portClient1, s1.getClientPort());
+ LOG.info("creating QuorumPeer 2 port {}", portClient2);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, oraclePath_1 + mastership);
+ assertEquals(portClient2, s2.getClientPort());
+
+
+ LOG.info("QuorumPeer 1 voting view: {}", s1.getVotingView());
+ LOG.info("QuorumPeer 2 voting view: {}", s2.getVotingView());
+
+ s1.enableLocalSessions(localSessionsEnabled);
+ s2.enableLocalSessions(localSessionsEnabled);
+ s1.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+ s2.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+
+ LOG.info("start QuorumPeer 1");
+ s1.start();
+ LOG.info("start QuorumPeer 2");
+ s2.start();
+
+ LOG.info("Checking ports {}", hostPort);
+ for (String hp : hostPort.split(",")) {
+ assertTrue(ClientBase.waitForServerUp(hp, CONNECTION_TIMEOUT), "waiting for server up");
+ LOG.info("{} is accepting client connections", hp);
+ }
+
+ // interesting to see what's there...
+ JMXEnv.dump();
+ // make sure the MBeans of both servers are listed
+ Set<String> ensureNames = new LinkedHashSet<String>();
+ for (int i = 1; i <= 2; i++) {
+ ensureNames.add("InMemoryDataTree");
+ }
+ for (int i = 1; i <= 2; i++) {
+ ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + i + ",name2=");
+ }
+ for (int i = 1; i <= 2; i++) {
+ for (int j = 1; j <= 2; j++) {
+ ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j);
+ }
+ }
+ for (int i = 1; i <= 2; i++) {
+ ensureNames.add("name0=ReplicatedServer_id" + i);
+ }
+ JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
+ }
+
+ public int getLeaderIndex() {
+ if (s1.getPeerState() == QuorumPeer.ServerState.LEADING) {
+ return 0;
+ } else if (s2.getPeerState() == QuorumPeer.ServerState.LEADING) {
+ return 1;
+ }
+ return -1;
+ }
+
+ public int getLeaderClientPort() {
+ if (s1.getPeerState() == QuorumPeer.ServerState.LEADING) {
+ return portClient1;
+ } else if (s2.getPeerState() == QuorumPeer.ServerState.LEADING) {
+ return portClient2;
+ }
+ return -1;
+ }
+
+ public QuorumPeer getLeaderQuorumPeer() {
+ if (s1.getPeerState() == QuorumPeer.ServerState.LEADING) {
+ return s1;
+ } else if (s2.getPeerState() == QuorumPeer.ServerState.LEADING) {
+ return s2;
+ }
+ return null;
+ }
+
+ public QuorumPeer getFirstObserver() {
+ if (s1.getLearnerType() == QuorumPeer.LearnerType.OBSERVER) {
+ return s1;
+ } else if (s2.getLearnerType() == QuorumPeer.LearnerType.OBSERVER) {
+ return s2;
+ }
+ return null;
+ }
+
+ public int getFirstObserverClientPort() {
+ if (s1.getLearnerType() == QuorumPeer.LearnerType.OBSERVER) {
+ return portClient1;
+ } else if (s2.getLearnerType() == QuorumPeer.LearnerType.OBSERVER) {
+ return portClient2;
+ }
+ return -1;
+ }
+
+ public String getPeersMatching(QuorumPeer.ServerState state) {
+ StringBuilder hosts = new StringBuilder();
+ for (QuorumPeer p : getPeerList()) {
+ if (p.getPeerState() == state) {
+ hosts.append(String.format("%s:%d,", LOCALADDR, p.getClientAddress().getPort()));
+ }
+ }
+ LOG.info("getPeersMatching ports are {}", hosts);
+ return hosts.toString();
+ }
+
+ public ArrayList<QuorumPeer> getPeerList() {
+ ArrayList<QuorumPeer> peers = new ArrayList<QuorumPeer>();
+ peers.add(s1);
+ peers.add(s2);
+ return peers;
+ }
+
+ public QuorumPeer getPeerByClientPort(int clientPort) {
+ for (QuorumPeer p : getPeerList()) {
+ if (p.getClientAddress().getPort() == clientPort) {
+ return p;
+ }
+ }
+ return null;
+ }
+
+ public void setupServers() throws IOException {
+ setupServer(1);
+ setupServer(2);
+ }
+
+ Map<Long, QuorumPeer.QuorumServer> peers = null;
+
+ public void setupServer(int i) throws IOException {
+ int tickTime = 2000;
+ int initLimit = 3;
+ int syncLimit = 3;
+ int connectToLearnerMasterLimit = 3;
+
+ if (peers == null) {
+ peers = new HashMap<Long, QuorumPeer.QuorumServer>();
+
+ peers.put(Long.valueOf(1), new QuorumPeer.QuorumServer(1, new InetSocketAddress(LOCALADDR, port1), new InetSocketAddress(LOCALADDR, portLE1), new InetSocketAddress(LOCALADDR, portClient1), QuorumPeer.LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(2), new QuorumPeer.QuorumServer(2, new InetSocketAddress(LOCALADDR, port2), new InetSocketAddress(LOCALADDR, portLE2), new InetSocketAddress(LOCALADDR, portClient2), QuorumPeer.LearnerType.PARTICIPANT));
+ }
+
+ switch (i) {
+ case 1:
+ LOG.info("creating QuorumPeer 1 port {}", portClient1);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+ assertEquals(portClient1, s1.getClientPort());
+ break;
+ case 2:
+ LOG.info("creating QuorumPeer 2 port {}", portClient2);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
+ assertEquals(portClient2, s2.getClientPort());
+ break;
+ }
+ }
+
+ @AfterEach
+ @Override
+ public void tearDown() throws Exception {
+ LOG.info("TearDown started");
+
+ OSMXBean osMbean = new OSMXBean();
+ if (osMbean.getUnix()) {
+ LOG.info("fdcount after test is: {}", osMbean.getOpenFileDescriptorCount());
+ }
+
+ shutdownServers();
+
+ for (String hp : hostPort.split(",")) {
+ assertTrue(ClientBase.waitForServerDown(hp, ClientBase.CONNECTION_TIMEOUT), "waiting for server down");
+ LOG.info("{} is no longer accepting client connections", hp);
+ }
+
+ JMXEnv.tearDown();
+ }
+ public void shutdownServers() {
+ shutdown(s1);
+ shutdown(s2);
+ }
+
+ public static void shutdown(QuorumPeer qp) {
+ if (qp == null) {
+ return;
+ }
+ try {
+ LOG.info("Shutting down quorum peer {}", qp.getName());
+ qp.shutdown();
+ Election e = qp.getElectionAlg();
+ if (e != null) {
+ LOG.info("Shutting down leader election {}", qp.getName());
+ e.shutdown();
+ } else {
+ LOG.info("No election available to shutdown {}", qp.getName());
+ }
+ LOG.info("Waiting for {} to exit thread", qp.getName());
+ long readTimeout = qp.getTickTime() * qp.getInitLimit();
+ long connectTimeout = qp.getTickTime() * qp.getSyncLimit();
+ long maxTimeout = Math.max(readTimeout, connectTimeout);
+ maxTimeout = Math.max(maxTimeout, ClientBase.CONNECTION_TIMEOUT);
+ qp.join(maxTimeout * 2);
+ if (qp.isAlive()) {
+ fail("QP failed to shutdown in " + (maxTimeout * 2) + " seconds: " + qp.getName());
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("QP interrupted: {}", qp.getName(), e);
+ }
+ }
+
+ protected TestableZooKeeper createClient() throws IOException, InterruptedException {
+ return createClient(hostPort);
+ }
+
+ protected TestableZooKeeper createClient(String hp) throws IOException, InterruptedException {
+ ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher();
+ return createClient(watcher, hp);
+ }
+
+ protected TestableZooKeeper createClient(ClientBase.CountdownWatcher watcher, QuorumPeer.ServerState state) throws IOException, InterruptedException {
+ return createClient(watcher, getPeersMatching(state));
+ }
+
+}
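In setUp() above, each peer gets its own oracle file: server 1's file holds "0" and server 2's holds "1", and that path is handed to the QuorumPeer constructor. The commit message says QuorumOracleMaj by default reads a file containing a binary value. A minimal sketch of such a file-backed oracle follows, assuming the first character of the file is the whole answer and that a missing or unreadable file means "no". FileOracleSketch, askOracle() and writeOracle() here are hypothetical names for illustration, not ZooKeeper's API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical file-backed oracle (illustration only, not QuorumOracleMaj).
 * Assumption: the file holds one character; '1' means "this peer may keep
 * the quorum on its own", anything else means it may not.
 */
final class FileOracleSketch {
    private final Path oracleFile;

    FileOracleSketch(Path oracleFile) {
        this.oracleFile = oracleFile;
    }

    /** Returns true only if the file exists and its first byte is '1'. */
    boolean askOracle() {
        try {
            byte[] bytes = Files.readAllBytes(oracleFile);
            return bytes.length > 0 && bytes[0] == '1';
        } catch (IOException e) {
            // Assumed policy: fail closed, so an unreadable oracle never
            // lets this peer claim the quorum alone.
            return false;
        }
    }

    /** Writes "1" or "0", like the FileWriter calls in setUp(). */
    static void writeOracle(Path oracleFile, boolean value) throws IOException {
        Path parent = oracleFile.getParent();
        if (parent != null) {
            Files.createDirectories(parent); // mirrors directory.mkdirs()
        }
        Files.write(oracleFile, (value ? "1" : "0").getBytes(StandardCharsets.US_ASCII));
    }
}
```

Failing closed is a design choice of this sketch: if both peers of a two-node ensemble lose contact and one cannot read its oracle, that peer stays out rather than risking two peers that each believe they hold the quorum.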
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java
index deaeb68..4761596 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java
@@ -57,7 +57,7 @@ public class QuorumMajorityTest extends QuorumBase {
}
//setup servers 1-5 to be followers
- setUp(false);
+ setUp(false, true);
Proposal p = new Proposal();
@@ -77,7 +77,7 @@ public class QuorumMajorityTest extends QuorumBase {
assertEquals(true, p.hasAllQuorums());
//setup servers 1-3 to be followers and 4 and 5 to be observers
- setUp(true);
+ setUp(true, true);
p = new Proposal();
p.addQuorumVerifier(s1.getQuorumVerifier());
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumOracleMajTest.java
similarity index 61%
copy from zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java
copy to zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumOracleMajTest.java
index deaeb68..1b1fb31 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumMajorityTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumOracleMajTest.java
@@ -15,21 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.zookeeper.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class QuorumMajorityTest extends QuorumBase {
+
+public class QuorumOracleMajTest extends QuorumBaseOracle_2Nodes {
protected static final Logger LOG = LoggerFactory.getLogger(QuorumMajorityTest.class);
public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
@@ -47,57 +47,75 @@ public class QuorumMajorityTest extends QuorumBase {
QuorumPeer qp = peers.get(i - 1);
Long electionTimeTaken = -1L;
String bean = "";
- if (qp.getPeerState() == ServerState.FOLLOWING) {
+ if (qp.getPeerState() == QuorumPeer.ServerState.FOLLOWING) {
bean = String.format("%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Follower", MBeanRegistry.DOMAIN, i, i);
- } else if (qp.getPeerState() == ServerState.LEADING) {
+ } else if (qp.getPeerState() == QuorumPeer.ServerState.LEADING) {
bean = String.format("%s:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Leader", MBeanRegistry.DOMAIN, i, i);
}
electionTimeTaken = (Long) JMXEnv.ensureBeanAttribute(bean, "ElectionTimeTaken");
assertTrue(electionTimeTaken >= 0, "Wrong electionTimeTaken value!");
}
- //setup servers 1-5 to be followers
- setUp(false);
+ //setup servers 1-2 to be followers
+ // id=1, oracle is false; id=2, oracle is true
+ setUp();
- Proposal p = new Proposal();
+ QuorumPeer s;
+ int leader;
+ if ((leader = getLeaderIndex()) == 1) {
+ s = s1;
+ } else {
+ s = s2;
+ }
- p.addQuorumVerifier(s1.getQuorumVerifier());
+ noDropConectionTest(s);
- // 2 followers out of 5 is not a majority
- p.addAck(Long.valueOf(1));
- p.addAck(Long.valueOf(2));
- assertEquals(false, p.hasAllQuorums());
+ dropConnectionTest(s, leader);
- // 6 is not in the view - its vote shouldn't count
- p.addAck(Long.valueOf(6));
- assertEquals(false, p.hasAllQuorums());
+ }
- // 3 followers out of 5 are a majority of the voting view
- p.addAck(Long.valueOf(3));
- assertEquals(true, p.hasAllQuorums());
+ private void noDropConectionTest(QuorumPeer s) {
+ Leader.Proposal p = new Leader.Proposal();
- //setup servers 1-3 to be followers and 4 and 5 to be observers
- setUp(true);
- p = new Proposal();
- p.addQuorumVerifier(s1.getQuorumVerifier());
+ p.addQuorumVerifier(s.getQuorumVerifier());
- // 1 follower out of 3 is not a majority
+ // 1 follower out of 2 is not a majority
p.addAck(Long.valueOf(1));
assertEquals(false, p.hasAllQuorums());
- // 4 and 5 are observers, their vote shouldn't count
- p.addAck(Long.valueOf(4));
- p.addAck(Long.valueOf(5));
- assertEquals(false, p.hasAllQuorums());
-
// 6 is not in the view - its vote shouldn't count
p.addAck(Long.valueOf(6));
assertEquals(false, p.hasAllQuorums());
- // 2 followers out of 3 are a majority of the voting view
+ // 2 followers out of 2 are a majority of the voting view
p.addAck(Long.valueOf(2));
assertEquals(true, p.hasAllQuorums());
+
}
+
+ private void dropConnectionTest(QuorumPeer s, int leader) {
+ Leader.Proposal p = new Leader.Proposal();
+ p.addQuorumVerifier(s.getQuorumVerifier());
+
+ ArrayList<LearnerHandler> fake = new ArrayList<>();
+
+ LearnerHandler f = null;
+ fake.add(f);
+
+ s.getQuorumVerifier().updateNeedOracle(fake);
+ // there is still a valid follower, so the oracle should not be consulted
+ assertEquals(false, s.getQuorumVerifier().getNeedOracle());
+
+ fake.remove(0);
+ s.getQuorumVerifier().updateNeedOracle(fake);
+ // all followers are lost, so the oracle should be consulted
+ assertEquals(true, s.getQuorumVerifier().getNeedOracle());
+
+
+ // when leader is 1, we expect false.
+ // when leader is 2, we expect true.
+ assertEquals(leader != 1, p.hasAllQuorums());
+ }
}
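QuorumOracleMajTest above covers two cases. With the follower connected, a proposal needs acks from both voters, and an ack from id 6, which is outside the view, is ignored. Once the follower list passed to updateNeedOracle() is empty, the oracle decides whether the remaining server alone holds the quorum. The sketch below models that rule under stated assumptions: OracleMajoritySketch is a hypothetical class, not QuorumOracleMaj; the oracle is injected as a BooleanSupplier; and the condition for needing the oracle (an empty follower list in a two-voter ensemble) is inferred from dropConnectionTest rather than taken from the implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BooleanSupplier;

/** Hypothetical majority check with an oracle fallback (illustration only). */
final class OracleMajoritySketch {
    private final Set<Long> votingMembers;
    private final BooleanSupplier oracle;
    private boolean needOracle = false;

    OracleMajoritySketch(Set<Long> votingMembers, BooleanSupplier oracle) {
        this.votingMembers = new HashSet<>(votingMembers);
        this.oracle = oracle;
    }

    /** Assumed rule: any remaining entry (even null, as in the test) counts as a follower. */
    void updateNeedOracle(List<?> forwardingFollowers) {
        needOracle = forwardingFollowers.isEmpty() && votingMembers.size() == 2;
    }

    boolean getNeedOracle() {
        return needOracle;
    }

    boolean containsQuorum(Set<Long> ackSet) {
        // Count only acks from voting members, so an ack from id 6 is ignored.
        long votes = ackSet.stream().filter(votingMembers::contains).count();
        if (votes > votingMembers.size() / 2) {
            return true; // ordinary strict majority
        }
        // No majority: defer to the oracle only when every follower is gone.
        return needOracle && oracle.getAsBoolean();
    }
}
```

Checking the ordinary majority first means the oracle can only restore availability after the strict majority is lost; it never overrides a quorum that the acks already form.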
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java
index b3e1d47..52d2bd8 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java
@@ -35,7 +35,7 @@ public class ThrottledOpObserverTest extends QuorumBase {
@BeforeEach
@Override
public void setUp() throws Exception {
- super.setUp(true /* withObservers */);
+ super.setUp(true /* withObservers */, false);
}
@Test
diff --git a/zookeeper-server/src/test/resources/data/invalidsnap/version-2/snapshot.83f b/zookeeper-server/src/test/resources/data/invalidsnap/version-2/snapshot.83f
index 26dc5f6..1b563b6 100644
Binary files a/zookeeper-server/src/test/resources/data/invalidsnap/version-2/snapshot.83f and b/zookeeper-server/src/test/resources/data/invalidsnap/version-2/snapshot.83f differ