goiri commented on code in PR #5878:
URL: https://github.com/apache/hadoop/pull/5878#discussion_r1286275737


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -285,13 +323,67 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implementation is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+    Callable<HAServiceState> getHAServiceStateTask = () -> getHAServiceState(proxyInfo);
+
+    try {
+      Future<HAServiceState> task =
+          nnProbingThreadPool.submit(getHAServiceStateTask);

Review Comment:
   I would put the lambda directly here and split the line as follows:
   ```
   Future<HAServiceState> task = nnProbingThreadPool.submit(
       () -> getHAServiceState(proxyInfo));
   ```
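
For context, here is a minimal, self-contained sketch of the pattern under discussion: the lambda is passed straight to `submit()`, and the result is awaited with a timeout. The class `InlineLambdaProbe`, the `probe()` stand-in, and the `String` return type are hypothetical placeholders for illustration and are not taken from the PR.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class InlineLambdaProbe {
  // Hypothetical stand-in for the real HA state RPC: sleeps, then answers.
  static String probe(long delayMs) throws InterruptedException {
    Thread.sleep(delayMs);
    return "OBSERVER";
  }

  // Submits the probe as an inline lambda and waits at most timeoutMs.
  // Returns null on timeout or failure, mirroring the "fall back" idea.
  static String probeWithTimeout(ExecutorService pool, long delayMs,
      long timeoutMs) {
    // The lambda returns a value, so it resolves to Callable<String>.
    Future<String> task = pool.submit(() -> probe(delayMs));
    try {
      return task.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      task.cancel(true); // interrupt the still-running probe
      return null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      task.cancel(true);
      return null;
    } catch (ExecutionException e) {
      return null; // the probe itself threw
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      System.out.println(probeWithTimeout(pool, 0, 2000));
      System.out.println(probeWithTimeout(pool, 5000, 50));
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Because the lambda body yields a value, the compiler selects the `submit(Callable<T>)` overload, so no separate `Callable` variable is needed.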



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -285,13 +323,67 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implementation is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+    Callable<HAServiceState> getHAServiceStateTask = () -> getHAServiceState(proxyInfo);
+
+    try {
+      Future<HAServiceState> task =
+          nnProbingThreadPool.submit(getHAServiceStateTask);
+      return getHAServiceStateWithTimeout(proxyInfo, task);
+    } catch (RejectedExecutionException e) {
+      LOG.warn("Run out of threads to submit the request to query HA state. "
+          + "Ok to return null and we will fallback to use active NN to serve "
+          + "this request.");
+      return null;
+    }
+  }
+
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo,
+      Future<HAServiceState> task) {
+    HAServiceState state = null;
+    try {
+      if (namenodeHAStateProbeTimeoutMs > 0) {
+        state = task.get(namenodeHAStateProbeTimeoutMs, TimeUnit.MILLISECONDS);
+      } else {
+        // Disable timeout by waiting indefinitely when namenodeHAStateProbeTimeoutSec is set to 0
+        // or a negative value.
+        state = task.get();
+      }
+      LOG.debug("HA State for {} is {}", proxyInfo.proxyInfo, state);
+    } catch (TimeoutException e) {
+      // Cancel the task on timeout
+      String msg = String.format("Cancel NN probe task due to timeout for %s", proxyInfo.proxyInfo);
+      LOG.warn(msg, e);
+      if (task != null) {

Review Comment:
   Can the task be null here? If it were, `task.get()` would already have failed before we reached this point.
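
A small sketch of why the check looks redundant, assuming the handler is shaped like the one in the diff: `TimeoutException` can only be thrown by calling `get()` on the future, so a null future would raise `NullPointerException` before the timeout branch is ever entered. `NullTaskCheck`, `awaitState`, and the string results are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class NullTaskCheck {
  // Waits for the task; reports how the wait ended.
  static String awaitState(Future<String> task, long timeoutMs) {
    try {
      // A null task fails right here with NullPointerException.
      return task.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Reaching this branch means get() was invoked on a real object,
      // so no "task != null" guard is needed before cancelling.
      task.cancel(true);
      return "TIMEOUT";
    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
      }
      return "FAILED";
    }
  }

  public static void main(String[] args) {
    // A future that never completes forces the timeout branch.
    System.out.println(awaitState(new CompletableFuture<>(), 20));
    System.out.println(
        awaitState(CompletableFuture.completedFuture("ACTIVE"), 20));
  }
}
```

The standard JDK executors also return a non-null `Future` from `submit()` or throw `RejectedExecutionException`, so the caller in the first function should not be able to pass null either.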



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

