This is an automated email from the ASF dual-hosted git repository.
ferhui pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4c2647 HDFS-16088. Standby NameNode process getLiveDatanodeStorageReport req… (#3140)
b4c2647 is described below
commit b4c2647d0d7534a830bb90508cc30550e39c95dd
Author: litao <[email protected]>
AuthorDate: Thu Jul 8 14:10:45 2021 +0800
HDFS-16088. Standby NameNode process getLiveDatanodeStorageReport req… (#3140)
---
.../hdfs/server/balancer/NameNodeConnector.java | 99 ++++++++++++-----
.../balancer/TestBalancerWithHANameNodes.java | 123 +++++++++++++++++++++
2 files changed, 192 insertions(+), 30 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 7634eaf..0e121ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -259,37 +259,19 @@ public class NameNodeConnector implements Closeable {
getBlocksRateLimiter.acquire();
}
boolean isRequestStandby = false;
- NamenodeProtocol nnproxy = null;
+ NamenodeProtocol nnProxy = null;
try {
- if (requestToStandby && nsId != null
- && HAUtil.isHAEnabled(config, nsId)) {
- List<ClientProtocol> namenodes =
- HAUtil.getProxiesForAllNameNodesInNameservice(config, nsId);
- for (ClientProtocol proxy : namenodes) {
- try {
- if (proxy.getHAServiceState().equals(
- HAServiceProtocol.HAServiceState.STANDBY)) {
- NamenodeProtocol sbn = NameNodeProxies.createNonHAProxy(
- config, RPC.getServerAddress(proxy), NamenodeProtocol.class,
- UserGroupInformation.getCurrentUser(), false).getProxy();
- nnproxy = sbn;
- isRequestStandby = true;
- break;
- }
- } catch (Exception e) {
- // Ignore the exception while connecting to a namenode.
- LOG.debug("Error while connecting to namenode", e);
- }
- }
- if (nnproxy == null) {
- LOG.warn("Request #getBlocks to Standby NameNode but meet exception,"
- + " will fallback to normal way.");
- nnproxy = namenode;
- }
+ ProxyPair proxyPair = getProxy();
+ isRequestStandby = proxyPair.isRequestStandby;
+ ClientProtocol proxy = proxyPair.clientProtocol;
+ if (isRequestStandby) {
+ nnProxy = NameNodeProxies.createNonHAProxy(
+ config, RPC.getServerAddress(proxy), NamenodeProtocol.class,
+ UserGroupInformation.getCurrentUser(), false).getProxy();
} else {
- nnproxy = namenode;
+ nnProxy = namenode;
}
- return nnproxy.getBlocks(datanode, size, minBlockSize, timeInterval);
+ return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval);
} finally {
if (isRequestStandby) {
LOG.info("Request #getBlocks to Standby NameNode success.");
@@ -314,7 +296,54 @@ public class NameNodeConnector implements Closeable {
/** @return live datanode storage reports. */
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException {
- return namenode.getDatanodeStorageReport(DatanodeReportType.LIVE);
+ boolean isRequestStandby = false;
+ try {
+ ProxyPair proxyPair = getProxy();
+ isRequestStandby = proxyPair.isRequestStandby;
+ ClientProtocol proxy = proxyPair.clientProtocol;
+ return proxy.getDatanodeStorageReport(DatanodeReportType.LIVE);
+ } finally {
+ if (isRequestStandby) {
+ LOG.info("Request #getLiveDatanodeStorageReport to Standby " +
+ "NameNode success.");
+ }
+ }
+ }
+
+ /**
+ * get the proxy.
+ * @return ProxyPair(clientProtocol and isRequestStandby)
+ * @throws IOException
+ */
+ private ProxyPair getProxy() throws IOException {
+ boolean isRequestStandby = false;
+ ClientProtocol clientProtocol = null;
+ if (requestToStandby && nsId != null
+ && HAUtil.isHAEnabled(config, nsId)) {
+ List<ClientProtocol> namenodes =
+ HAUtil.getProxiesForAllNameNodesInNameservice(config, nsId);
+ for (ClientProtocol proxy : namenodes) {
+ try {
+ if (proxy.getHAServiceState().equals(
+ HAServiceProtocol.HAServiceState.STANDBY)) {
+ clientProtocol = proxy;
+ isRequestStandby = true;
+ break;
+ }
+ } catch (Exception e) {
+ // Ignore the exception while connecting to a namenode.
+ LOG.debug("Error while connecting to namenode", e);
+ }
+ }
+ if (clientProtocol == null) {
+ LOG.warn("Request to Standby" +
+ " NameNode but meet exception, will fallback to normal way.");
+ clientProtocol = namenode;
+ }
+ } else {
+ clientProtocol = namenode;
+ }
+ return new ProxyPair(clientProtocol, isRequestStandby);
}
/** @return the key manager */
@@ -432,4 +461,14 @@ public class NameNodeConnector implements Closeable {
return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
+ ", bpid=" + blockpoolID + "]";
}
-}
+
+ private static class ProxyPair {
+ private final ClientProtocol clientProtocol;
+ private final boolean isRequestStandby;
+
+ ProxyPair(ClientProtocol clientProtocol, boolean isRequestStandby) {
+ this.clientProtocol = clientProtocol;
+ this.isRequestStandby = isRequestStandby;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index a74f94f..a9c8136 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -34,6 +34,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Test;
import org.slf4j.LoggerFactory;
@@ -171,6 +174,8 @@ public class TestBalancerWithHANameNodes {
// Check getBlocks request to Standby NameNode.
assertTrue(log.getOutput().contains(
"Request #getBlocks to Standby NameNode success."));
+ assertTrue(log.getOutput().contains(
+ "Request #getLiveDatanodeStorageReport to Standby NameNode success"));
} finally {
cluster.shutdown();
}
@@ -236,4 +241,122 @@ public class TestBalancerWithHANameNodes {
}
}
}
+
+ /**
+ * Comparing the results of getLiveDatanodeStorageReport()
+ * from the active and standby NameNodes,
+ * the results should be the same.
+ */
+ @Test(timeout = 60000)
+ public void testGetLiveDatanodeStorageReport() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ TestBalancer.initConf(conf);
+ assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
+ NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
+ nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+ Configuration copiedConf = new Configuration(conf);
+ // Try capture NameNodeConnector log.
+ LogCapturer log =LogCapturer.captureLogs(
+ LoggerFactory.getLogger(NameNodeConnector.class));
+ // We needs to assert datanode info from ANN and SNN, so the
+ // heartbeat should disabled for the duration of method execution.
+ copiedConf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 60000);
+ cluster = new MiniDFSCluster.Builder(copiedConf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(TEST_CAPACITIES.length)
+ .racks(TEST_RACKS)
+ .simulatedCapacities(TEST_CAPACITIES)
+ .build();
+ HATestUtil.setFailoverConfigurations(cluster, conf);
+ try {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+ URI namenode = (URI) DFSUtil.getInternalNsRpcUris(conf)
+ .toArray()[0];
+ String nsId = DFSUtilClient.getNameServiceIds(conf)
+ .toArray()[0].toString();
+
+ // Request to active namenode.
+ NameNodeConnector nncActive = new NameNodeConnector(
+ "nncActive", namenode,
+ nsId, new Path("/test"),
+ null, conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+ DatanodeStorageReport[] datanodeStorageReportFromAnn =
+ nncActive.getLiveDatanodeStorageReport();
+ assertTrue(!log.getOutput().contains(
+ "Request #getLiveDatanodeStorageReport to Standby NameNode success"));
+ nncActive.close();
+
+ // Request to standby namenode.
+ conf.setBoolean(DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
+ true);
+ NameNodeConnector nncStandby = new NameNodeConnector(
+ "nncStandby", namenode,
+ nsId, new Path("/test"),
+ null, conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+ DatanodeStorageReport[] datanodeStorageReportFromSnn =
+ nncStandby.getLiveDatanodeStorageReport();
+ assertTrue(log.getOutput().contains(
+ "Request #getLiveDatanodeStorageReport to Standby NameNode success"));
+ nncStandby.close();
+
+ // Assert datanode info.
+ assertEquals(
+ datanodeStorageReportFromAnn[0].getDatanodeInfo()
+ .getDatanodeReport(),
+ datanodeStorageReportFromSnn[0].getDatanodeInfo()
+ .getDatanodeReport());
+ assertEquals(
+ datanodeStorageReportFromAnn[1].getDatanodeInfo()
+ .getDatanodeReport(),
+ datanodeStorageReportFromSnn[1].getDatanodeInfo()
+ .getDatanodeReport());
+
+ // Assert all fields datanode storage info.
+ for (int i = 0; i < TEST_CAPACITIES.length; i++) {
+ assertEquals(
+ datanodeStorageReportFromAnn[i].getStorageReports()[0]
+ .getStorage().toString(),
+ datanodeStorageReportFromSnn[i].getStorageReports()[0]
+ .getStorage().toString());
+ assertEquals(
+ datanodeStorageReportFromAnn[i].getStorageReports()[0]
+ .getCapacity(),
+ datanodeStorageReportFromSnn[i].getStorageReports()[0]
+ .getCapacity());
+ assertEquals(
+ datanodeStorageReportFromAnn[i].getStorageReports()[0]
+ .getBlockPoolUsed(),
+ datanodeStorageReportFromSnn[i].getStorageReports()[0]
+ .getBlockPoolUsed());
+ assertEquals(
+ datanodeStorageReportFromAnn[i].getStorageReports()[0]
+ .getDfsUsed(),
+ datanodeStorageReportFromSnn[i].getStorageReports()[0]
+ .getDfsUsed());
+ assertEquals(
+ datanodeStorageReportFromAnn[i].getStorageReports()[0]
+ .getRemaining(),
+ datanodeStorageReportFromSnn[i].getStorageReports()[0]
+ .getRemaining());
+ assertEquals(
+ datanodeStorageReportFromAnn[i].getStorageReports()[0]
+ .getMount(),
+ datanodeStorageReportFromSnn[i].getStorageReports()[0]
+ .getMount());
+ assertEquals(
+ datanodeStorageReportFromAnn[i].getStorageReports()[0]
+ .getNonDfsUsed(),
+ datanodeStorageReportFromSnn[i].getStorageReports()[0]
+ .getNonDfsUsed());
+ assertEquals(
+ datanodeStorageReportFromAnn[i].getStorageReports()[0]
+ .isFailed(),
+ datanodeStorageReportFromSnn[i].getStorageReports()[0]
+ .isFailed());
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]