Author: umamahesh
Date: Wed Apr 23 19:04:33 2014
New Revision: 1589495
URL: http://svn.apache.org/r1589495
Log:
Merge HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is
unstable. Contributed by Vinayakumar B.
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1589495&r1=1589494&r2=1589495&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Wed Apr 23 19:04:33 2014
@@ -78,6 +78,9 @@ Release 2.5.0 - UNRELEASED
HADOOP-10526. Chance for Stream leakage in CompressorStream. (Rushabh
Shah via kihwal)
+ HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is unstable
+ (Vinayakumar B via umamahesh)
+
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1589495&r1=1589494&r2=1589495&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Wed Apr 23 19:04:33 2014
@@ -74,6 +74,9 @@ public class HealthMonitor {
private List<Callback> callbacks = Collections.synchronizedList(
new LinkedList<Callback>());
+ private List<ServiceStateCallback> serviceStateCallbacks = Collections
+ .synchronizedList(new LinkedList<ServiceStateCallback>());
+
private HAServiceStatus lastServiceState = new HAServiceStatus(
HAServiceState.INITIALIZING);
@@ -134,7 +137,15 @@ public class HealthMonitor {
public void removeCallback(Callback cb) {
callbacks.remove(cb);
}
-
+
+ public synchronized void addServiceStateCallback(ServiceStateCallback cb) {
+ this.serviceStateCallbacks.add(cb);
+ }
+
+ public synchronized void removeServiceStateCallback(ServiceStateCallback cb) {
+ serviceStateCallbacks.remove(cb);
+ }
+
public void shutdown() {
LOG.info("Stopping HealthMonitor thread");
shouldRun = false;
@@ -217,6 +228,9 @@ public class HealthMonitor {
private synchronized void setLastServiceStatus(HAServiceStatus status) {
this.lastServiceState = status;
+ for (ServiceStateCallback cb : serviceStateCallbacks) {
+ cb.reportServiceStatus(lastServiceState);
+ }
}
private synchronized void enterState(State newState) {
@@ -293,4 +307,11 @@ public class HealthMonitor {
static interface Callback {
void enteredState(State newState);
}
+
+ /**
+ * Callback interface for service states.
+ */
+ static interface ServiceStateCallback {
+ void reportServiceStatus(HAServiceStatus status);
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1589495&r1=1589494&r2=1589495&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Wed Apr 23 19:04:33 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.util.ZKUtil;
@@ -105,6 +106,8 @@ public abstract class ZKFailoverControll
private State lastHealthState = State.INITIALIZING;
+ private volatile HAServiceState serviceState = HAServiceState.INITIALIZING;
+
/** Set if a fatal error occurs */
private String fatalError = null;
@@ -294,6 +297,7 @@ public abstract class ZKFailoverControll
private void initHM() {
healthMonitor = new HealthMonitor(conf, localTarget);
healthMonitor.addCallback(new HealthCallbacks());
+ healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
healthMonitor.start();
}
@@ -376,6 +380,7 @@ public abstract class ZKFailoverControll
String msg = "Successfully transitioned " + localTarget +
" to active state";
LOG.info(msg);
+ serviceState = HAServiceState.ACTIVE;
recordActiveAttempt(new ActiveAttemptRecord(true, msg));
} catch (Throwable t) {
@@ -484,6 +489,7 @@ public abstract class ZKFailoverControll
// TODO handle this. It's a likely case since we probably got fenced
// at the same time.
}
+ serviceState = HAServiceState.STANDBY;
}
@@ -574,6 +580,7 @@ public abstract class ZKFailoverControll
delayJoiningUntilNanotime = System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(millisToCede);
elector.quitElection(needFence);
+ serviceState = HAServiceState.INITIALIZING;
}
}
recheckElectability();
@@ -739,12 +746,16 @@ public abstract class ZKFailoverControll
switch (lastHealthState) {
case SERVICE_HEALTHY:
elector.joinElection(targetToData(localTarget));
+ if (quitElectionOnBadState) {
+ quitElectionOnBadState = false;
+ }
break;
case INITIALIZING:
LOG.info("Ensuring that " + localTarget + " does not " +
"participate in active master election");
elector.quitElection(false);
+ serviceState = HAServiceState.INITIALIZING;
break;
case SERVICE_UNHEALTHY:
@@ -752,6 +763,7 @@ public abstract class ZKFailoverControll
LOG.info("Quitting master election for " + localTarget +
" and marking that fencing is necessary");
elector.quitElection(true);
+ serviceState = HAServiceState.INITIALIZING;
break;
case HEALTH_MONITOR_FAILED:
@@ -784,6 +796,44 @@ public abstract class ZKFailoverControll
whenNanos, TimeUnit.NANOSECONDS);
}
+ int serviceStateMismatchCount = 0;
+ boolean quitElectionOnBadState = false;
+
+ void verifyChangedServiceState(HAServiceState changedState) {
+ synchronized (elector) {
+ synchronized (this) {
+ if (serviceState == HAServiceState.INITIALIZING) {
+ if (quitElectionOnBadState) {
+ LOG.debug("rechecking for electability from bad state");
+ recheckElectability();
+ }
+ return;
+ }
+ if (changedState == serviceState) {
+ serviceStateMismatchCount = 0;
+ return;
+ }
+ if (serviceStateMismatchCount == 0) {
+ // recheck one more time. As this might be due to parallel transition.
+ serviceStateMismatchCount++;
+ return;
+ }
+ // quit the election as the expected state and reported state
+ // mismatches.
+ LOG.error("Local service " + localTarget
+ + " has changed the serviceState to " + changedState
+ + ". Expected was " + serviceState
+ + ". Quitting election marking fencing necessary.");
+ delayJoiningUntilNanotime = System.nanoTime()
+ + TimeUnit.MILLISECONDS.toNanos(1000);
+ elector.quitElection(true);
+ quitElectionOnBadState = true;
+ serviceStateMismatchCount = 0;
+ serviceState = HAServiceState.INITIALIZING;
+ }
+ }
+ }
+
/**
* @return the last health state passed to the FC
* by the HealthMonitor.
@@ -855,7 +905,17 @@ public abstract class ZKFailoverControll
recheckElectability();
}
}
-
+
+ /**
+ * Callbacks for HAServiceStatus
+ */
+ class ServiceStateCallBacks implements HealthMonitor.ServiceStateCallback {
+ @Override
+ public void reportServiceStatus(HAServiceStatus status) {
+ verifyChangedServiceState(status.getState());
+ }
+ }
+
private static class ActiveAttemptRecord {
private final boolean succeeded;
private final String status;
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1589495&r1=1589494&r2=1589495&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Wed Apr 23 19:04:33 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.ha;
import static org.junit.Assert.*;
-import static org.junit.Assume.assumeTrue;
import java.security.NoSuchAlgorithmException;
@@ -29,7 +28,6 @@ import org.apache.hadoop.ha.HAServicePro
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.apache.zookeeper.KeeperException;
@@ -68,8 +66,6 @@ public class TestZKFailoverController ex
@Before
public void setupConfAndServices() {
- // skip tests on Windows until after resolution of ZooKeeper client bug
- assumeTrue(!Shell.WINDOWS);
conf = new Configuration();
conf.set(ZKFailoverController.ZK_ACL_KEY, TEST_ACL);
conf.set(ZKFailoverController.ZK_AUTH_KEY, TEST_AUTH_GOOD);
@@ -232,6 +228,27 @@ public class TestZKFailoverController ex
cluster.stop();
}
}
+
+ /**
+ * Test that, when the health monitor indicates bad health status,
+ * failover is triggered. Also ensures that graceful active->standby
+ * transition is used when possible, falling back to fencing when
+ * the graceful approach fails.
+ */
+ @Test(timeout=15000)
+ public void testAutoFailoverOnBadState() throws Exception {
+ try {
+ cluster.start();
+ DummyHAService svc0 = cluster.getService(0);
+ LOG.info("Faking svc0 to change the state, should failover to svc1");
+ svc0.state = HAServiceState.STANDBY;
+
+ // Should fail back to svc0 at this point
+ cluster.waitForHAState(1, HAServiceState.ACTIVE);
+ } finally {
+ cluster.stop();
+ }
+ }
@Test(timeout=15000)
public void testAutoFailoverOnLostZKSession() throws Exception {