[
https://issues.apache.org/jira/browse/HDFS-16907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17684089#comment-17684089
]
ASF GitHub Bot commented on HDFS-16907:
---------------------------------------
ayushtkn commented on code in PR #5349:
URL: https://github.com/apache/hadoop/pull/5349#discussion_r1096479948
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html:
##########
@@ -84,7 +84,8 @@
<th>Namenode HA State</th>
<th>Block Pool ID</th>
<th>Actor State</th>
- <th>Last Heartbeat</th>
+ <th>Last Heartbeat Sent</th>
+ <th>Last Heartbeat Received</th>
Review Comment:
Last Heartbeat Received gives me a sense like Namenode sent heartbeat to
Datanode, That ain't good from Correctness point of view.
Can you change to indicate it is the heartbeat response rather than the
heartbeat itself
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java:
##########
@@ -294,4 +297,107 @@ public void testDataNodeMXBeanSlowDisksEnabled() throws
Exception {
if (cluster != null) {cluster.shutdown();}
}
}
+
+ @Test
+ public void testDataNodeMXBeanLastHeartbeats() throws Exception {
+ Configuration conf = new Configuration();
+ try (MiniDFSCluster cluster = new MiniDFSCluster
+ .Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology(2))
+ .numDataNodes(1)
+ .build()) {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+ cluster.transitionToStandby(1);
+
+ DataNode datanode = cluster.getDataNodes().get(0);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxbeanName = new ObjectName(
+ "Hadoop:service=DataNode,name=DataNodeInfo");
+
+ // Verify and wait until one of the BP service actor identifies active
namenode as active
+ // and another as standby.
+ GenericTestUtils.waitFor(() -> {
+ List<Map<String, String>> bpServiceActorInfo =
datanode.getBPServiceActorInfoMap();
+ Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
+ Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);
+ return (HAServiceProtocol.HAServiceState.ACTIVE.toString()
+ .equals(bpServiceActorInfo1.get("NamenodeHaState"))
+ && HAServiceProtocol.HAServiceState.STANDBY.toString()
+ .equals(bpServiceActorInfo2.get("NamenodeHaState")))
+ || (HAServiceProtocol.HAServiceState.ACTIVE.toString()
+ .equals(bpServiceActorInfo2.get("NamenodeHaState"))
+ && HAServiceProtocol.HAServiceState.STANDBY.toString()
+ .equals(bpServiceActorInfo1.get("NamenodeHaState")));
+ },
+ 500,
+ 8000,
+ "No namenode is reported active");
+
+ // basic metrics validation
+ String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
+ Assert.assertEquals(datanode.getClusterId(), clusterId);
+ String version = (String)mbs.getAttribute(mxbeanName, "Version");
+ Assert.assertEquals(datanode.getVersion(),version);
+ String bpActorInfo = (String) mbs.getAttribute(mxbeanName,
"BPServiceActorInfo");
+ Assert.assertEquals(datanode.getBPServiceActorInfo(), bpActorInfo);
+
+ // Verify that last heartbeat sent to both namenodes in last 5 sec.
+ assertLastHeartbeatSentTime(datanode, "LastHeartbeat");
+ // Verify that last heartbeat response from both namenodes have been
received within
+ // last 5 sec.
+ assertLastHeartbeatSentTime(datanode, "LastHeartbeatResponseTime");
+
+
+ NameNode sbNameNode = cluster.getNameNode(1);
+
+ // Stopping standby namenode
+ sbNameNode.stop();
+
+ // Verify that last heartbeat response time from one of the namenodes
would stay much higher
+ // after stopping one namenode.
+ GenericTestUtils.waitFor(() -> {
+ List<Map<String, String>> bpServiceActorInfo =
datanode.getBPServiceActorInfoMap();
+ Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
+ Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);
+
+ long lastHeartbeatResponseTime1 =
+
Long.parseLong(bpServiceActorInfo1.get("LastHeartbeatResponseTime"));
+ long lastHeartbeatResponseTime2 =
+
Long.parseLong(bpServiceActorInfo2.get("LastHeartbeatResponseTime"));
+
+ LOG.info("Last heartbeat response from namenode 1: {}",
lastHeartbeatResponseTime1);
+ LOG.info("Last heartbeat response from namenode 2: {}",
lastHeartbeatResponseTime2);
+
+ return (lastHeartbeatResponseTime1 < 5L && lastHeartbeatResponseTime2
> 5L) || (
+ lastHeartbeatResponseTime1 > 5L && lastHeartbeatResponseTime2 <
5L);
+
+ },
+ 200,
+ 15000,
+ "Last heartbeat response should be higher than 5s for at least one
namenode");
Review Comment:
nit:
reduce the line breaks
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java:
##########
@@ -294,4 +297,107 @@ public void testDataNodeMXBeanSlowDisksEnabled() throws
Exception {
if (cluster != null) {cluster.shutdown();}
}
}
+
+ @Test
+ public void testDataNodeMXBeanLastHeartbeats() throws Exception {
+ Configuration conf = new Configuration();
+ try (MiniDFSCluster cluster = new MiniDFSCluster
+ .Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology(2))
+ .numDataNodes(1)
+ .build()) {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+ cluster.transitionToStandby(1);
+
+ DataNode datanode = cluster.getDataNodes().get(0);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxbeanName = new ObjectName(
+ "Hadoop:service=DataNode,name=DataNodeInfo");
+
+ // Verify and wait until one of the BP service actor identifies active
namenode as active
+ // and another as standby.
+ GenericTestUtils.waitFor(() -> {
+ List<Map<String, String>> bpServiceActorInfo =
datanode.getBPServiceActorInfoMap();
+ Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
+ Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);
+ return (HAServiceProtocol.HAServiceState.ACTIVE.toString()
+ .equals(bpServiceActorInfo1.get("NamenodeHaState"))
+ && HAServiceProtocol.HAServiceState.STANDBY.toString()
+ .equals(bpServiceActorInfo2.get("NamenodeHaState")))
+ || (HAServiceProtocol.HAServiceState.ACTIVE.toString()
+ .equals(bpServiceActorInfo2.get("NamenodeHaState"))
+ && HAServiceProtocol.HAServiceState.STANDBY.toString()
+ .equals(bpServiceActorInfo1.get("NamenodeHaState")));
+ },
+ 500,
+ 8000,
+ "No namenode is reported active");
+
+ // basic metrics validation
+ String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
+ Assert.assertEquals(datanode.getClusterId(), clusterId);
+ String version = (String)mbs.getAttribute(mxbeanName, "Version");
+ Assert.assertEquals(datanode.getVersion(),version);
+ String bpActorInfo = (String) mbs.getAttribute(mxbeanName,
"BPServiceActorInfo");
+ Assert.assertEquals(datanode.getBPServiceActorInfo(), bpActorInfo);
Review Comment:
Why basic metrics validation, there should be existing tests testing that?
You didn't bother them?
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java:
##########
@@ -294,4 +297,107 @@ public void testDataNodeMXBeanSlowDisksEnabled() throws
Exception {
if (cluster != null) {cluster.shutdown();}
}
}
+
+ @Test
+ public void testDataNodeMXBeanLastHeartbeats() throws Exception {
+ Configuration conf = new Configuration();
+ try (MiniDFSCluster cluster = new MiniDFSCluster
+ .Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology(2))
+ .numDataNodes(1)
+ .build()) {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+ cluster.transitionToStandby(1);
+
+ DataNode datanode = cluster.getDataNodes().get(0);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxbeanName = new ObjectName(
+ "Hadoop:service=DataNode,name=DataNodeInfo");
+
+ // Verify and wait until one of the BP service actor identifies active
namenode as active
+ // and another as standby.
+ GenericTestUtils.waitFor(() -> {
+ List<Map<String, String>> bpServiceActorInfo =
datanode.getBPServiceActorInfoMap();
+ Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
+ Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);
+ return (HAServiceProtocol.HAServiceState.ACTIVE.toString()
+ .equals(bpServiceActorInfo1.get("NamenodeHaState"))
+ && HAServiceProtocol.HAServiceState.STANDBY.toString()
+ .equals(bpServiceActorInfo2.get("NamenodeHaState")))
+ || (HAServiceProtocol.HAServiceState.ACTIVE.toString()
+ .equals(bpServiceActorInfo2.get("NamenodeHaState"))
+ && HAServiceProtocol.HAServiceState.STANDBY.toString()
+ .equals(bpServiceActorInfo1.get("NamenodeHaState")));
+ },
+ 500,
+ 8000,
+ "No namenode is reported active");
+
+ // basic metrics validation
+ String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
+ Assert.assertEquals(datanode.getClusterId(), clusterId);
+ String version = (String)mbs.getAttribute(mxbeanName, "Version");
+ Assert.assertEquals(datanode.getVersion(),version);
+ String bpActorInfo = (String) mbs.getAttribute(mxbeanName,
"BPServiceActorInfo");
+ Assert.assertEquals(datanode.getBPServiceActorInfo(), bpActorInfo);
+
+ // Verify that last heartbeat sent to both namenodes in last 5 sec.
+ assertLastHeartbeatSentTime(datanode, "LastHeartbeat");
+ // Verify that last heartbeat response from both namenodes have been
received within
+ // last 5 sec.
+ assertLastHeartbeatSentTime(datanode, "LastHeartbeatResponseTime");
+
+
+ NameNode sbNameNode = cluster.getNameNode(1);
+
+ // Stopping standby namenode
+ sbNameNode.stop();
+
+ // Verify that last heartbeat response time from one of the namenodes
would stay much higher
+ // after stopping one namenode.
+ GenericTestUtils.waitFor(() -> {
+ List<Map<String, String>> bpServiceActorInfo =
datanode.getBPServiceActorInfoMap();
+ Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
+ Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);
+
+ long lastHeartbeatResponseTime1 =
+
Long.parseLong(bpServiceActorInfo1.get("LastHeartbeatResponseTime"));
+ long lastHeartbeatResponseTime2 =
+
Long.parseLong(bpServiceActorInfo2.get("LastHeartbeatResponseTime"));
+
+ LOG.info("Last heartbeat response from namenode 1: {}",
lastHeartbeatResponseTime1);
+ LOG.info("Last heartbeat response from namenode 2: {}",
lastHeartbeatResponseTime2);
+
+ return (lastHeartbeatResponseTime1 < 5L && lastHeartbeatResponseTime2
> 5L) || (
+ lastHeartbeatResponseTime1 > 5L && lastHeartbeatResponseTime2 <
5L);
+
+ },
+ 200,
+ 15000,
+ "Last heartbeat response should be higher than 5s for at least one
namenode");
+
+ // Verify that last heartbeat sent to both namenodes in last 5 sec even
though
+ // the last heartbeat received from one of the namenodes is greater than
5 sec ago.
+ assertLastHeartbeatSentTime(datanode, "LastHeartbeat");
+ }
+ }
+
+ private static void assertLastHeartbeatSentTime(DataNode datanode, String
lastHeartbeat) {
+ List<Map<String, String>> bpServiceActorInfo =
datanode.getBPServiceActorInfoMap();
+ Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
+ Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);
+
+ long lastHeartbeatSent1 =
+ Long.parseLong(bpServiceActorInfo1.get(lastHeartbeat));
+ long lastHeartbeatSent2 =
+ Long.parseLong(bpServiceActorInfo2.get(lastHeartbeat));
+
+ Assert.assertTrue(lastHeartbeat + " for first bp service actor is higher
than 5s",
+ lastHeartbeatSent1 < 5L);
+ Assert.assertTrue(lastHeartbeat + " for second bp service actor is higher
than 5s",
+ lastHeartbeatSent2 < 5L);
Review Comment:
What are the chances of this guy going flaky in our unpredictable CI
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java:
##########
@@ -294,4 +297,107 @@ public void testDataNodeMXBeanSlowDisksEnabled() throws
Exception {
if (cluster != null) {cluster.shutdown();}
}
}
+
+ @Test
+ public void testDataNodeMXBeanLastHeartbeats() throws Exception {
+ Configuration conf = new Configuration();
+ try (MiniDFSCluster cluster = new MiniDFSCluster
+ .Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology(2))
+ .numDataNodes(1)
+ .build()) {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+ cluster.transitionToStandby(1);
+
+ DataNode datanode = cluster.getDataNodes().get(0);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxbeanName = new ObjectName(
+ "Hadoop:service=DataNode,name=DataNodeInfo");
+
+ // Verify and wait until one of the BP service actor identifies active
namenode as active
+ // and another as standby.
+ GenericTestUtils.waitFor(() -> {
+ List<Map<String, String>> bpServiceActorInfo =
datanode.getBPServiceActorInfoMap();
+ Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
+ Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);
+ return (HAServiceProtocol.HAServiceState.ACTIVE.toString()
+ .equals(bpServiceActorInfo1.get("NamenodeHaState"))
+ && HAServiceProtocol.HAServiceState.STANDBY.toString()
+ .equals(bpServiceActorInfo2.get("NamenodeHaState")))
+ || (HAServiceProtocol.HAServiceState.ACTIVE.toString()
+ .equals(bpServiceActorInfo2.get("NamenodeHaState"))
+ && HAServiceProtocol.HAServiceState.STANDBY.toString()
+ .equals(bpServiceActorInfo1.get("NamenodeHaState")));
+ },
+ 500,
+ 8000,
+ "No namenode is reported active");
Review Comment:
This looks like checking if we have the datanode has acknowledged the active
namenode or not? Can we have this util as part of MiniDfsCluster?
We have something like ``waitDatanodeFullyStarted`` there, may be a new
method with an extra param, checkActive or something like that, which can may
be in the Datanode check for Active NN also
```
public boolean isDatanodeFullyStarted() {
for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
if (!bp.isInitialized() || !bp.isAlive() || bp.getActiveNN()==null) {
return false;
}
}
```
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java:
##########
@@ -294,4 +297,107 @@ public void testDataNodeMXBeanSlowDisksEnabled() throws
Exception {
if (cluster != null) {cluster.shutdown();}
}
}
+
+ @Test
+ public void testDataNodeMXBeanLastHeartbeats() throws Exception {
+ Configuration conf = new Configuration();
+ try (MiniDFSCluster cluster = new MiniDFSCluster
+ .Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology(2))
+ .numDataNodes(1)
Review Comment:
Datanodes are 1 by default
> BP service actor LastHeartbeat is not sufficient to track realtime connection
> breaks
> ------------------------------------------------------------------------------------
>
> Key: HDFS-16907
> URL: https://issues.apache.org/jira/browse/HDFS-16907
> Project: Hadoop HDFS
> Issue Type: Improvement
> Reporter: Viraj Jasani
> Assignee: Viraj Jasani
> Priority: Major
> Labels: pull-request-available
> Attachments: Screenshot 2023-02-03 at 6.12.24 PM.png
>
>
> Each BP service actor thread maintains _lastHeartbeatTime_ with the namenode
> that it is connected to. However, this is updated even if the connection to
> the namenode is broken.
> Suppose, the actor thread keeps heartbeating to namenode and suddenly the
> socket connection is broken. When this happens, until specific time duration,
> the actor thread consistently keeps updating _lastHeartbeatTime_ before even
> initiating heartbeat connection with namenode. If connection cannot be
> established even after RPC retries are exhausted, then IOException is thrown.
> This means that heartbeat response has not been received from the namenode.
> In the loop, the actor thread keeps trying connecting for heartbeat and the
> last heartbeat stays close to 1/2s even though in reality there is no
> response being received from namenode.
>
> Sample Exception from the BP service actor thread, during which LastHeartbeat
> stays very low:
> {code:java}
> 2023-02-03 22:34:55,725 WARN [xyz:9000] datanode.DataNode - IOException in
> offerService
> java.io.EOFException: End of File Exception between local host is: "dn-0";
> destination host is: "nn-1":9000; : java.io.EOFException; For more details
> see: http://wiki.apache.org/hadoop/EOFException
> at sun.reflect.GeneratedConstructorAccessor34.newInstance(Unknown Source)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:913)
> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:862)
> at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1553)
> at org.apache.hadoop.ipc.Client.call(Client.java:1495)
> at org.apache.hadoop.ipc.Client.call(Client.java:1392)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
> at com.sun.proxy.$Proxy17.sendHeartbeat(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB.sendHeartbeat(DatanodeProtocolClientSideTranslatorPB.java:168)
> at
> org.apache.hadoop.hdfs.server.datanode.BPServiceActor.sendHeartBeat(BPServiceActor.java:544)
> at
> org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:682)
> at
> org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:890)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readInt(DataInputStream.java:392)
> at org.apache.hadoop.ipc.Client$IpcStreams.readResponse(Client.java:1884)
> at
> org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1176)
> at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1074) {code}
> Attaching screenshots of how last heartbeat value looks when the above error
> is consistently getting logged.
>
> Last heartbeat response time is important to initiate any auto-recovery from
> datanode. Hence, we should introduce LastHeartbeatResponseTime that only gets
> updated if the BP service actor thread was successfully able to retrieve
> response from namenode.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]