Author: suresh
Date: Wed Jan 25 17:26:20 2012
New Revision: 1235841
URL: http://svn.apache.org/viewvc?rev=1235841&view=rev
Log:
HADOOP-7992. Add ZKClient library to facilitate leader election. Contributed by
Bikas Saha.
Added:
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/pom.xml
Modified:
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt?rev=1235841&r1=1235840&r2=1235841&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt
Wed Jan 25 17:26:20 2012
@@ -9,21 +9,21 @@ HADOOP-7455. HA: Introduce HA Service Pr
HADOOP-7774. HA: Administrative CLI to control HA daemons. (todd)
HADOOP-7896. HA: if both NNs are in Standby mode, client needs to try failing
- back and forth several times with sleeps. (atm)
+back and forth several times with sleeps. (atm)
HADOOP-7922. Improve some logging for client IPC failovers and
- StandbyExceptions (todd)
+StandbyExceptions (todd)
HADOOP-7921. StandbyException should extend IOException (todd)
HADOOP-7928. HA: Client failover policy is incorrectly trying to fail over all
- IOExceptions (atm)
+IOExceptions (atm)
HADOOP-7925. Add interface and update CLI to query current state to
- HAServiceProtocol (eli via todd)
+HAServiceProtocol (eli via todd)
HADOOP-7932. Make client connection retries on socket time outs configurable.
- (Uma Maheswara Rao G via todd)
+(Uma Maheswara Rao G via todd)
HADOOP-7924. FailoverController for client-based configuration (eli)
@@ -31,3 +31,6 @@ HADOOP-7961. Move HA fencing to common.
HADOOP-7970. HAServiceProtocol methods must throw IOException.
(Hari Mankude via suresh).
+
+HADOOP-7992. Add ZKClient library to facilitate leader election.
+(Bikas Saha via suresh).
Modified:
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/pom.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/pom.xml?rev=1235841&r1=1235840&r2=1235841&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/pom.xml
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/pom.xml
Wed Jan 25 17:26:20 2012
@@ -268,6 +268,34 @@
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.2</version>
+ <exclusions>
+ <exclusion>
+ <!-- otherwise seems to drag in junit 3.8.1 via jline -->
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.2</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
Added:
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1235841&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
(added)
+++
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
Wed Jan 25 17:26:20 2012
@@ -0,0 +1,593 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.AsyncCallback.*;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.KeeperException.Code;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ *
+ * This class implements a simple library to perform leader election on top of
+ * Apache Zookeeper. Using Zookeeper as a coordination service, leader election
+ * can be performed by atomically creating an ephemeral lock file (znode) on
+ * Zookeeper. The service instance that successfully creates the znode becomes
+ * active and the rest become standbys. <br/>
+ * This election mechanism is only efficient for small number of election
+ * candidates (order of 10's) because contention on single znode by a large
+ * number of candidates can result in Zookeeper overload. <br/>
+ * The elector does not guarantee fencing (protection of shared resources)
among
+ * service instances. After it has notified an instance about becoming a
leader,
+ * then that instance must ensure that it meets the service consistency
+ * requirements. If it cannot do so, then it is recommended to quit the
+ * election. The application implements the {@link
ActiveStandbyElectorCallback}
+ * to interact with the elector
+ */
[email protected]
[email protected]
+public class ActiveStandbyElector implements Watcher, StringCallback,
+ StatCallback {
+
+ /**
+ * Callback interface to interact with the ActiveStandbyElector object. <br/>
+ * The application will be notified with a callback only on state changes
+ * (i.e. there will never be successive calls to becomeActive without an
+ * intermediate call to enterNeutralMode). <br/>
+ * The callbacks will be running on Zookeeper client library threads. The
+ * application should return from these callbacks quickly so as not to impede
+ * Zookeeper client library performance and notifications. The app will
+ * typically remember the state change and return from the callback. It will
+ * then proceed with implementing actions around that state change. It is
+ * possible to be called back again while these actions are in flight and the
+ * app should handle this scenario.
+ */
+ public interface ActiveStandbyElectorCallback {
+ /**
+ * This method is called when the app becomes the active leader
+ */
+ void becomeActive();
+
+ /**
+ * This method is called when the app becomes a standby
+ */
+ void becomeStandby();
+
+ /**
+ * If the elector gets disconnected from Zookeeper and does not know about
+ * the lock state, then it will notify the service via the enterNeutralMode
+ * interface. The service may choose to ignore this or stop doing state
+ * changing operations. Upon reconnection, the elector verifies the leader
+ * status and calls back on the becomeActive and becomeStandby app
+ * interfaces. <br/>
+ * Zookeeper disconnects can happen due to network issues or loss of
+ * Zookeeper quorum. Thus enterNeutralMode can be used to guard against
+ * split-brain issues. In such situations it might be prudent to call
+ * becomeStandby too. However, such state change operations might be
+ * expensive and enterNeutralMode can help guard against doing that for
+ * transient issues.
+ */
+ void enterNeutralMode();
+
+ /**
+ * If there is any fatal error (e.g. wrong ACL's, unexpected Zookeeper
+ * errors or Zookeeper persistent unavailability) then notifyFatalError is
+ * called to notify the app about it.
+ */
+ void notifyFatalError(String errorMessage);
+ }
+
+ /**
+ * Name of the lock znode used by the library. Protected for access in test
+ * classes
+ */
+ @VisibleForTesting
+ protected static final String LOCKFILENAME = "ActiveStandbyElectorLock";
+
+ public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
+
+ private static final int NUM_RETRIES = 3;
+
+ private enum ConnectionState {
+ DISCONNECTED, CONNECTED, TERMINATED
+ };
+
+ private enum State {
+ INIT, ACTIVE, STANDBY, NEUTRAL
+ };
+
+ private State state = State.INIT;
+ private int createRetryCount = 0;
+ private int statRetryCount = 0;
+ private ZooKeeper zkClient;
+ private ConnectionState zkConnectionState = ConnectionState.TERMINATED;
+
+ private final ActiveStandbyElectorCallback appClient;
+ private final String zkHostPort;
+ private final int zkSessionTimeout;
+ private final List<ACL> zkAcl;
+ private byte[] appData;
+ private final String zkLockFilePath;
+ private final String znodeWorkingDir;
+
+ /**
+ * Create a new ActiveStandbyElector object <br/>
+ * The elector is created by providing to it the Zookeeper configuration, the
+ * parent znode under which to create the znode and a reference to the
+ * callback interface. <br/>
+ * The parent znode name must be the same for all service instances and
+ * different across services. <br/>
+ * After the leader has been lost, a new leader will be elected after the
+ * session timeout expires. Hence, the app must set this parameter based on
+ * its needs for failure response time. The session timeout must be greater
+ * than the Zookeeper disconnect timeout and is recommended to be 3X that
+ * value to enable Zookeeper to retry transient disconnections. Setting a
very
+ * short session timeout may result in frequent transitions between active
and
+ * standby states during issues like network outages/GS pauses.
+ *
+ * @param zookeeperHostPorts
+ * ZooKeeper hostPort for all ZooKeeper servers
+ * @param zookeeperSessionTimeout
+ * ZooKeeper session timeout
+ * @param parentZnodeName
+ * znode under which to create the lock
+ * @param acl
+ * ZooKeeper ACL's
+ * @param app
+ * reference to callback interface object
+ * @throws IOException
+ * @throws HadoopIllegalArgumentException
+ */
+ public ActiveStandbyElector(String zookeeperHostPorts,
+ int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
+ ActiveStandbyElectorCallback app) throws IOException,
+ HadoopIllegalArgumentException {
+ if (app == null || acl == null || parentZnodeName == null
+ || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
+ throw new HadoopIllegalArgumentException("Invalid argument");
+ }
+ zkHostPort = zookeeperHostPorts;
+ zkSessionTimeout = zookeeperSessionTimeout;
+ zkAcl = acl;
+ appClient = app;
+ znodeWorkingDir = parentZnodeName;
+ zkLockFilePath = znodeWorkingDir + "/" + LOCKFILENAME;
+
+ // createConnection for future API calls
+ createConnection();
+ }
+
+ /**
+ * To participate in election, the app will call joinElection. The result
will
+ * be notified by a callback on either the becomeActive or becomeStandby app
+ * interfaces. <br/>
+ * After this the elector will automatically monitor the leader status and
+ * perform re-election if necessary<br/>
+ * The app could potentially start off in standby mode and ignore the
+ * becomeStandby call.
+ *
+ * @param data
+ * to be set by the app. non-null data must be set.
+ * @throws HadoopIllegalArgumentException
+ * if valid data is not supplied
+ */
+ public synchronized void joinElection(byte[] data)
+ throws HadoopIllegalArgumentException {
+ LOG.debug("Attempting active election");
+
+ if (data == null) {
+ throw new HadoopIllegalArgumentException("data cannot be null");
+ }
+
+ appData = new byte[data.length];
+ System.arraycopy(data, 0, appData, 0, data.length);
+
+ joinElectionInternal();
+ }
+
+ /**
+ * Any service instance can drop out of the election by calling
quitElection.
+ * <br/>
+ * This will lose any leader status, if held, and stop monitoring of the lock
+ * node. <br/>
+ * If the instance wants to participate in election again, then it needs to
+ * call joinElection(). <br/>
+ * This allows service instances to take themselves out of rotation for known
+ * impending unavailable states (e.g. long GC pause or software upgrade).
+ */
+ public synchronized void quitElection() {
+ LOG.debug("Yielding from election");
+ reset();
+ }
+
+ /**
+ * Exception thrown when there is no active leader
+ */
+ public class ActiveNotFoundException extends Exception {
+ private static final long serialVersionUID = 3505396722342846462L;
+ }
+
+ /**
+ * get data set by the active leader
+ *
+ * @return data set by the active instance
+ * @throws ActiveNotFoundException
+ * when there is no active leader
+ * @throws KeeperException
+ * other zookeeper operation errors
+ * @throws InterruptedException
+ * @throws IOException
+ * when ZooKeeper connection could not be established
+ */
+ public synchronized byte[] getActiveData() throws ActiveNotFoundException,
+ KeeperException, InterruptedException, IOException {
+ try {
+ if (zkClient == null) {
+ createConnection();
+ }
+ Stat stat = new Stat();
+ return zkClient.getData(zkLockFilePath, false, stat);
+ } catch(KeeperException e) {
+ Code code = e.code();
+ if (operationNodeDoesNotExist(code)) {
+ // handle the commonly expected cases that make sense for us
+ throw new ActiveNotFoundException();
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * interface implementation of Zookeeper callback for create
+ */
+ @Override
+ public synchronized void processResult(int rc, String path, Object ctx,
+ String name) {
+ LOG.debug("CreateNode result: " + rc + " for path: " + path
+ + " connectionState: " + zkConnectionState);
+ if (zkClient == null) {
+ // zkClient is nulled before closing the connection
+ // this is the callback with session expired after we closed the session
+ return;
+ }
+
+ Code code = Code.get(rc);
+ if (operationSuccess(code)) {
+ // we successfully created the znode. we are the leader. start monitoring
+ becomeActive();
+ monitorActiveStatus();
+ return;
+ }
+
+ if (operationNodeExists(code)) {
+ if (createRetryCount == 0) {
+ // znode exists and we did not retry the operation. so a different
+ // instance has created it. become standby and monitor lock.
+ becomeStandby();
+ }
+ // if we had retried then the znode could have been created by our first
+ // attempt to the server (that we lost) and this node exists response is
+ // for the second attempt. verify this case via ephemeral node owner.
this
+ // will happen on the callback for monitoring the lock.
+ monitorActiveStatus();
+ return;
+ }
+
+ String errorMessage = "Received create error from Zookeeper. code:"
+ + code.toString();
+ LOG.debug(errorMessage);
+
+ if (operationRetry(code)) {
+ if (createRetryCount < NUM_RETRIES) {
+ LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
+ ++createRetryCount;
+ createNode();
+ return;
+ }
+ errorMessage = errorMessage
+ + ". Not retrying further znode create connection errors.";
+ }
+
+ fatalError(errorMessage);
+ }
+
+ /**
+ * interface implementation of Zookeeper callback for monitor (exists)
+ */
+ @Override
+ public synchronized void processResult(int rc, String path, Object ctx,
+ Stat stat) {
+ LOG.debug("StatNode result: " + rc + " for path: " + path
+ + " connectionState: " + zkConnectionState);
+ if (zkClient == null) {
+ // zkClient is nulled before closing the connection
+ // this is the callback with session expired after we closed the session
+ return;
+ }
+
+ Code code = Code.get(rc);
+ if (operationSuccess(code)) {
+ // the following owner check completes verification in case the lock
znode
+ // creation was retried
+ if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
+ // we own the lock znode. so we are the leader
+ becomeActive();
+ } else {
+ // we dont own the lock znode. so we are a standby.
+ becomeStandby();
+ }
+ // the watch set by us will notify about changes
+ return;
+ }
+
+ if (operationNodeDoesNotExist(code)) {
+ // the lock znode disappeared before we started monitoring it
+ enterNeutralMode();
+ joinElectionInternal();
+ return;
+ }
+
+ String errorMessage = "Received stat error from Zookeeper. code:"
+ + code.toString();
+ LOG.debug(errorMessage);
+
+ if (operationRetry(code)) {
+ if (statRetryCount < NUM_RETRIES) {
+ ++statRetryCount;
+ monitorNode();
+ return;
+ }
+ errorMessage = errorMessage
+ + ". Not retrying further znode monitoring connection errors.";
+ }
+
+ fatalError(errorMessage);
+ }
+
+ /**
+ * interface implementation of Zookeeper watch events (connection and node)
+ */
+ @Override
+ public synchronized void process(WatchedEvent event) {
+ Event.EventType eventType = event.getType();
+ LOG.debug("Watcher event type: " + eventType + " with state:"
+ + event.getState() + " for path:" + event.getPath()
+ + " connectionState: " + zkConnectionState);
+ if (zkClient == null) {
+ // zkClient is nulled before closing the connection
+ // this is the callback with session expired after we closed the session
+ return;
+ }
+
+ if (eventType == Event.EventType.None) {
+ // the connection state has changed
+ switch (event.getState()) {
+ case SyncConnected:
+ // if the listener was asked to move to safe state then it needs to
+ // be undone
+ ConnectionState prevConnectionState = zkConnectionState;
+ zkConnectionState = ConnectionState.CONNECTED;
+ if (prevConnectionState == ConnectionState.DISCONNECTED) {
+ monitorActiveStatus();
+ }
+ break;
+ case Disconnected:
+ // ask the app to move to safe state because zookeeper connection
+ // is not active and we dont know our state
+ zkConnectionState = ConnectionState.DISCONNECTED;
+ enterNeutralMode();
+ break;
+ case Expired:
+ // the connection got terminated because of session timeout
+ // call listener to reconnect
+ enterNeutralMode();
+ reJoinElection();
+ break;
+ default:
+ fatalError("Unexpected Zookeeper watch event state: "
+ + event.getState());
+ break;
+ }
+
+ return;
+ }
+
+ // a watch on lock path in zookeeper has fired. so something has changed on
+ // the lock. ideally we should check that the path is the same as the lock
+ // path but trusting zookeeper for now
+ String path = event.getPath();
+ if (path != null) {
+ switch (eventType) {
+ case NodeDeleted:
+ if (state == State.ACTIVE) {
+ enterNeutralMode();
+ }
+ joinElectionInternal();
+ break;
+ case NodeDataChanged:
+ monitorActiveStatus();
+ break;
+ default:
+ LOG.debug("Unexpected node event: " + eventType + " for path: " +
path);
+ monitorActiveStatus();
+ }
+
+ return;
+ }
+
+ // some unexpected error has occurred
+ fatalError("Unexpected watch error from Zookeeper");
+ }
+
+ /**
+ * Get a new zookeeper client instance. protected so that test class can
+ * inherit and pass in a mock object for zookeeper
+ *
+ * @return new zookeeper client instance
+ * @throws IOException
+ */
+ protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
+ return new ZooKeeper(zkHostPort, zkSessionTimeout, this);
+ }
+
+ private void fatalError(String errorMessage) {
+ reset();
+ appClient.notifyFatalError(errorMessage);
+ }
+
+ private void monitorActiveStatus() {
+ LOG.debug("Monitoring active leader");
+ statRetryCount = 0;
+ monitorNode();
+ }
+
+ private void joinElectionInternal() {
+ if (zkClient == null) {
+ if (!reEstablishSession()) {
+ fatalError("Failed to reEstablish connection with ZooKeeper");
+ return;
+ }
+ }
+
+ createRetryCount = 0;
+ createNode();
+ }
+
+ private void reJoinElection() {
+ LOG.debug("Trying to re-establish ZK session");
+ terminateConnection();
+ joinElectionInternal();
+ }
+
+ private boolean reEstablishSession() {
+ int connectionRetryCount = 0;
+ boolean success = false;
+ while(!success && connectionRetryCount < NUM_RETRIES) {
+ LOG.debug("Establishing zookeeper connection");
+ try {
+ createConnection();
+ success = true;
+ } catch(IOException e) {
+ LOG.warn(e);
+ try {
+ Thread.sleep(5000);
+ } catch(InterruptedException e1) {
+ LOG.warn(e1);
+ }
+ }
+ ++connectionRetryCount;
+ }
+ return success;
+ }
+
+ private void createConnection() throws IOException {
+ zkClient = getNewZooKeeper();
+ }
+
+ private void terminateConnection() {
+ if (zkClient == null) {
+ return;
+ }
+ LOG.debug("Terminating ZK connection");
+ ZooKeeper tempZk = zkClient;
+ zkClient = null;
+ try {
+ tempZk.close();
+ } catch(InterruptedException e) {
+ LOG.warn(e);
+ }
+ zkConnectionState = ConnectionState.TERMINATED;
+ }
+
+ private void reset() {
+ state = State.INIT;
+ terminateConnection();
+ }
+
+ private void becomeActive() {
+ if (state != State.ACTIVE) {
+ LOG.debug("Becoming active");
+ state = State.ACTIVE;
+ appClient.becomeActive();
+ }
+ }
+
+ private void becomeStandby() {
+ if (state != State.STANDBY) {
+ LOG.debug("Becoming standby");
+ state = State.STANDBY;
+ appClient.becomeStandby();
+ }
+ }
+
+ private void enterNeutralMode() {
+ if (state != State.NEUTRAL) {
+ LOG.debug("Entering neutral mode");
+ state = State.NEUTRAL;
+ appClient.enterNeutralMode();
+ }
+ }
+
+ private void createNode() {
+ zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this,
+ null);
+ }
+
+ private void monitorNode() {
+ zkClient.exists(zkLockFilePath, true, this, null);
+ }
+
+ private boolean operationSuccess(Code code) {
+ return (code == Code.OK);
+ }
+
+ private boolean operationNodeExists(Code code) {
+ return (code == Code.NODEEXISTS);
+ }
+
+ private boolean operationNodeDoesNotExist(Code code) {
+ return (code == Code.NONODE);
+ }
+
+ private boolean operationRetry(Code code) {
+ switch (code) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ return true;
+ }
+ return false;
+ }
+
+}
Added:
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java?rev=1235841&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
(added)
+++
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
Wed Jan 25 17:26:20 2012
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
+
+public class TestActiveStandbyElector {
+
+  /** Elector under test; recreated before every test by init(). */
+  ActiveStandbyElectorTester elector;
+
+  /** Mocked ZooKeeper client returned by every getNewZooKeeper() call. */
+  static ZooKeeper mockZK;
+  /** Mocked application callback that receives the elector's notifications. */
+  static ActiveStandbyElectorCallback mockApp;
+  /** Number of getNewZooKeeper() invocations since the last init(). */
+  static int count;
+  /** Election payload (eight zero bytes) passed to joinElection(). */
+  static final byte[] data = new byte[8];
+
+  /**
+   * Elector subclass that never contacts a real ZooKeeper: every request for a
+   * new client bumps the shared {@code count} and hands back {@code mockZK}.
+   */
+  class ActiveStandbyElectorTester extends ActiveStandbyElector {
+    ActiveStandbyElectorTester(String zkHosts, int sessionTimeout,
+        String parentZnode, List<ACL> acls,
+        ActiveStandbyElectorCallback callback) throws IOException {
+      super(zkHosts, sessionTimeout, parentZnode, acls, callback);
+    }
+
+    @Override
+    public ZooKeeper getNewZooKeeper() {
+      TestActiveStandbyElector.count += 1;
+      ZooKeeper client = TestActiveStandbyElector.mockZK;
+      return client;
+    }
+  }
+
+  /** Parent znode handed to the elector under test. */
+  private static final String zkParentName = "/zookeeper";
+  /** Full path of the lock znode the elector is expected to create. */
+  private static final String zkLockPathName =
+      zkParentName + "/" + ActiveStandbyElector.LOCKFILENAME;
+
+  /** Builds fresh mocks, resets the client counter and creates a new elector. */
+  @Before
+  public void init() throws IOException {
+    mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
+    mockZK = Mockito.mock(ZooKeeper.class);
+    count = 0;
+    // the elector constructor calls getNewZooKeeper(), so the mocks and the
+    // counter must be set up before it runs
+    elector = new ActiveStandbyElectorTester("hostPort", 1000, zkParentName,
+        Ids.OPEN_ACL_UNSAFE, mockApp);
+  }
+
+ /**
+ * verify that joinElection checks for null data
+ */
+ @Test(expected = HadoopIllegalArgumentException.class)
+ public void testJoinElectionException() {
+ elector.joinElection(null);
+ }
+
+ /**
+ * verify that joinElection tries to create ephemeral lock znode
+ */
+ @Test
+ public void testJoinElection() {
+ elector.joinElection(data);
+ Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+ }
+
+ /**
+ * verify that a successful znode create result makes the elector active and
+ * starts monitoring the lock znode. A following exists() callback showing
+ * that this session owns the lock must not produce any further app callbacks,
+ * creates or monitors. Note that Mockito verification counts are cumulative
+ * since init().
+ */
+ @Test
+ public void testCreateNodeResultBecomeActive() {
+ elector.joinElection(data);
+ elector.processResult(Code.OK.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+
+ // monitor callback verifies the leader is ephemeral owner of lock but does
+ // not call becomeActive since it's already active
+ Stat stat = new Stat();
+ stat.setEphemeralOwner(1L);
+ Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+ elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+ // should not call neutral mode/standby; becomeActive still only once (from
+ // the create result above)
+ Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
+ Mockito.verify(mockApp, Mockito.times(0)).becomeStandby();
+ Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+ // another joinElection not called.
+ Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+ // no new monitor called
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+ }
+
+ /**
+ * verify that znode create for existing node and no retry becomes standby
and
+ * monitoring is started
+ */
+ @Test
+ public void testCreateNodeResultBecomeStandby() {
+ elector.joinElection(data);
+
+ elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+ }
+
+ /**
+ * verify that znode create error result in fatal error
+ */
+ @Test
+ public void testCreateNodeResultError() {
+ elector.joinElection(data);
+
+ elector.processResult(Code.APIERROR.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
+ "Received create error from Zookeeper. code:APIERROR");
+ }
+
+ /**
+ * verify that retry of network errors verifies master by session id and
+ * becomes active if they match. monitoring is started.
+ * Also covers exhausting the create retry budget: the first three
+ * CONNECTIONLOSS results are retried and the fourth is reported as fatal.
+ * Mockito verification counts are cumulative since init().
+ */
+ @Test
+ public void testCreateNodeResultRetryBecomeActive() {
+ elector.joinElection(data);
+
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ // 4 errors results in fatalError (which also drops the connection)
+ Mockito
+ .verify(mockApp, Mockito.times(1))
+ .notifyFatalError(
+ "Received create error from Zookeeper. code:CONNECTIONLOSS. "+
+ "Not retrying further znode create connection errors.");
+
+ elector.joinElection(data);
+ // recreate connection via getNewZooKeeper (first call was in the ctor)
+ Assert.assertEquals(2, TestActiveStandbyElector.count);
+ // one retried create, then NODEEXISTS: since a retry happened the elector
+ // does not assume standby but checks ownership via exists()
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+
+ // ephemeral owner matches our session id, so we must become active
+ Stat stat = new Stat();
+ stat.setEphemeralOwner(1L);
+ Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+ elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+ // creates: 1 initial + 3 retries, then 1 after rejoin + 1 retry = 6
+ Mockito.verify(mockZK, Mockito.times(6)).create(zkLockPathName, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+ }
+
+ /**
+ * verify that retry of network errors verifies active by session id and
+ * becomes standby if they don't match. monitoring is started.
+ */
+ @Test
+ public void testCreateNodeResultRetryBecomeStandby() {
+ elector.joinElection(data);
+
+ // a retried create that then sees NODEEXISTS only starts monitoring; the
+ // standby decision is deferred to the exists() callback
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+
+ // ephemeral owner (0) differs from our session id (1): become standby
+ Stat stat = new Stat();
+ stat.setEphemeralOwner(0);
+ Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+ elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+ }
+
+ /**
+ * verify that if create znode results in nodeexists and that znode is deleted
+ * before exists() watch is set then the return of the exists() method results
+ * in attempt to re-create the znode and become active
+ */
+ @Test
+ public void testCreateNodeResultRetryNoNode() {
+ elector.joinElection(data);
+
+ // two retried creates, then NODEEXISTS: monitoring starts, no standby yet
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+
+ // the lock vanished before exists() ran: enter neutral mode and re-create
+ elector.processResult(Code.NONODE.intValue(), zkLockPathName, null,
+ (Stat) null);
+ Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+ // creates: 1 initial + 2 retries + 1 re-join = 4
+ Mockito.verify(mockZK, Mockito.times(4)).create(zkLockPathName, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+ }
+
+  /**
+   * Verifies that more than 3 connection-loss errors returned to the exists()
+   * callback make the elector give up and report a fatal error.
+   */
+  @Test
+  public void testStatNodeRetry() {
+    final int connectionLossCount = 4;
+    for (int i = 0; i < connectionLossCount; i++) {
+      elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName,
+          null, (Stat) null);
+    }
+    Mockito.verify(mockApp).notifyFatalError(
+        "Received stat error from Zookeeper. code:CONNECTIONLOSS. " +
+        "Not retrying further znode monitoring connection errors.");
+  }
+
+  /**
+   * Verifies that an unexpected error code delivered to the exists() callback
+   * is reported as a fatal error, without entering neutral mode.
+   */
+  @Test
+  public void testStatNodeError() {
+    final int rc = Code.RUNTIMEINCONSISTENCY.intValue();
+    elector.processResult(rc, zkLockPathName, null, (Stat) null);
+
+    Mockito.verify(mockApp, Mockito.never()).enterNeutralMode();
+    Mockito.verify(mockApp).notifyFatalError(
+        "Received stat error from Zookeeper. code:RUNTIMEINCONSISTENCY");
+  }
+
+ /**
+ * verify behavior of watcher.process callback with non-node events, i.e.
+ * events of type None that carry connection state changes (SyncConnected,
+ * Disconnected, Expired, AuthFailed)
+ */
+ @Test
+ public void testProcessCallbackEventNone() {
+ elector.joinElection(data);
+
+ WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
+ Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.None);
+
+ // first SyncConnected should not do anything: no exists() watch is set
+ Mockito.when(mockEvent.getState()).thenReturn(
+ Event.KeeperState.SyncConnected);
+ elector.process(mockEvent);
+ Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(),
+ Mockito.anyBoolean(), Mockito.<AsyncCallback.StatCallback> anyObject(),
+ Mockito.<Object> anyObject());
+
+ // disconnection should enter neutral mode
+ Mockito.when(mockEvent.getState()).thenReturn(
+ Event.KeeperState.Disconnected);
+ elector.process(mockEvent);
+ Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+
+ // re-connection should set the watched exists() on the lock znode again
+ Mockito.when(mockEvent.getState()).thenReturn(
+ Event.KeeperState.SyncConnected);
+ elector.process(mockEvent);
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+
+ // session expiry should be handled in neutral mode and initiate
+ // re-election. re-election is checked via the creation of a new zookeeper
+ // client and a new call to create the lock znode
+ Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
+ elector.process(mockEvent);
+ // already in neutral mode since the disconnection above. must not enter
+ // neutral mode again
+ Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+ // called getNewZooKeeper to create new session. first call was in
+ // constructor
+ Assert.assertEquals(2, TestActiveStandbyElector.count);
+ // once in initial joinElection and one now
+ Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+
+ // create znode success. become active and monitor the lock znode
+ elector.processResult(Code.OK.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+ Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
+ elector, null);
+
+ // an unexpected state (AuthFailed) results in a fatal error
+ Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
+ elector.process(mockEvent);
+ Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
+ "Unexpected Zookeeper watch event state: AuthFailed");
+ // the fatal error is reported without another enterNeutralMode() call
+ Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+ }
+
+ /**
+ * verify behavior of watcher.process with node events on the lock znode:
+ * data/children changes re-arm the watch, deletion triggers a new election
+ * (entering neutral mode only when this elector was active), and an event
+ * without a path results in a fatal error
+ */
+ @Test
+ public void testProcessCallbackEventNode() {
+ elector.joinElection(data);
+
+ // make the object go into the monitoring state
+ elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+
+ WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
+ Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
+
+ // monitoring should be setup again after event is received
+ Mockito.when(mockEvent.getType()).thenReturn(
+ Event.EventType.NodeDataChanged);
+ elector.process(mockEvent);
+ Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
+ elector, null);
+
+ // monitoring should be setup again after event is received
+ Mockito.when(mockEvent.getType()).thenReturn(
+ Event.EventType.NodeChildrenChanged);
+ elector.process(mockEvent);
+ Mockito.verify(mockZK, Mockito.times(3)).exists(zkLockPathName, true,
+ elector, null);
+
+ // lock node deletion when in standby mode should create znode again.
+ // successful znode creation enters active state and sets monitor
+ Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
+ elector.process(mockEvent);
+ // enterNeutralMode not called when app is standby and leader is lost
+ Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
+ // once in initial joinElection() and one now
+ Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+ elector.processResult(Code.OK.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+ Mockito.verify(mockZK, Mockito.times(4)).exists(zkLockPathName, true,
+ elector, null);
+
+ // lock node deletion in active mode should enter neutral mode and create
+ // the znode again. successful znode creation enters active state and sets
+ // monitor
+ Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
+ elector.process(mockEvent);
+ Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+ // another joinElection called
+ Mockito.verify(mockZK, Mockito.times(3)).create(zkLockPathName, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+ elector.processResult(Code.OK.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
+ Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
+ elector, null);
+
+ // an event without a path name results in fatal error
+ Mockito.when(mockEvent.getPath()).thenReturn(null);
+ elector.process(mockEvent);
+ Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
+ "Unexpected watch error from Zookeeper");
+ // fatal error means no new connection other than one from constructor
+ Assert.assertEquals(1, TestActiveStandbyElector.count);
+ // no new watches after fatal error
+ Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
+ elector, null);
+
+ }
+
+ /**
+ * verify becomeStandby is not called again if the elector is already in
+ * standby, even after the lock znode is deleted and the following election
+ * is lost again
+ */
+ @Test
+ public void testSuccessiveStandbyCalls() {
+ elector.joinElection(data);
+
+ // make the object go into the monitoring standby state
+ elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+ Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+ elector, null);
+
+ WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
+ Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
+
+ // notify node deletion. the elector should try to create the lock znode
+ // again, i.e. take part in a new election
+ Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
+ elector.process(mockEvent);
+ // is standby. no need to notify anything now
+ Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
+ // another joinElection called.
+ Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+ // lost election
+ elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+ zkLockPathName);
+ // still standby. so no need to notify again
+ Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+ // monitor is set again
+ Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
+ elector, null);
+ }
+
+  /**
+   * Verifies that quitElection() closes the ZooKeeper connection without
+   * setting any watch, and that a later joinElection() creates a new
+   * connection and takes part in the election again.
+   */
+  @Test
+  public void testQuitElection() throws InterruptedException {
+    elector.quitElection();
+    Mockito.verify(mockZK).close();
+    // quitting must not leave a watch behind
+    Mockito.verify(mockZK, Mockito.never()).exists(zkLockPathName, true,
+        elector, null);
+
+    // rejoin with a fresh payload; the distinct name avoids shadowing the
+    // test's data field
+    byte[] rejoinData = new byte[8];
+    elector.joinElection(rejoinData);
+    // getNewZooKeeper() has run twice: once in the constructor, once now
+    Assert.assertEquals(2, TestActiveStandbyElector.count);
+    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockApp).becomeStandby();
+    Mockito.verify(mockZK).exists(zkLockPathName, true, elector, null);
+
+  }
+
+  /**
+   * Verifies getActiveData(): it returns the data of the lock znode when an
+   * active exists, throws ActiveNotFoundException when the znode does not
+   * exist, and rethrows any other KeeperException reported by ZooKeeper.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws KeeperException
+   * @throws ActiveNotFoundException
+   */
+  @Test
+  public void testGetActiveData() throws ActiveNotFoundException,
+      KeeperException, InterruptedException, IOException {
+    // get valid active data
+    byte[] activeData = new byte[8];
+    Mockito.when(
+        mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+            Mockito.<Stat> anyObject())).thenReturn(activeData);
+    // compare contents, not references, so a defensive copy would also pass
+    Assert.assertArrayEquals(activeData, elector.getActiveData());
+    Mockito.verify(mockZK, Mockito.times(1)).getData(
+        Mockito.eq(zkLockPathName), Mockito.eq(false),
+        Mockito.<Stat> anyObject());
+
+    // active does not exist.
+    // Re-stubbing uses doThrow().when(mock) so that getData() is not invoked
+    // on the mock while the new stub is being set up.
+    Mockito.doThrow(new KeeperException.NoNodeException()).when(mockZK)
+        .getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+            Mockito.<Stat> anyObject());
+    try {
+      elector.getActiveData();
+      Assert.fail("ActiveNotFoundException expected");
+    } catch(ActiveNotFoundException e) {
+      Mockito.verify(mockZK, Mockito.times(2)).getData(
+          Mockito.eq(zkLockPathName), Mockito.eq(false),
+          Mockito.<Stat> anyObject());
+    }
+
+    // error getting active data rethrows keeperexception.
+    // getData() is currently stubbed to throw NoNodeException, so stubbing it
+    // via when(mockZK.getData(..)) would throw that exception right here;
+    // doThrow() sets the new stub without calling getData().
+    Mockito.doThrow(new KeeperException.AuthFailedException()).when(mockZK)
+        .getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+            Mockito.<Stat> anyObject());
+    try {
+      elector.getActiveData();
+      Assert.fail("KeeperException.AuthFailedException expected");
+    } catch(KeeperException.AuthFailedException ke) {
+      Mockito.verify(mockZK, Mockito.times(3)).getData(
+          Mockito.eq(zkLockPathName), Mockito.eq(false),
+          Mockito.<Stat> anyObject());
+    }
+  }
+
+}
Added:
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java?rev=1235841&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
(added)
+++
hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
Wed Jan 25 17:26:20 2012
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.test.ClientBase;
+
+/**
+ * Test for {@link ActiveStandbyElector} using real zookeeper.
+ */
+public class TestActiveStandbyElectorRealZK extends ClientBase {
+ static final int NUM_ELECTORS = 2;
+ static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS];
+ static int currentClientIndex = 0;
+
+ class ActiveStandbyElectorTesterRealZK extends ActiveStandbyElector {
+ ActiveStandbyElectorTesterRealZK(String hostPort, int timeout,
+ String parent, List<ACL> acl, ActiveStandbyElectorCallback app)
+ throws IOException {
+ super(hostPort, timeout, parent, acl, app);
+ }
+
+ @Override
+ public ZooKeeper getNewZooKeeper() {
+ return TestActiveStandbyElectorRealZK.zkClient[
+
TestActiveStandbyElectorRealZK.currentClientIndex];
+ }
+ }
+
+ /**
+ * The class object runs on a thread and waits for a signal to start from
the
+ * test object. On getting the signal it joins the election and thus by
doing
+ * this on multiple threads we can test simultaneous attempts at leader lock
+ * creation. after joining the election, the object waits on a signal to
exit.
+ * this signal comes when the object's elector has become a leader or there
is
+ * an unexpected fatal error. this lets another thread object to become a
+ * leader.
+ */
+ class ThreadRunner implements Runnable, ActiveStandbyElectorCallback {
+ int index;
+ TestActiveStandbyElectorRealZK test;
+ boolean wait = true;
+
+ ThreadRunner(int i, TestActiveStandbyElectorRealZK s) {
+ index = i;
+ test = s;
+ }
+
+ @Override
+ public void run() {
+ LOG.info("starting " + index);
+ while(true) {
+ synchronized (test) {
+ // wait for test start signal to come
+ if (!test.start) {
+ try {
+ test.wait();
+ } catch(InterruptedException e) {
+ Assert.fail(e.getMessage());
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ // join election
+ byte[] data = new byte[8];
+ ActiveStandbyElector elector = test.elector[index];
+ LOG.info("joining " + index);
+ elector.joinElection(data);
+ try {
+ while(true) {
+ synchronized (this) {
+ // wait for elector to become active/fatal error
+ if (wait) {
+ // wait to become active
+ // wait capped at 30s to prevent hung test
+ wait(30000);
+ } else {
+ break;
+ }
+ }
+ }
+ Thread.sleep(1000);
+ // quit election to allow other elector to become active
+ elector.quitElection();
+ } catch(InterruptedException e) {
+ Assert.fail(e.getMessage());
+ }
+ LOG.info("ending " + index);
+ }
+
+ @Override
+ public synchronized void becomeActive() {
+ test.reportActive(index);
+ LOG.info("active " + index);
+ wait = false;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void becomeStandby() {
+ test.reportStandby(index);
+ LOG.info("standby " + index);
+ }
+
+ @Override
+ public synchronized void enterNeutralMode() {
+ LOG.info("neutral " + index);
+ }
+
+ @Override
+ public synchronized void notifyFatalError(String errorMessage) {
+ LOG.info("fatal " + index + " .Error message:" + errorMessage);
+ wait = false;
+ notifyAll();
+ }
+ }
+
+ boolean start = false;
+ int activeIndex = -1;
+ int standbyIndex = -1;
+ String parentDir = "/" + java.util.UUID.randomUUID().toString();
+
+ ActiveStandbyElector[] elector = new ActiveStandbyElector[NUM_ELECTORS];
+ ThreadRunner[] threadRunner = new ThreadRunner[NUM_ELECTORS];
+ Thread[] thread = new Thread[NUM_ELECTORS];
+
+ synchronized void reportActive(int index) {
+ if (activeIndex == -1) {
+ activeIndex = index;
+ } else {
+ // standby should become active
+ Assert.assertEquals(standbyIndex, index);
+ // old active should not become active
+ Assert.assertFalse(activeIndex == index);
+ }
+ activeIndex = index;
+ }
+
+ synchronized void reportStandby(int index) {
+ // only 1 standby should be reported and it should not be the same as
active
+ Assert.assertEquals(-1, standbyIndex);
+ standbyIndex = index;
+ Assert.assertFalse(activeIndex == standbyIndex);
+ }
+
+ /**
+ * the test creates 2 electors which try to become active using a real
+ * zookeeper server. It verifies that 1 becomes active and 1 becomes standby.
+ * Upon becoming active the leader quits election and the test verifies that
+ * the standby now becomes active. these electors run on different threads
and
+ * callback to the test class to report active and standby where the outcome
+ * is verified
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ @Test
+ public void testActiveStandbyTransition() throws IOException,
+ InterruptedException, KeeperException {
+ LOG.info("starting test with parentDir:" + parentDir);
+ start = false;
+ byte[] data = new byte[8];
+ // create random working directory
+ createClient().create(parentDir, data, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ for(currentClientIndex = 0;
+ currentClientIndex < NUM_ELECTORS;
+ ++currentClientIndex) {
+ LOG.info("creating " + currentClientIndex);
+ zkClient[currentClientIndex] = createClient();
+ threadRunner[currentClientIndex] = new ThreadRunner(currentClientIndex,
+ this);
+ elector[currentClientIndex] = new ActiveStandbyElectorTesterRealZK(
+ "hostPort", 1000, parentDir, Ids.OPEN_ACL_UNSAFE,
+ threadRunner[currentClientIndex]);
+ zkClient[currentClientIndex].register(elector[currentClientIndex]);
+ thread[currentClientIndex] = new
Thread(threadRunner[currentClientIndex]);
+ thread[currentClientIndex].start();
+ }
+
+ synchronized (this) {
+ // signal threads to start
+ LOG.info("signaling threads");
+ start = true;
+ notifyAll();
+ }
+
+ for(int i = 0; i < thread.length; i++) {
+ thread[i].join();
+ }
+ }
+}