Repository: hadoop
Updated Branches:
  refs/heads/branch-2 e341e5151 -> 5305a392c


HDFS-10789. Route webhdfs through the RPC call queue. Contributed by Daryn 
Sharp and Rushabh S Shah.

(cherry picked from commit 85cd06f6636f295ad1f3bf2a90063f4714c9cca7)

Conflicts:
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
        hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5305a392
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5305a392
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5305a392

Branch: refs/heads/branch-2
Commit: 5305a392c39d298ecf38ca2dfd2526adeee9cd38
Parents: e341e51
Author: Kihwal Lee <kih...@apache.org>
Authored: Wed Oct 12 15:22:51 2016 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Wed Oct 12 15:22:51 2016 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/ipc/ExternalCall.java     |   9 +-
 .../java/org/apache/hadoop/ipc/TestRPC.java     |   6 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   3 +
 .../hdfs/server/namenode/FSNamesystem.java      |  15 +-
 .../hadoop/hdfs/server/namenode/NameNode.java   |  12 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   6 +-
 .../web/resources/NamenodeWebHdfsMethods.java   | 150 +++++++++++--------
 .../src/main/resources/hdfs-default.xml         |   8 +
 .../server/namenode/TestNamenodeRetryCache.java |  25 +++-
 .../web/resources/TestWebHdfsDataLocality.java  |  25 +++-
 10 files changed, 161 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
index 9b4cbcf..5566136 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.ipc.Server.Call;
@@ -37,14 +38,10 @@ public abstract class ExternalCall<T> extends Call {
 
   public abstract UserGroupInformation getRemoteUser();
 
-  public final T get() throws IOException, InterruptedException {
+  public final T get() throws InterruptedException, ExecutionException {
     waitForCompletion();
     if (error != null) {
-      if (error instanceof IOException) {
-        throw (IOException)error;
-      } else {
-        throw new IOException(error);
-      }
+      throw new ExecutionException(error);
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 7a57e5a..287f0e5 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -72,6 +72,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -1001,8 +1002,9 @@ public class TestRPC extends TestRpcBase {
       try {
         exceptionCall.get();
         fail("didn't throw");
-      } catch (IOException ioe) {
-        assertEquals(expectedIOE.getMessage(), ioe.getMessage());
+      } catch (ExecutionException ee) {
+        assertTrue((ee.getCause()) instanceof IOException);
+        assertEquals(expectedIOE.getMessage(), ee.getCause().getMessage());
       }
     } finally {
       server.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a6afd79..5a7d417 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -73,6 +73,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.webhdfs.ugi.expire.after.access";
   public static final int     DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
       10*60*1000; //10 minutes
+  public static final String DFS_WEBHDFS_USE_IPC_CALLQ =
+      "dfs.webhdfs.use.ipc.callq";
+  public static final boolean DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT = true;
 
   // HA related configuration
   public static final String  DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = 
"dfs.datanode.restart.replica.expiration";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f337dab..9e357a2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -246,7 +246,6 @@ import 
org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import 
org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -327,7 +326,7 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
   private void logAuditEvent(boolean succeeded, String cmd, String src,
       String dst, HdfsFileStatus stat) throws IOException {
     if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
+      logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
                     cmd, src, dst, stat);
     }
   }
@@ -5228,17 +5227,9 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
    * RPC call context even if the client exits.
    */
   boolean isExternalInvocation() {
-    return Server.isRpcInvocation() || 
NamenodeWebHdfsMethods.isWebHdfsInvocation();
+    return Server.isRpcInvocation();
   }
 
-  private static InetAddress getRemoteIp() {
-    InetAddress ip = Server.getRemoteIp();
-    if (ip != null) {
-      return ip;
-    }
-    return NamenodeWebHdfsMethods.getRemoteIp();
-  }
-  
   // optimize ugi lookup for RPC operations to avoid a trip through
   // UGI.getCurrentUser which is synch'ed
   private static UserGroupInformation getRemoteUser() throws IOException {
@@ -6773,7 +6764,7 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
           sb.append(trackingId);
         }
         sb.append("\t").append("proto=");
-        sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : 
"rpc");
+        sb.append(Server.getProtocol());
         if (isCallerContextEnabled &&
             callerContext != null &&
             callerContext.isContextValid()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index c9006a1..14646c1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -64,7 +64,9 @@ import 
org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.ExternalCall;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -408,7 +410,15 @@ public class NameNode extends ReconfigurableBase implements
   public NamenodeProtocols getRpcServer() {
     return rpcServer;
   }
-  
+
+  public void queueExternalCall(ExternalCall<?> extCall)
+      throws IOException, InterruptedException {
+    if (rpcServer == null) {
+      throw new RetriableException("Namenode is in startup mode");
+    }
+    rpcServer.getClientRpcServer().queueCall(extCall);
+  }
+
   public static void initMetrics(Configuration conf, NamenodeRole role) {
     metrics = NameNodeMetrics.create(conf, role);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 57f9494..aa1b51a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -138,7 +138,6 @@ import 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import 
org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1683,10 +1682,7 @@ public class NameNodeRpcServer implements 
NamenodeProtocols {
   }
 
   private static String getClientMachine() {
-    String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();
-    if (clientMachine == null) { //not a web client
-      clientMachine = Server.getRemoteAddress();
-    }
+    String clientMachine = Server.getRemoteAddress();
     if (clientMachine == null) { //not a RPC client
       clientMachine = "";
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index c74a7bc..a28576b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -25,10 +25,13 @@ import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.security.Principal;
 import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -58,6 +61,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -79,8 +83,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.*;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ExternalCall;
 import org.apache.hadoop.ipc.RetriableException;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.Credentials;
@@ -101,39 +105,39 @@ public class NamenodeWebHdfsMethods {
   public static final Log LOG = 
LogFactory.getLog(NamenodeWebHdfsMethods.class);
 
   private static final UriFsPathParam ROOT = new UriFsPathParam("");
-  
-  private static final ThreadLocal<String> REMOTE_ADDRESS = new 
ThreadLocal<String>(); 
-
-  /** @return the remote client address. */
-  public static String getRemoteAddress() {
-    return REMOTE_ADDRESS.get();
-  }
-
-  public static InetAddress getRemoteIp() {
-    try {
-      return InetAddress.getByName(getRemoteAddress());
-    } catch (Exception e) {
-      return null;
-    }
-  }
 
-  /**
-   * Returns true if a WebHdfs request is in progress.  Akin to
-   * {@link Server#isRpcInvocation()}.
-   */
-  public static boolean isWebHdfsInvocation() {
-    return getRemoteAddress() != null;
-  }
+  private volatile Boolean useIpcCallq;
+  private String scheme;
+  private Principal userPrincipal;
+  private String remoteAddr;
 
   private @Context ServletContext context;
-  private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;
 
+  public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
+    // the request object is a proxy to thread-locals so we have to extract
+    // what we want from it since the external call will be processed in a
+    // different thread.
+    scheme = request.getScheme();
+    userPrincipal = request.getUserPrincipal();
+    // get the remote address, if coming in via a trusted proxy server then
+    // the address will be that of the proxied client
+    remoteAddr = JspHelper.getRemoteAddr(request);
+  }
+
   private void init(final UserGroupInformation ugi,
       final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final UriFsPathParam path, final HttpOpParam<?> op,
       final Param<?, ?>... parameters) {
+    if (useIpcCallq == null) {
+      Configuration conf =
+          (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
+      useIpcCallq = conf.getBoolean(
+          DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ,
+          DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT);
+    }
+
     if (LOG.isTraceEnabled()) {
       LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
           + ", ugi=" + ugi + ", " + username + ", " + doAsUser
@@ -142,16 +146,8 @@ public class NamenodeWebHdfsMethods {
 
     //clear content type
     response.setContentType(null);
-    
-    // set the remote address, if coming in via a trust proxy server then
-    // the address with be that of the proxied client
-    REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
   }
 
-  private void reset() {
-    REMOTE_ADDRESS.set(null);
-  }
-  
   private static NamenodeProtocols getRPCServer(NameNode namenode)
       throws IOException {
      final NamenodeProtocols np = namenode.getRpcServer();
@@ -160,11 +156,63 @@ public class NamenodeWebHdfsMethods {
      }
      return np;
   }
-  
+
+  private <T> T doAs(final UserGroupInformation ugi,
+      final PrivilegedExceptionAction<T> action)
+          throws IOException, InterruptedException {
+    return useIpcCallq ? doAsExternalCall(ugi, action) : ugi.doAs(action);
+  }
+
+  private <T> T doAsExternalCall(final UserGroupInformation ugi,
+      final PrivilegedExceptionAction<T> action)
+          throws IOException, InterruptedException {
+    // set the remote address, if coming in via a trusted proxy server then
+    // the address will be that of the proxied client
+    ExternalCall<T> call = new ExternalCall<T>(action){
+      @Override
+      public UserGroupInformation getRemoteUser() {
+        return ugi;
+      }
+      @Override
+      public String getProtocol() {
+        return "webhdfs";
+      }
+      @Override
+      public String getHostAddress() {
+        return remoteAddr;
+      }
+      @Override
+      public InetAddress getHostInetAddress() {
+        try {
+          return InetAddress.getByName(getHostAddress());
+        } catch (UnknownHostException e) {
+          return null;
+        }
+      }
+    };
+    final NameNode namenode = (NameNode)context.getAttribute("name.node");
+    namenode.queueExternalCall(call);
+    T result = null;
+    try {
+      result = call.get();
+    } catch (ExecutionException ee) {
+      Throwable t = ee.getCause();
+      if (t instanceof RuntimeException) {
+        throw (RuntimeException)t;
+      } else if (t instanceof IOException) {
+        throw (IOException)t;
+      } else {
+        throw new IOException(t);
+      }
+    }
+    return result;
+  }
+
   @VisibleForTesting
   static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize, final String excludeDatanodes) throws IOException {
+      final long blocksize, final String excludeDatanodes,
+      final String remoteAddr) throws IOException {
     FSNamesystem fsn = namenode.getNamesystem();
     if (fsn == null) {
       throw new IOException("Namesystem has not been intialized yet.");
@@ -188,7 +236,7 @@ public class NamenodeWebHdfsMethods {
     if (op == PutOpParam.Op.CREATE) {
       //choose a datanode near to client 
       final DatanodeDescriptor clientNode = bm.getDatanodeManager(
-          ).getDatanodeByHost(getRemoteAddress());
+          ).getDatanodeByHost(remoteAddr);
       if (clientNode != null) {
         final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
             path, clientNode, excludes, blocksize);
@@ -251,7 +299,8 @@ public class NamenodeWebHdfsMethods {
       return null;
     }
     final Token<? extends TokenIdentifier> t = 
c.getAllTokens().iterator().next();
-    Text kind = request.getScheme().equals("http") ? 
WebHdfsConstants.WEBHDFS_TOKEN_KIND
+    Text kind = scheme.equals("http")
+        ? WebHdfsConstants.WEBHDFS_TOKEN_KIND
         : WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
     t.setKind(kind);
     return t;
@@ -265,7 +314,7 @@ public class NamenodeWebHdfsMethods {
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
     final DatanodeInfo dn;
     dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
-        excludeDatanodes);
+        excludeDatanodes, remoteAddr);
     if (dn == null) {
       throw new IOException("Failed to find datanode, suggest to check cluster"
           + " health. excludeDatanodes=" + excludeDatanodes);
@@ -281,7 +330,7 @@ public class NamenodeWebHdfsMethods {
     } else {
       //generate a token
       final Token<? extends TokenIdentifier> t = generateDelegationToken(
-          namenode, ugi, request.getUserPrincipal().getName());
+          namenode, ugi, userPrincipal.getName());
       delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
     }
     final String query = op.toQueryString() + delegationQuery
@@ -289,7 +338,6 @@ public class NamenodeWebHdfsMethods {
         + Param.toSortedString("&", parameters);
     final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
 
-    final String scheme = request.getScheme();
     int port = "http".equals(scheme) ? dn.getInfoPort() : dn
         .getInfoSecurePort();
     final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
@@ -437,10 +485,9 @@ public class NamenodeWebHdfsMethods {
         aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
         oldSnapshotName, excludeDatanodes, createFlagParam, noredirect);
 
-    return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+    return doAs(ugi, new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        try {
           return put(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, destination, owner, group,
               permission, overwrite, bufferSize, replication, blockSize,
@@ -448,9 +495,6 @@ public class NamenodeWebHdfsMethods {
               delegationTokenArgument, aclPermission, xattrName, xattrValue,
               xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
               createFlagParam, noredirect);
-        } finally {
-          reset();
-        }
       }
     });
   }
@@ -678,16 +722,12 @@ public class NamenodeWebHdfsMethods {
     init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
         excludeDatanodes, newLength);
 
-    return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+    return doAs(ugi, new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        try {
           return post(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, concatSrcs, bufferSize,
               excludeDatanodes, newLength, noredirect);
-        } finally {
-          reset();
-        }
       }
     });
   }
@@ -833,17 +873,13 @@ public class NamenodeWebHdfsMethods {
         renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
         tokenKind, tokenService, startAfter);
 
-    return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+    return doAs(ugi, new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        try {
           return get(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
               xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
               tokenService, noredirect, startAfter);
-        } finally {
-          reset();
-        }
       }
     });
   }
@@ -1113,15 +1149,11 @@ public class NamenodeWebHdfsMethods {
 
     init(ugi, delegation, username, doAsUser, path, op, recursive, 
snapshotName);
 
-    return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+    return doAs(ugi, new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
-        try {
           return delete(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, recursive, snapshotName);
-        } finally {
-          reset();
-        }
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7806b18..1a08f65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4093,4 +4093,12 @@
   <description>Instrumentation reporting long critical sections will suppress
     consecutive warnings within this interval.</description>
 </property>
+
+  <property>
+    <name>dfs.webhdfs.use.ipc.callq</name>
+    <value>true</value>
+    <description>Enables routing of WebHDFS calls through the RPC
+      call queue.</description>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
index 26efce5..d7a2c81 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.LightWeightCache;
 import org.junit.After;
@@ -111,19 +112,33 @@ public class TestNamenodeRetryCache {
     }
   }
   
+  static class DummyCall extends Server.Call {
+    private UserGroupInformation ugi;
+
+    DummyCall(int callId, byte[] clientId) {
+      super(callId, 1, null, null, RpcKind.RPC_PROTOCOL_BUFFER, clientId);
+      try {
+        ugi = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+      }
+    }
+    @Override
+    public UserGroupInformation getRemoteUser() {
+      return ugi;
+    }
+  }
   /** Set the current Server RPC call */
   public static void newCall() {
-    Server.Call call = new Server.Call(++callId, 1, null, null,
-        RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
+    Server.Call call = new DummyCall(++callId, CLIENT_ID);
     Server.getCurCall().set(call);
   }
   
   public static void resetCall() {
-    Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
-        null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
+    Server.Call call = new DummyCall(RpcConstants.INVALID_CALL_ID,
+        RpcConstants.DUMMY_CLIENT_ID);
     Server.getCurCall().set(call);
   }
-  
+
   private void concatSetup(String file1, String file2) throws Exception {
     DFSTestUtil.createFile(filesystem, new Path(file1), BlockSize, (short)1, 
0L);
     DFSTestUtil.createFile(filesystem, new Path(file2), BlockSize, (short)1, 
0L);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5305a392/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
index 15e1c04..604bf79 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.web.resources;
 import static org.mockito.Mockito.*;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.List;
 
@@ -62,6 +63,9 @@ public class TestWebHdfsDataLocality {
   private static final String RACK1 = "/rack1";
   private static final String RACK2 = "/rack2";
 
+  private static final String LOCALHOST =
+      InetAddress.getLoopbackAddress().getHostName();
+
   @Rule
   public final ExpectedException exception = ExpectedException.none();
 
@@ -96,7 +100,8 @@ public class TestWebHdfsDataLocality {
 
           //The chosen datanode must be the same as the client address
           final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-              namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
+              namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null,
+              LOCALHOST);
           Assert.assertEquals(ipAddr, chosen.getIpAddr());
         }
       }
@@ -121,19 +126,22 @@ public class TestWebHdfsDataLocality {
 
       { //test GETFILECHECKSUM
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
+            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null,
+            LOCALHOST);
         Assert.assertEquals(expected, chosen);
       }
   
       { //test OPEN
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
+            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null,
+            LOCALHOST);
         Assert.assertEquals(expected, chosen);
       }
 
       { //test APPEND
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
+            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null,
+            LOCALHOST);
         Assert.assertEquals(expected, chosen);
       }
     } finally {
@@ -189,7 +197,7 @@ public class TestWebHdfsDataLocality {
         { // test GETFILECHECKSUM
           final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
               namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
-              sb.toString());
+              sb.toString(), LOCALHOST);
           for (int j = 0; j <= i; j++) {
             Assert.assertNotEquals(locations[j].getHostName(),
                 chosen.getHostName());
@@ -198,7 +206,8 @@ public class TestWebHdfsDataLocality {
 
         { // test OPEN
           final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-              namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
+              namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(),
+              LOCALHOST);
           for (int j = 0; j <= i; j++) {
             Assert.assertNotEquals(locations[j].getHostName(),
                 chosen.getHostName());
@@ -208,7 +217,7 @@ public class TestWebHdfsDataLocality {
         { // test APPEND
           final DatanodeInfo chosen = NamenodeWebHdfsMethods
               .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
-                  blocksize, sb.toString());
+                  blocksize, sb.toString(), LOCALHOST);
           for (int j = 0; j <= i; j++) {
             Assert.assertNotEquals(locations[j].getHostName(),
                 chosen.getHostName());
@@ -229,6 +238,6 @@ public class TestWebHdfsDataLocality {
     exception.expect(IOException.class);
     exception.expectMessage("Namesystem has not been intialized yet.");
     NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0,
-        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null);
+        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to