This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 8b8ec65e65075d60e97f7344935c85c645793378
Author: Erik Krogen <[email protected]>
AuthorDate: Fri Dec 14 14:02:20 2018 -0800

    HDFS-14146. [SBN read] Handle exceptions from and prevent handler threads from blocking within internalQueueCall. Contributed by Chao Sun.
---
 .../java/org/apache/hadoop/ipc/ExternalCall.java   |  3 +-
 .../main/java/org/apache/hadoop/ipc/Server.java    | 39 +++++++++++---
 .../namenode/ha/TestConsistentReadsObserver.java   | 63 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
index 5566136..5cc3665 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.ipc.Server.Call;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.security.UserGroupInformation;
 
 public abstract class ExternalCall<T> extends Call {
@@ -78,7 +79,7 @@ public abstract class ExternalCall<T> extends Call {
   }
 
   @Override
-  final void doResponse(Throwable t) {
+  final void doResponse(Throwable t, RpcStatusProto status) {
     synchronized(done) {
       error = t;
       done.set(true);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index c684314..92d850d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -800,7 +800,11 @@ public abstract class Server {
       }
     }
 
-    void doResponse(Throwable t) throws IOException {}
+    void doResponse(Throwable t) throws IOException {
+      doResponse(t, RpcStatusProto.FATAL);
+    }
+
+    void doResponse(Throwable t, RpcStatusProto proto) throws IOException {}
 
     // For Schedulable
     @Override
@@ -967,15 +971,17 @@ public abstract class Server {
     }
 
     @Override
-    void doResponse(Throwable t) throws IOException {
+    void doResponse(Throwable t, RpcStatusProto status) throws IOException {
       RpcCall call = this;
       if (t != null) {
+        if (status == null) {
+          status = RpcStatusProto.FATAL;
+        }
         // clone the call to prevent a race with another thread stomping
         // on the response while being sent.  the original call is
         // effectively discarded since the wait count won't hit zero
         call = new RpcCall(this);
-        setupResponse(call,
-            RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
+        setupResponse(call, status, RpcErrorCodeProto.ERROR_RPC_SERVER,
             null, t.getClass().getName(), StringUtils.stringifyException(t));
       } else {
         setupResponse(call, call.responseParams.returnStatus,
@@ -2707,8 +2713,18 @@ public abstract class Server {
 
   private void internalQueueCall(Call call)
       throws IOException, InterruptedException {
+    internalQueueCall(call, true);
+  }
+
+  private void internalQueueCall(Call call, boolean blocking)
+      throws IOException, InterruptedException {
     try {
-      callQueue.put(call); // queue the call; maybe blocked here
+      // queue the call, may be blocked if blocking is true.
+      if (blocking) {
+        callQueue.put(call);
+      } else {
+        callQueue.add(call);
+      }
     } catch (CallQueueOverflowException cqe) {
       // If rpc scheduler indicates back off based on performance degradation
       // such as response time or rpc queue is full, we will ask the client
@@ -2751,8 +2767,8 @@ public abstract class Server {
              * In case of Observer, it handles only reads, which are
              * commutative.
              */
-            //Re-queue the call and continue
-            internalQueueCall(call);
+            // Re-queue the call and continue
+            requeueCall(call);
             continue;
           }
           if (LOG.isDebugEnabled()) {
@@ -2794,6 +2810,15 @@ public abstract class Server {
       LOG.debug(Thread.currentThread().getName() + ": exiting");
     }
 
+    private void requeueCall(Call call)
+        throws IOException, InterruptedException {
+      try {
+        internalQueueCall(call, false);
+      } catch (RpcServerException rse) {
+        call.doResponse(rse.getCause(), rse.getRpcStatusProto());
+      }
+    }
+
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index e1fadaf..fe5345d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -25,12 +25,16 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RpcScheduler;
+import org.apache.hadoop.ipc.Schedulable;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -85,6 +89,36 @@ public class TestConsistentReadsObserver {
   }
 
   @Test
+  public void testRequeueCall() throws Exception {
+    setObserverRead(true);
+
+    // Update the configuration just for the observer, by enabling
+    // IPC backoff and using the test scheduler class, which starts to backoff
+    // after certain number of calls.
+    final int observerIdx = 2;
+    NameNode nn = dfsCluster.getNameNode(observerIdx);
+    int port = nn.getNameNodeAddress().getPort();
+    Configuration configuration = dfsCluster.getConfiguration(observerIdx);
+    String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
+    configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
+        TestRpcScheduler.class.getName());
+    configuration.setBoolean(prefix
+        + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+
+    dfsCluster.restartNameNode(observerIdx);
+    dfsCluster.transitionToObserver(observerIdx);
+
+    dfs.create(testPath, (short)1).close();
+    assertSentTo(0);
+
+    // Since we haven't tailed edit logs on the observer, it will fall behind
+    // and keep re-queueing the incoming request. Eventually, RPC backoff will
+    // be triggered and client should retry active NN.
+    dfs.getFileStatus(testPath);
+    assertSentTo(0);
+  }
+
+  @Test
   public void testMsyncSimple() throws Exception {
     // 0 == not completed, 1 == succeeded, -1 == failed
     AtomicInteger readStatus = new AtomicInteger(0);
@@ -169,4 +203,33 @@ public class TestConsistentReadsObserver {
     dfs = HATestUtil.configureObserverReadFs(
         dfsCluster, conf, ObserverReadProxyProvider.class, flag);
   }
+
+  /**
+   * A dummy test scheduler that starts backoff after a fixed number
+   * of requests.
+   */
+  public static class TestRpcScheduler implements RpcScheduler {
+    // Allow a number of RPCs to pass in order for the NN restart to succeed.
+    private int allowed = 10;
+    public TestRpcScheduler() {}
+
+    @Override
+    public int getPriorityLevel(Schedulable obj) {
+      return 0;
+    }
+
+    @Override
+    public boolean shouldBackOff(Schedulable obj) {
+      return --allowed < 0;
+    }
+
+    @Override
+    public void addResponseTime(String name, int priorityLevel, int queueTime,
+        int processingTime) {
+    }
+
+    @Override
+    public void stop() {
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to