HDFS-9311. Support optional offload of NameNode HA service health checks to a
separate RPC server. Contributed by Chris Nauroth.
(cherry picked from commit bf8e45298218f70e38838152f69c7705d8606bd6)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/af0f2e27
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/af0f2e27
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/af0f2e27
Branch: refs/heads/branch-2
Commit: af0f2e27d189ceefa71fbb5178cc69006a62a257
Parents: 0377795
Author: cnauroth <[email protected]>
Authored: Tue Oct 27 23:07:14 2015 -0700
Committer: cnauroth <[email protected]>
Committed: Tue Oct 27 23:08:32 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/ha/HAServiceTarget.java | 50 ++++++++++-
.../org/apache/hadoop/ha/HealthMonitor.java | 2 +-
.../org/apache/hadoop/ha/DummyHAService.java | 47 +++++++++-
.../org/apache/hadoop/ha/TestHealthMonitor.java | 10 ++-
...HealthMonitorWithDedicatedHealthAddress.java | 37 ++++++++
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 ++
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 23 +++++
.../hadoop/hdfs/server/namenode/NameNode.java | 64 ++++++++++++--
.../hdfs/server/namenode/NameNodeRpcServer.java | 75 +++++++++++++++-
.../hadoop/hdfs/tools/NNHAServiceTarget.java | 11 +++
.../src/main/resources/hdfs-default.xml | 39 ++++++++
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 30 +++----
.../TestNameNodeRespectsBindHostKeys.java | 50 ++++++++++-
.../server/namenode/ha/TestNNHealthCheck.java | 93 ++++++++++++++------
15 files changed, 473 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
index 56678b4..98aab99 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
@@ -50,6 +50,23 @@ public abstract class HAServiceTarget {
public abstract InetSocketAddress getAddress();
/**
+ * Returns an optional separate RPC server address for health checks at the
+ * target node. If defined, then this address is used by the health monitor
+ * for the {@link HAServiceProtocol#monitorHealth()} and
+ * {@link HAServiceProtocol#getServiceStatus()} calls. This can be useful
for
+ * separating out these calls onto separate RPC handlers to protect against
+ * resource exhaustion in the main RPC handler pool. If null (which is the
+ * default implementation), then all RPC calls go to the address defined by
+ * {@link #getAddress()}.
+ *
+ * @return IPC address of the lifeline RPC server on the target node, or null
+ * if no lifeline RPC server is used
+ */
+ public InetSocketAddress getHealthMonitorAddress() {
+ return null;
+ }
+
+ /**
* @return the IPC address of the ZKFC on the target node
*/
public abstract InetSocketAddress getZKFCAddress();
@@ -73,15 +90,42 @@ public abstract class HAServiceTarget {
*/
public HAServiceProtocol getProxy(Configuration conf, int timeoutMs)
throws IOException {
+ return getProxyForAddress(conf, timeoutMs, getAddress());
+ }
+
+ /**
+ * Returns a proxy to connect to the target HA service for health monitoring.
+ * If {@link #getHealthMonitorAddress()} is implemented to return a non-null
+ * address, then this proxy will connect to that address. Otherwise, the
+ * returned proxy defaults to using {@link #getAddress()}, which means this
+ * method's behavior is identical to {@link #getProxy(Configuration, int)}.
+ *
+ * @param conf Configuration
+ * @param timeoutMs timeout in milliseconds
+ * @return a proxy to connect to the target HA service for health monitoring
+ * @throws IOException if there is an error
+ */
+ public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
+ int timeoutMs) throws IOException {
+ InetSocketAddress addr = getHealthMonitorAddress();
+ if (addr == null) {
+ addr = getAddress();
+ }
+ return getProxyForAddress(conf, timeoutMs, addr);
+ }
+
+ private HAServiceProtocol getProxyForAddress(Configuration conf,
+ int timeoutMs, InetSocketAddress addr) throws IOException {
Configuration confCopy = new Configuration(conf);
// Lower the timeout so we quickly fail to connect
-
confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
1);
+ confCopy.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
return new HAServiceProtocolClientSideTranslatorPB(
- getAddress(),
+ addr,
confCopy, factory, timeoutMs);
}
-
+
/**
* @return a proxy to the ZKFC which is associated with this HA service.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
index 8c87629..24c149c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
@@ -191,7 +191,7 @@ public class HealthMonitor {
* Connect to the service to be monitored. Stubbed out for easier testing.
*/
protected HAServiceProtocol createProxy() throws IOException {
- return targetToMonitor.getProxy(conf, rpcTimeout);
+ return targetToMonitor.getHealthMonitorProxy(conf, rpcTimeout);
}
private void doHealthChecks() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
index aef6c4d..551da56 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
@@ -49,10 +49,10 @@ class DummyHAService extends HAServiceTarget {
public static final Log LOG = LogFactory.getLog(DummyHAService.class);
private static final String DUMMY_FENCE_KEY = "dummy.fence.key";
volatile HAServiceState state;
- HAServiceProtocol proxy;
+ HAServiceProtocol proxy, healthMonitorProxy;
ZKFCProtocol zkfcProxy = null;
NodeFencer fencer;
- InetSocketAddress address;
+ InetSocketAddress address, healthMonitorAddress;
boolean isHealthy = true;
boolean actUnreachable = false;
boolean failToBecomeActive, failToBecomeStandby, failToFence;
@@ -80,6 +80,7 @@ class DummyHAService extends HAServiceTarget {
}
Configuration conf = new Configuration();
this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
+ this.healthMonitorProxy = makeHealthMonitorMock(conf,
HA_HM_RPC_TIMEOUT_DEFAULT);
try {
conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
this.fencer = Mockito.spy(
@@ -92,7 +93,18 @@ class DummyHAService extends HAServiceTarget {
this.index = instances.size();
}
}
-
+
+ DummyHAService(HAServiceState state, InetSocketAddress address,
+ InetSocketAddress healthMonitorAddress, boolean testWithProtoBufRPC) {
+ this(state, address, testWithProtoBufRPC);
+ if (testWithProtoBufRPC) {
+ this.healthMonitorAddress = startAndGetRPCServerAddress(
+ healthMonitorAddress);
+ } else {
+ this.healthMonitorAddress = healthMonitorAddress;
+ }
+ }
+
public void setSharedResource(DummySharedResource rsrc) {
this.sharedResource = rsrc;
}
@@ -134,12 +146,32 @@ class DummyHAService extends HAServiceTarget {
return Mockito.spy(service);
}
+ private HAServiceProtocol makeHealthMonitorMock(Configuration conf,
+ int timeoutMs) {
+ HAServiceProtocol service;
+ if (!testWithProtoBufRPC) {
+ service = new MockHAProtocolImpl();
+ } else {
+ try {
+ service = super.getHealthMonitorProxy(conf, timeoutMs);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+ return Mockito.spy(service);
+ }
+
@Override
public InetSocketAddress getAddress() {
return address;
}
@Override
+ public InetSocketAddress getHealthMonitorAddress() {
+ return healthMonitorAddress;
+ }
+
+ @Override
public InetSocketAddress getZKFCAddress() {
return null;
}
@@ -152,6 +184,15 @@ class DummyHAService extends HAServiceTarget {
}
return proxy;
}
+
+ @Override
+ public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
+ int timeout) throws IOException {
+ if (testWithProtoBufRPC) {
+ proxy = makeHealthMonitorMock(conf, timeout);
+ }
+ return proxy;
+ }
@Override
public ZKFCProtocol getZKFCProxy(Configuration conf, int timeout)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
index b58793f..6c46543 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
@@ -55,8 +55,7 @@ public class TestHealthMonitor {
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
- svc = new DummyHAService(HAServiceState.ACTIVE,
- new InetSocketAddress("0.0.0.0", 0), true);
+ svc = createDummyHAService();
hm = new HealthMonitor(conf, svc) {
@Override
protected HAServiceProtocol createProxy() throws IOException {
@@ -73,7 +72,12 @@ public class TestHealthMonitor {
LOG.info("Waiting for HEALTHY signal");
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
}
-
+
+ protected DummyHAService createDummyHAService() {
+ return new DummyHAService(HAServiceState.ACTIVE,
+ new InetSocketAddress("0.0.0.0", 0), true);
+ }
+
@Test(timeout=15000)
public void testMonitor() throws Exception {
LOG.info("Mocking bad health check, waiting for UNHEALTHY");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitorWithDedicatedHealthAddress.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitorWithDedicatedHealthAddress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitorWithDedicatedHealthAddress.java
new file mode 100644
index 0000000..3212c10
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitorWithDedicatedHealthAddress.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+
+/**
+ * Repeats all tests of {@link TestHealthMonitor}, but using a separate
+ * dedicated health check RPC address.
+ */
+public class TestHealthMonitorWithDedicatedHealthAddress
+ extends TestHealthMonitor {
+
+ @Override
+ protected DummyHAService createDummyHAService() {
+ return new DummyHAService(HAServiceState.ACTIVE,
+ new InetSocketAddress("0.0.0.0", 0),
+ new InetSocketAddress("0.0.0.0", 0), true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3b72524..932706b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -757,6 +757,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9307. fuseConnect should be private to fuse_connect.c (Mingliang Liu
via Colin P. McCabe)
+ HDFS-9311. Support optional offload of NameNode HA service health checks to
+ a separate RPC server. (cnauroth)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c80edb6..9834108 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -138,6 +138,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY =
"dfs.namenode.rpc-bind-host";
public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY =
"dfs.namenode.servicerpc-address";
public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY =
"dfs.namenode.servicerpc-bind-host";
+ public static final String DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY =
+ "dfs.namenode.lifeline.rpc-address";
+ public static final String DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY =
+ "dfs.namenode.lifeline.rpc-bind-host";
public static final String DFS_NAMENODE_MAX_OBJECTS_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
public static final long DFS_NAMENODE_MAX_OBJECTS_DEFAULT = 0;
@@ -466,6 +470,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int
DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
public static final String DFS_NAMENODE_HANDLER_COUNT_KEY =
"dfs.namenode.handler.count";
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
+ public static final int DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT = 1;
+ public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY =
"dfs.namenode.lifeline.handler.count";
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY =
"dfs.namenode.service.handler.count";
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_SUPPORT_APPEND_KEY = "dfs.support.append";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index b1a2d9e..5178daa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DE
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
@@ -574,6 +575,28 @@ public class DFSUtil {
}
/**
+ * Map a logical namenode ID to its lifeline address. Use the given
+ * nameservice if specified, or the configured one if none is given.
+ *
+ * @param conf Configuration
+ * @param nsId which nameservice nnId is a part of, optional
+ * @param nnId the namenode ID to get the service addr for
+ * @return the lifeline addr, null if it could not be determined
+ */
+ public static String getNamenodeLifelineAddr(final Configuration conf,
+ String nsId, String nnId) {
+
+ if (nsId == null) {
+ nsId = getOnlyNameServiceIdOrNull(conf);
+ }
+
+ String lifelineAddrKey = DFSUtilClient.concatSuffixes(
+ DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, nsId, nnId);
+
+ return conf.get(lifelineAddrKey);
+ }
+
+ /**
* Flatten the given map, as returned by other functions in this class,
* into a flat list of {@link ConfiguredNNAddress} instances.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 730c15a..6102bdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -121,6 +121,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_BIND_HOST_K
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -227,6 +229,8 @@ public class NameNode implements NameNodeStatusMXBean {
DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
DFS_NAMENODE_CHECKPOINT_DIR_KEY,
DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
+ DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
+ DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_HTTP_ADDRESS_KEY,
@@ -488,6 +492,21 @@ public class NameNode implements NameNodeStatusMXBean {
}
/**
+ * Given a configuration get the address of the lifeline RPC server.
+ * If the lifeline RPC is not configured returns null.
+ *
+ * @param conf configuration
+ * @return address or null
+ */
+ InetSocketAddress getLifelineRpcServerAddress(Configuration conf) {
+ String addr = getTrimmedOrNull(conf,
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
+ if (addr == null) {
+ return null;
+ }
+ return NetUtils.createSocketAddr(addr);
+ }
+
+ /**
* Given a configuration get the address of the service rpc server
* If the service rpc is not configured returns null
*/
@@ -498,23 +517,41 @@ public class NameNode implements NameNodeStatusMXBean {
protected InetSocketAddress getRpcServerAddress(Configuration conf) {
return DFSUtilClient.getNNAddress(conf);
}
-
+
+ /**
+ * Given a configuration get the bind host of the lifeline RPC server.
+ * If the bind host is not configured returns null.
+ *
+ * @param conf configuration
+ * @return bind host or null
+ */
+ String getLifelineRpcServerBindHost(Configuration conf) {
+ return getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
+ }
+
/** Given a configuration get the bind host of the service rpc server
* If the bind host is not configured returns null.
*/
protected String getServiceRpcServerBindHost(Configuration conf) {
- String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
- if (addr == null || addr.isEmpty()) {
- return null;
- }
- return addr;
+ return getTrimmedOrNull(conf, DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
}
/** Given a configuration get the bind host of the client rpc server
* If the bind host is not configured returns null.
*/
protected String getRpcServerBindHost(Configuration conf) {
- String addr = conf.getTrimmed(DFS_NAMENODE_RPC_BIND_HOST_KEY);
+ return getTrimmedOrNull(conf, DFS_NAMENODE_RPC_BIND_HOST_KEY);
+ }
+
+ /**
+ * Gets a trimmed value from configuration, or null if no value is defined.
+ *
+ * @param conf configuration
+ * @param key configuration key to get
+ * @return trimmed value, or null if no value is defined
+ */
+ private static String getTrimmedOrNull(Configuration conf, String key) {
+ String addr = conf.getTrimmed(key);
if (addr == null || addr.isEmpty()) {
return null;
}
@@ -522,6 +559,19 @@ public class NameNode implements NameNodeStatusMXBean {
}
/**
+ * Modifies the configuration to contain the lifeline RPC address setting.
+ *
+ * @param conf configuration to modify
+ * @param lifelineRPCAddress lifeline RPC address
+ */
+ void setRpcLifelineServerAddress(Configuration conf,
+ InetSocketAddress lifelineRPCAddress) {
+ LOG.info("Setting lifeline RPC address {}", lifelineRPCAddress);
+ conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
+ NetUtils.getHostPortString(lifelineRPCAddress));
+ }
+
+ /**
* Modifies the configuration passed to contain the service rpc address
setting
*/
protected void setRpcServiceServerAddress(Configuration conf,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a19fdd0..7834901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.namenode;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
@@ -210,6 +212,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
/** The RPC server that listens to requests from DataNodes */
private final RPC.Server serviceRpcServer;
private final InetSocketAddress serviceRPCAddress;
+
+ /** The RPC server that listens to lifeline requests */
+ private final RPC.Server lifelineRpcServer;
+ private final InetSocketAddress lifelineRPCAddress;
/** The RPC server that listens to requests from clients */
protected final RPC.Server clientRpcServer;
@@ -336,6 +342,42 @@ class NameNodeRpcServer implements NamenodeProtocols {
serviceRpcServer = null;
serviceRPCAddress = null;
}
+
+ InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
+ if (lifelineRpcAddr != null) {
+ RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
+ ProtobufRpcEngine.class);
+ String bindHost = nn.getLifelineRpcServerBindHost(conf);
+ if (bindHost == null) {
+ bindHost = lifelineRpcAddr.getHostName();
+ }
+ LOG.info("Lifeline RPC server is binding to {}:{}", bindHost,
+ lifelineRpcAddr.getPort());
+
+ int lifelineHandlerCount = conf.getInt(
+ DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY,
+ DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT);
+
+ lifelineRpcServer = new RPC.Builder(conf)
+ .setProtocol(HAServiceProtocolPB.class)
+ .setInstance(haPbService)
+ .setBindAddress(bindHost)
+ .setPort(lifelineRpcAddr.getPort())
+ .setNumHandlers(lifelineHandlerCount)
+ .setVerbose(false)
+ .setSecretManager(namesystem.getDelegationTokenSecretManager())
+ .build();
+
+ // Update the address with the correct port
+ InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
+ lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
+ listenAddr.getPort());
+ nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress);
+ } else {
+ lifelineRpcServer = null;
+ lifelineRPCAddress = null;
+ }
+
InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
String bindHost = nn.getRpcServerBindHost(conf);
if (bindHost == null) {
@@ -379,12 +421,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (serviceRpcServer != null) {
serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
+ if (lifelineRpcServer != null) {
+ lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+ }
}
// The rpc-server port can be ephemeral... ensure we have the correct info
InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
- clientRpcAddress = new InetSocketAddress(
- rpcAddr.getHostName(), listenAddr.getPort());
+ clientRpcAddress = new InetSocketAddress(
+ rpcAddr.getHostName(), listenAddr.getPort());
nn.setRpcServerAddress(conf, clientRpcAddress);
minimumDataNodeVersion = conf.get(
@@ -416,7 +461,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (serviceRpcServer != null) {
serviceRpcServer.setTracer(nn.tracer);
}
- }
+ if (lifelineRpcServer != null) {
+ lifelineRpcServer.setTracer(nn.tracer);
+ }
+ }
+
+ /** Allow access to the lifeline RPC server for testing */
+ @VisibleForTesting
+ RPC.Server getLifelineRpcServer() {
+ return lifelineRpcServer;
+ }
/** Allow access to the client RPC server for testing */
@VisibleForTesting
@@ -438,6 +492,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
+ if (lifelineRpcServer != null) {
+ lifelineRpcServer.start();
+ }
}
/**
@@ -448,6 +505,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (serviceRpcServer != null) {
serviceRpcServer.join();
}
+ if (lifelineRpcServer != null) {
+ lifelineRpcServer.join();
+ }
}
/**
@@ -460,8 +520,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (serviceRpcServer != null) {
serviceRpcServer.stop();
}
+ if (lifelineRpcServer != null) {
+ lifelineRpcServer.stop();
+ }
}
-
+
+ InetSocketAddress getLifelineRpcAddress() {
+ return lifelineRPCAddress;
+ }
+
InetSocketAddress getServiceRpcAddress() {
return serviceRPCAddress;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
index 247ac02..d579a4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
@@ -49,6 +49,7 @@ public class NNHAServiceTarget extends HAServiceTarget {
private static final String NAMENODE_ID_KEY = "namenodeid";
private final InetSocketAddress addr;
+ private final InetSocketAddress lifelineAddr;
private InetSocketAddress zkfcAddr;
private NodeFencer fencer;
private BadFencingConfigurationException fenceConfigError;
@@ -90,6 +91,11 @@ public class NNHAServiceTarget extends HAServiceTarget {
this.addr = NetUtils.createSocketAddr(serviceAddr,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+ String lifelineAddrStr =
+ DFSUtil.getNamenodeLifelineAddr(targetConf, nsId, nnId);
+ this.lifelineAddr = (lifelineAddrStr != null) ?
+ NetUtils.createSocketAddr(lifelineAddrStr) : null;
+
this.autoFailoverEnabled = targetConf.getBoolean(
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
@@ -120,6 +126,11 @@ public class NNHAServiceTarget extends HAServiceTarget {
}
@Override
+ public InetSocketAddress getHealthMonitorAddress() {
+ return lifelineAddr;
+ }
+
+ @Override
public InetSocketAddress getZKFCAddress() {
Preconditions.checkState(autoFailoverEnabled,
"ZKFC address not relevant when auto failover is off");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1389bc9..5759df5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -79,6 +79,33 @@
</property>
<property>
+ <name>dfs.namenode.lifeline.rpc-address</name>
+ <value></value>
+ <description>
+ NameNode RPC lifeline address. This is an optional separate RPC address
+ that can be used to isolate health checks and liveness to protect against
+ resource exhaustion in the main RPC handler pool. In the case of
+ HA/Federation where multiple NameNodes exist, the name service ID is added
+ to the name e.g. dfs.namenode.lifeline.rpc-address.ns1. The value of this
+ property will take the form of nn-host1:rpc-port. If this property is not
+ defined, then the NameNode will not start a lifeline RPC server. By
+ default, the property is not defined.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.lifeline.rpc-bind-host</name>
+ <value></value>
+ <description>
+ The actual address the lifeline RPC server will bind to. If this optional
+ address is set, it overrides only the hostname portion of
+ dfs.namenode.lifeline.rpc-address. It can also be specified per name node
+ or name service for HA/Federation. This is useful for making the name node
+ listen on all interfaces by setting it to 0.0.0.0.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.secondary.http-address</name>
<value>0.0.0.0:50090</value>
<description>
@@ -682,6 +709,18 @@
</property>
<property>
+ <name>dfs.namenode.lifeline.handler.count</name>
+ <value>1</value>
+ <description>
+ Sets the number of RPC server handler threads the NameNode runs for the
+ lifeline RPC server. The default value is 1, because this RPC server
+ handles only HA health check requests from the ZKFC. These are lightweight
+ requests, issued single-threaded from the ZKFC client side. This property
+ has no effect if dfs.namenode.lifeline.rpc-address is not defined.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.safemode.threshold-pct</name>
<value>0.999f</value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 3d87bbf..e73b59b 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -37,6 +37,7 @@ import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_K
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
@@ -869,23 +870,20 @@ public class MiniDFSCluster {
nameserviceId, nnId);
destConf.set(key, srcConf.get(key));
- key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
- nameserviceId, nnId);
- String val = srcConf.get(key);
- if (val != null) {
- destConf.set(key, srcConf.get(key));
- }
-
- key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
- nameserviceId, nnId);
- val = srcConf.get(key);
- if (val != null) {
- destConf.set(key, srcConf.get(key));
- }
+ copyKey(srcConf, destConf, nameserviceId, nnId,
+ DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ copyKey(srcConf, destConf, nameserviceId, nnId,
+ DFS_NAMENODE_HTTPS_ADDRESS_KEY);
+ copyKey(srcConf, destConf, nameserviceId, nnId,
+ DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
+ copyKey(srcConf, destConf, nameserviceId, nnId,
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
+ }
- key = DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
- nameserviceId, nnId);
- val = srcConf.get(key);
+ private static void copyKey(Configuration srcConf, Configuration destConf,
+ String nameserviceId, String nnId, String baseKey) {
+ String key = DFSUtil.addKeySuffixes(baseKey, nameserviceId, nnId);
+ String val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java
index ed00e37..b0fa93d 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java
@@ -47,9 +47,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
*
* - DFS_NAMENODE_RPC_BIND_HOST_KEY
* - DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY
+ * - DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY
* - DFS_NAMENODE_HTTP_BIND_HOST_KEY
* - DFS_NAMENODE_HTTPS_BIND_HOST_KEY
-
*/
public class TestNameNodeRespectsBindHostKeys {
public static final Log LOG =
LogFactory.getLog(TestNameNodeRespectsBindHostKeys.class);
@@ -66,6 +66,12 @@ public class TestNameNodeRespectsBindHostKeys {
return
rpcServer.getServiceRpcServer().getListenerAddress().getAddress().toString();
}
+ private static String getLifelineRpcServerAddress(MiniDFSCluster cluster) {
+ NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc();
+ return rpcServer.getLifelineRpcServer().getListenerAddress().getAddress()
+ .toString();
+ }
+
@Test (timeout=300000)
public void testRpcBindHostKey() throws IOException {
Configuration conf = new HdfsConfiguration();
@@ -148,6 +154,48 @@ public class TestNameNodeRespectsBindHostKeys {
}
}
+ @Test (timeout=300000)
+ public void testLifelineRpcBindHostKey() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = null;
+
+ LOG.info("Testing without " + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
+
+ conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, LOCALHOST_SERVER_ADDRESS);
+
+ // NN should not bind the wildcard address by default.
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+ cluster.waitActive();
+ String address = getLifelineRpcServerAddress(cluster);
+ assertThat("Bind address not expected to be wildcard by default.",
+ address, not("/" + WILDCARD_ADDRESS));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ LOG.info("Testing with " + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
+
+ // Tell NN to bind the wildcard address.
+ conf.set(DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY, WILDCARD_ADDRESS);
+
+ // Verify that NN binds wildcard address now.
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+ cluster.waitActive();
+ String address = getLifelineRpcServerAddress(cluster);
+ assertThat("Bind address " + address + " is not wildcard.",
+ address, is("/" + WILDCARD_ADDRESS));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
@Test(timeout=300000)
public void testHttpBindHostKey() throws IOException {
Configuration conf = new HdfsConfiguration();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af0f2e27/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java
index ab2a8dd..6519588 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java
@@ -17,57 +17,92 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
+import static
org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT;
+import static
org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestNNHealthCheck {
+ private MiniDFSCluster cluster;
+ private Configuration conf;
+
+ @Before
+ public void setup() {
+ conf = new Configuration();
+ }
+
+ @After
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
@Test
public void testNNHealthCheck() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster.Builder(conf)
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(0)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .build();
+ doNNHealthCheckTest();
+ }
+
+ @Test
+ public void testNNHealthCheckWithLifelineAddress() throws IOException {
+ conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");
+ cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.build();
+ doNNHealthCheckTest();
+ }
+
+ private void doNNHealthCheckTest() throws IOException {
+ NameNodeResourceChecker mockResourceChecker = Mockito.mock(
+ NameNodeResourceChecker.class);
+ Mockito.doReturn(true).when(mockResourceChecker).hasAvailableDiskSpace();
+ cluster.getNameNode(0).getNamesystem()
+ .setNNResourceChecker(mockResourceChecker);
- NameNodeResourceChecker mockResourceChecker = Mockito.mock(
- NameNodeResourceChecker.class);
- Mockito.doReturn(true).when(mockResourceChecker).hasAvailableDiskSpace();
- cluster.getNameNode(0).getNamesystem()
- .setNNResourceChecker(mockResourceChecker);
-
- NamenodeProtocols rpc = cluster.getNameNodeRpc(0);
-
- // Should not throw error, which indicates healthy.
+ NNHAServiceTarget haTarget = new NNHAServiceTarget(conf,
+ DFSUtil.getNamenodeNameServiceId(conf), "nn1");
+ HAServiceProtocol rpc = haTarget.getHealthMonitorProxy(conf, conf.getInt(
+ HA_HM_RPC_TIMEOUT_KEY, HA_HM_RPC_TIMEOUT_DEFAULT));
+
+ // Should not throw error, which indicates healthy.
+ rpc.monitorHealth();
+
+ Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace();
+
+ try {
+ // Should throw error - NN is unhealthy.
rpc.monitorHealth();
-
-
Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace();
-
- try {
- // Should throw error - NN is unhealthy.
- rpc.monitorHealth();
- fail("Should not have succeeded in calling monitorHealth");
- } catch (HealthCheckFailedException hcfe) {
- GenericTestUtils.assertExceptionContains(
- "The NameNode has no resources available", hcfe);
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
+ fail("Should not have succeeded in calling monitorHealth");
+ } catch (HealthCheckFailedException hcfe) {
+ GenericTestUtils.assertExceptionContains(
+ "The NameNode has no resources available", hcfe);
+ } catch (RemoteException re) {
+ GenericTestUtils.assertExceptionContains(
+ "The NameNode has no resources available",
+ re.unwrapRemoteException(HealthCheckFailedException.class));
}
}
}