HDFS-7073. Allow falling back to a non-SASL connection on DataTransferProtocol 
in several edge cases. Contributed by Chris Nauroth.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f85cc14e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f85cc14e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f85cc14e

Branch: refs/heads/HDFS-6581
Commit: f85cc14eb49a46e81d2edcdc1ffe4d0852f193a5
Parents: 332e2e2
Author: cnauroth <cnaur...@apache.org>
Authored: Fri Sep 19 21:23:26 2014 -0700
Committer: cnauroth <cnaur...@apache.org>
Committed: Fri Sep 19 21:23:26 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Client.java | 68 +++++++++++++++---
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    | 22 ++++--
 .../main/java/org/apache/hadoop/ipc/RPC.java    | 38 +++++++++-
 .../java/org/apache/hadoop/ipc/RpcEngine.java   |  9 +++
 .../apache/hadoop/ipc/WritableRpcEngine.java    | 27 +++++--
 .../java/org/apache/hadoop/ipc/TestRPC.java     | 12 +++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 23 +++---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 ++
 .../java/org/apache/hadoop/hdfs/HAUtil.java     |  2 +-
 .../org/apache/hadoop/hdfs/NameNodeProxies.java | 74 +++++++++++++++++---
 .../sasl/SaslDataTransferClient.java            | 48 ++++++++-----
 .../sasl/SaslDataTransferServer.java            | 26 +++++--
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  7 +-
 .../hdfs/server/balancer/NameNodeConnector.java |  4 +-
 .../hadoop/hdfs/server/datanode/DNConf.java     | 17 +++++
 .../hadoop/hdfs/server/datanode/DataNode.java   | 26 +++----
 .../hdfs/server/namenode/NamenodeFsck.java      | 20 ++----
 .../ha/AbstractNNFailoverProxyProvider.java     | 14 ++++
 .../ha/ConfiguredFailoverProxyProvider.java     |  2 +-
 .../datatransfer/sasl/TestSaslDataTransfer.java | 45 +++++++++---
 .../namenode/ha/TestRetryCacheWithHA.java       |  2 +-
 22 files changed, 382 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
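
The change threads a single java.util.concurrent.atomic.AtomicBoolean from the
NameNode RPC layer down to the DataTransferProtocol client, instead of having
each component read IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY on its own.
A rough sketch of the intended wiring, mirroring what the DFSClient hunks below
do (variable names here are illustrative, not part of the patch):

    // The RPC layer records on this flag whether the secure client actually
    // fell back to SIMPLE auth while talking to the NameNode.
    AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo =
        NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class,
            nnFallbackToSimpleAuth);

    // The data transfer client consults the same flag on every handshake and
    // skips SASL when the NameNode connection reported a fallback.
    SaslDataTransferClient saslClient = new SaslDataTransferClient(
        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
        TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);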


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2f482c2..84fe552 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -687,7 +687,8 @@ public class Client {
      * a header to the server and starts
      * the connection thread that waits for responses.
      */
-    private synchronized void setupIOstreams() {
+    private synchronized void setupIOstreams(
+        AtomicBoolean fallbackToSimpleAuth) {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       } 
@@ -738,11 +739,18 @@ public class Client {
               remoteId.saslQop =
                   (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
               LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
-            } else if (UserGroupInformation.isSecurityEnabled() &&
-                       !fallbackAllowed) {
-              throw new IOException("Server asks us to fall back to SIMPLE " +
-                  "auth, but this client is configured to only allow secure " +
-                  "connections.");
+              if (fallbackToSimpleAuth != null) {
+                fallbackToSimpleAuth.set(false);
+              }
+            } else if (UserGroupInformation.isSecurityEnabled()) {
+              if (!fallbackAllowed) {
+                throw new IOException("Server asks us to fall back to SIMPLE " +
+                    "auth, but this client is configured to only allow secure " +
+                    "connections.");
+              }
+              if (fallbackToSimpleAuth != null) {
+                fallbackToSimpleAuth.set(true);
+              }
             }
           }
         
@@ -1375,6 +1383,26 @@ public class Client {
   /** 
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
   * <code>remoteId</code>, returning the rpc response.
+   *
+   * @param rpcKind
+   * @param rpcRequest -  contains serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @param fallbackToSimpleAuth - set to true or false during this method to
+   *   indicate if a secure client falls back to simple auth
+   * @returns the rpc response
+   * Throws exceptions if there are network problems or if the remote code
+   * threw an exception.
+   */
+  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
+    return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
+      fallbackToSimpleAuth);
+  }
+
+  /**
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc response.
    * 
    * @param rpcKind
    * @param rpcRequest -  contains serialized method and method parameters
@@ -1386,8 +1414,29 @@ public class Client {
    */
   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
       ConnectionId remoteId, int serviceClass) throws IOException {
+    return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
+  }
+
+  /**
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc response.
+   *
+   * @param rpcKind
+   * @param rpcRequest -  contains serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @param serviceClass - service class for RPC
+   * @param fallbackToSimpleAuth - set to true or false during this method to
+   *   indicate if a secure client falls back to simple auth
+   * @returns the rpc response
+   * Throws exceptions if there are network problems or if the remote code
+   * threw an exception.
+   */
+  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId, int serviceClass,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
     final Call call = createCall(rpcKind, rpcRequest);
-    Connection connection = getConnection(remoteId, call, serviceClass);
+    Connection connection = getConnection(remoteId, call, serviceClass,
+      fallbackToSimpleAuth);
     try {
       connection.sendRpcRequest(call);                 // send the rpc request
     } catch (RejectedExecutionException e) {
@@ -1444,7 +1493,8 @@ public class Client {
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given ConnectionId are reused. */
   private Connection getConnection(ConnectionId remoteId,
-      Call call, int serviceClass) throws IOException {
+      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
     if (!running.get()) {
       // the client is stopped
       throw new IOException("The client is stopped");
@@ -1468,7 +1518,7 @@ public class Client {
     //block above. The reason for that is if the server happens to be slow,
     //it will take longer to establish a connection and that will slow the
     //entire system down.
-    connection.setupIOstreams();
+    connection.setupIOstreams(fallbackToSimpleAuth);
     return connection;
   }
   

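The existing call() and getConnection() signatures keep working and simply pass
a null flag. A minimal sketch of the new overload, assuming an already
constructed Client, a serialized rpcRequest, and a ConnectionId remoteId (all
illustrative):

    AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
    Writable reply = client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, rpcRequest,
        remoteId, fallbackToSimpleAuth);
    if (fallbackToSimpleAuth.get()) {
      // The server negotiated SIMPLE auth even though this client is secured;
      // callers such as SaslDataTransferClient can observe that here.
    }
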
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 0ccdb71..124d835 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
@@ -84,14 +85,23 @@ public class ProtobufRpcEngine implements RpcEngine {
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
       ) throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+      rpcTimeout, connectionRetryPolicy, null);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
 
     final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
-        rpcTimeout, connectionRetryPolicy);
+        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
     return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
   }
@@ -115,13 +125,16 @@ public class ProtobufRpcEngine implements RpcEngine {
     private final Client client;
     private final long clientProtocolVersion;
     private final String protocolName;
+    private AtomicBoolean fallbackToSimpleAuth;
 
     private Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
-        int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
+        int rpcTimeout, RetryPolicy connectionRetryPolicy,
+        AtomicBoolean fallbackToSimpleAuth) throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
           conf, factory);
+      this.fallbackToSimpleAuth = fallbackToSimpleAuth;
     }
     
     /**
@@ -217,7 +230,8 @@ public class ProtobufRpcEngine implements RpcEngine {
       final RpcResponseWrapper val;
       try {
         val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
+            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
+            fallbackToSimpleAuth);
 
       } catch (Throwable e) {
         if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 4ae7956..40f6515 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
@@ -524,6 +525,7 @@ public class RPC {
    * @param conf configuration
    * @param factory socket factory
    * @param rpcTimeout max time for each rpc; 0 means no timeout
+   * @param connectionRetryPolicy retry policy
    * @return the proxy
    * @throws IOException if any error occurs
    */
@@ -535,11 +537,43 @@ public class RPC {
                                 SocketFactory factory,
                                 int rpcTimeout,
                                RetryPolicy connectionRetryPolicy) throws IOException {    
+     return getProtocolProxy(protocol, clientVersion, addr, ticket,
+       conf, factory, rpcTimeout, connectionRetryPolicy, null);
+   }
+
+  /**
+   * Get a protocol proxy that contains a proxy connection to a remote server
+   * and a set of methods that are supported by the server
+   *
+   * @param protocol protocol
+   * @param clientVersion client's version
+   * @param addr server address
+   * @param ticket security ticket
+   * @param conf configuration
+   * @param factory socket factory
+   * @param rpcTimeout max time for each rpc; 0 means no timeout
+   * @param connectionRetryPolicy retry policy
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
+   *   a secure client falls back to simple auth
+   * @return the proxy
+   * @throws IOException if any error occurs
+   */
+   public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+                                long clientVersion,
+                                InetSocketAddress addr,
+                                UserGroupInformation ticket,
+                                Configuration conf,
+                                SocketFactory factory,
+                                int rpcTimeout,
+                                RetryPolicy connectionRetryPolicy,
+                                AtomicBoolean fallbackToSimpleAuth)
+       throws IOException {
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
     }
-    return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,
-        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
+    return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
+        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
+        fallbackToSimpleAuth);
   }
 
    /**

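Callers that want the fallback signal use the widened getProtocolProxy()
overload; the older signature still delegates with a null flag. A minimal
sketch, assuming version, address, ugi, conf and defaultPolicy are already set
up as in the NameNodeProxies hunk further down:

    AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        Client.getTimeout(conf), defaultPolicy,
        fallbackToSimpleAuth).getProxy();
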
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
index a8280bd..047722e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
@@ -43,6 +44,14 @@ public interface RpcEngine {
                   SocketFactory factory, int rpcTimeout,
                   RetryPolicy connectionRetryPolicy) throws IOException;
 
+  /** Construct a client-side proxy object. */
+  <T> ProtocolProxy<T> getProxy(Class<T> protocol,
+                  long clientVersion, InetSocketAddress addr,
+                  UserGroupInformation ticket, Configuration conf,
+                  SocketFactory factory, int rpcTimeout,
+                  RetryPolicy connectionRetryPolicy,
+                  AtomicBoolean fallbackToSimpleAuth) throws IOException;
+
   /** 
    * Construct a server for a protocol implementation instance.
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 4b2dfe0..c2d9435 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTargetException;
 
 import java.net.InetSocketAddress;
 import java.io.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
@@ -212,14 +213,17 @@ public class WritableRpcEngine implements RpcEngine {
     private Client.ConnectionId remoteId;
     private Client client;
     private boolean isClosed = false;
+    private final AtomicBoolean fallbackToSimpleAuth;
 
     public Invoker(Class<?> protocol,
                    InetSocketAddress address, UserGroupInformation ticket,
                    Configuration conf, SocketFactory factory,
-                   int rpcTimeout) throws IOException {
+                   int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
+        throws IOException {
       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
           ticket, rpcTimeout, conf);
       this.client = CLIENTS.getClient(conf, factory);
+      this.fallbackToSimpleAuth = fallbackToSimpleAuth;
     }
 
     @Override
@@ -238,7 +242,8 @@ public class WritableRpcEngine implements RpcEngine {
       ObjectWritable value;
       try {
         value = (ObjectWritable)
-          client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
+          client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
+            remoteId, fallbackToSimpleAuth);
       } finally {
         if (traceScope != null) traceScope.close();
       }
@@ -275,11 +280,25 @@ public class WritableRpcEngine implements RpcEngine {
    * talking to a server at the named address. 
    * @param <T>*/
   @Override
-  @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                          InetSocketAddress addr, UserGroupInformation ticket,
                          Configuration conf, SocketFactory factory,
                          int rpcTimeout, RetryPolicy connectionRetryPolicy)
+    throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+      rpcTimeout, connectionRetryPolicy, null);
+  }
+
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. 
+   * @param <T>*/
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+                         InetSocketAddress addr, UserGroupInformation ticket,
+                         Configuration conf, SocketFactory factory,
+                         int rpcTimeout, RetryPolicy connectionRetryPolicy,
+                         AtomicBoolean fallbackToSimpleAuth)
     throws IOException {    
 
     if (connectionRetryPolicy != null) {
@@ -289,7 +308,7 @@ public class WritableRpcEngine implements RpcEngine {
 
     T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
         new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
-            factory, rpcTimeout));
+            factory, rpcTimeout, fallbackToSimpleAuth));
     return new ProtocolProxy<T>(protocol, proxy, true);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index f0e389f..c1b1bfb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -276,12 +276,22 @@ public class TestRPC {
    */
   private static class StoppedRpcEngine implements RpcEngine {
 
-    @SuppressWarnings("unchecked")
     @Override
     public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
        InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
        SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
         ) throws IOException {
+      return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+        rpcTimeout, connectionRetryPolicy, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+        InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+        SocketFactory factory, int rpcTimeout,
+        RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth
+        ) throws IOException {
       T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
               new Class[] { protocol }, new StoppedInvocationHandler());
       return new ProtocolProxy<T>(protocol, proxy, false);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fa9bbcd..9babe96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -900,6 +900,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7105. Fix TestJournalNode#testFailToStartWithBadConfig to match log
     output change. (Ray Chiang via cnauroth)
 
+    HDFS-7073. Allow falling back to a non-SASL connection on
+    DataTransferProtocol in several edge cases. (cnauroth)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3f978fb..ed08be0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -22,8 +22,6 @@ import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
     .EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@@ -90,6 +88,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -616,13 +615,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
     NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
+    AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
     if (numResponseToDrop > 0) {
       // This case is used for testing.
       LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
           + " is set to " + numResponseToDrop
           + ", this hacked client will proactively drop responses");
       proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
-          nameNodeUri, ClientProtocol.class, numResponseToDrop);
+          nameNodeUri, ClientProtocol.class, numResponseToDrop,
+          nnFallbackToSimpleAuth);
     }
     
     if (proxyInfo != null) {
@@ -637,7 +638,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       Preconditions.checkArgument(nameNodeUri != null,
           "null URI");
       proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
-          ClientProtocol.class);
+          ClientProtocol.class, nnFallbackToSimpleAuth);
       this.dtService = proxyInfo.getDelegationTokenService();
       this.namenode = proxyInfo.getProxy();
     }
@@ -675,10 +676,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
     this.saslClient = new SaslDataTransferClient(
       DataTransferSaslUtil.getSaslPropertiesResolver(conf),
-      TrustedChannelResolver.getInstance(conf),
-      conf.getBoolean(
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+      TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
   }
   
   /**
@@ -3113,4 +3111,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public void setKeyProvider(KeyProviderCryptoExtension provider) {
     this.provider = provider;
   }
+
+  /**
+   * Returns the SaslDataTransferClient configured for this DFSClient.
+   *
+   * @return SaslDataTransferClient configured for this DFSClient
+   */
+  public SaslDataTransferClient getSaslDataTransferClient() {
+    return saslClient;
+  }
 }

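The new accessor lets code that already holds a DFSClient reuse its
SaslDataTransferClient, and with it the nnFallbackToSimpleAuth state negotiated
over RPC, rather than rebuilding one from static configuration (the
NamenodeFsck changes below appear to be one such caller). A minimal sketch,
assuming an existing DFSClient named dfs:

    SaslDataTransferClient saslClient = dfs.getSaslDataTransferClient();
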
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 609b4c6..3c5358f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -589,6 +589,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
   public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
   public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
+  public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
   public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
   public static final int    DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
   public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
@@ -703,4 +704,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT =
       1000;
 
+  public static final String IGNORE_SECURE_PORTS_FOR_TESTING_KEY =
+      "ignore.secure.ports.for.testing";
+  public static final boolean IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT = false;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index 90acede..f91f709 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -244,7 +244,7 @@ public class HAUtil {
     // Create the proxy provider. Actual proxy is not created.
     AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxies
         .createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
-        false);
+        false, null);
 
     // No need to use logical URI since failover is not configured.
     if (provider == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index 1765334..fcc2f5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -36,6 +36,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -145,13 +146,37 @@ public class NameNodeProxies {
   @SuppressWarnings("unchecked")
   public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
       URI nameNodeUri, Class<T> xface) throws IOException {
+    return createProxy(conf, nameNodeUri, xface, null);
+  }
+
+  /**
+   * Creates the namenode proxy with the passed protocol. This will handle
+   * creation of either HA- or non-HA-enabled proxy objects, depending upon
+   * if the provided URI is a configured logical URI.
+   *
+   * @param conf the configuration containing the required IPC
+   *        properties, client failover configurations, etc.
+   * @param nameNodeUri the URI pointing either to a specific NameNode
+   *        or to a logical nameservice.
+   * @param xface the IPC interface which should be created
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
+   *   a secure client falls back to simple auth
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException if there is an error creating the proxy
+   **/
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
+      URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
-        createFailoverProxyProvider(conf, nameNodeUri, xface, true);
+        createFailoverProxyProvider(conf, nameNodeUri, xface, true,
+          fallbackToSimpleAuth);
   
     if (failoverProxyProvider == null) {
       // Non-HA case
       return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
-          UserGroupInformation.getCurrentUser(), true);
+          UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
     } else {
       // HA case
       Conf config = new Conf(conf);
@@ -187,6 +212,8 @@ public class NameNodeProxies {
    *        or to a logical nameservice.
    * @param xface the IPC interface which should be created
    * @param numResponseToDrop The number of responses to drop for each RPC call
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
+   *   a secure client falls back to simple auth
    * @return an object containing both the proxy and the associated
   *         delegation token service it corresponds to. Will return null of the
    *         given configuration does not support HA.
@@ -195,10 +222,12 @@ public class NameNodeProxies {
   @SuppressWarnings("unchecked")
   public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
       Configuration config, URI nameNodeUri, Class<T> xface,
-      int numResponseToDrop) throws IOException {
+      int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
     Preconditions.checkArgument(numResponseToDrop > 0);
     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
-        createFailoverProxyProvider(config, nameNodeUri, xface, true);
+        createFailoverProxyProvider(config, nameNodeUri, xface, true,
+          fallbackToSimpleAuth);
 
     if (failoverProxyProvider != null) { // HA case
       int delay = config.getInt(
@@ -257,12 +286,35 @@ public class NameNodeProxies {
   public static <T> ProxyAndInfo<T> createNonHAProxy(
       Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
       UserGroupInformation ugi, boolean withRetries) throws IOException {
+    return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
+  }
+
+  /**
+   * Creates an explicitly non-HA-enabled proxy object. Most of the time you
+   * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}.
+   *
+   * @param conf the configuration object
+   * @param nnAddr address of the remote NN to connect to
+   * @param xface the IPC interface which should be created
+   * @param ugi the user who is making the calls on the proxy object
+   * @param withRetries certain interfaces have a non-standard retry policy
+   * @param fallbackToSimpleAuth - set to true or false during this method to
+   *   indicate if a secure client falls back to simple auth
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createNonHAProxy(
+      Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+      UserGroupInformation ugi, boolean withRetries,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
     Text dtService = SecurityUtil.buildTokenService(nnAddr);
   
     T proxy;
     if (xface == ClientProtocol.class) {
       proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
-          withRetries);
+          withRetries, fallbackToSimpleAuth);
     } else if (xface == JournalProtocol.class) {
       proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
     } else if (xface == NamenodeProtocol.class) {
@@ -351,7 +403,8 @@ public class NameNodeProxies {
   
   private static ClientProtocol createNNProxyWithClientProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
-      boolean withRetries) throws IOException {
+      boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
 
     final RetryPolicy defaultPolicy = 
@@ -367,8 +420,8 @@ public class NameNodeProxies {
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
         NetUtils.getDefaultSocketFactory(conf),
-        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
-            .getProxy();
+        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
+        fallbackToSimpleAuth).getProxy();
 
     if (withRetries) { // create the proxy with retries
 
@@ -440,8 +493,8 @@ public class NameNodeProxies {
   /** Creates the Failover proxy provider instance*/
   @VisibleForTesting
  public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
-      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort)
-      throws IOException {
+      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
     Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
     AbstractNNFailoverProxyProvider<T> providerNN;
     Preconditions.checkArgument(
@@ -490,6 +543,7 @@ public class NameNodeProxies {
             + " and does not use port information.");
       }
     }
+    providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
     return providerNN;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
index 643af4a..9df9929 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -28,6 +28,7 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -71,21 +72,38 @@ public class SaslDataTransferClient {
   private static final Logger LOG = LoggerFactory.getLogger(
     SaslDataTransferClient.class);
 
-  private final boolean fallbackToSimpleAuthAllowed;
+  private final AtomicBoolean fallbackToSimpleAuth;
   private final SaslPropertiesResolver saslPropsResolver;
   private final TrustedChannelResolver trustedChannelResolver;
 
   /**
+   * Creates a new SaslDataTransferClient.  This constructor is used in cases
+   * where it is not relevant to track if a secure client did a fallback to
+   * simple auth.  For intra-cluster connections between data nodes in the same
+   * cluster, we can assume that all run under the same security configuration.
+   *
+   * @param saslPropsResolver for determining properties of SASL negotiation
+   * @param trustedChannelResolver for identifying trusted connections that do
+   *   not require SASL negotiation
+   */
+  public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
+      TrustedChannelResolver trustedChannelResolver) {
+    this(saslPropsResolver, trustedChannelResolver, null);
+  }
+
+  /**
    * Creates a new SaslDataTransferClient.
    *
    * @param saslPropsResolver for determining properties of SASL negotiation
    * @param trustedChannelResolver for identifying trusted connections that do
    *   not require SASL negotiation
+   * @param fallbackToSimpleAuth checked on each attempt at general SASL
+   *   handshake, if true forces use of simple auth
    */
   public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
       TrustedChannelResolver trustedChannelResolver,
-      boolean fallbackToSimpleAuthAllowed) {
-    this.fallbackToSimpleAuthAllowed = fallbackToSimpleAuthAllowed;
+      AtomicBoolean fallbackToSimpleAuth) {
+    this.fallbackToSimpleAuth = fallbackToSimpleAuth;
     this.saslPropsResolver = saslPropsResolver;
     this.trustedChannelResolver = trustedChannelResolver;
   }
@@ -221,22 +239,26 @@ public class SaslDataTransferClient {
         "SASL client skipping handshake in secured configuration with "
         + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
       return null;
-    } else if (accessToken.getIdentifier().length == 0) {
-      if (!fallbackToSimpleAuthAllowed) {
-        throw new IOException(
-          "No block access token was provided (insecure cluster), but this " +
-          "client is configured to allow only secure connections.");
-      }
+    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
       LOG.debug(
         "SASL client skipping handshake in secured configuration with "
         + "unsecured cluster for addr = {}, datanodeId = {}", addr, 
datanodeId);
       return null;
-    } else {
+    } else if (saslPropsResolver != null) {
       LOG.debug(
         "SASL client doing general handshake for addr = {}, datanodeId = {}",
         addr, datanodeId);
       return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
         datanodeId);
+    } else {
+      // It's a secured cluster using non-privileged ports, but no SASL.  The
+      // only way this can happen is if the DataNode has
+      // ignore.secure.ports.for.testing configured, so this is a rare edge case.
+      LOG.debug(
+        "SASL client skipping handshake in secured configuration with no SASL "
+        + "protection configured for addr = {}, datanodeId = {}",
+        addr, datanodeId);
+      return null;
     }
   }
 
@@ -348,12 +370,6 @@ public class SaslDataTransferClient {
       OutputStream underlyingOut, InputStream underlyingIn,
       Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
       throws IOException {
-    if (saslPropsResolver == null) {
-      throw new IOException(String.format("Cannot create a secured " +
-        "connection if DataNode listens on unprivileged port (%d) and no " +
-        "protection is defined in configuration property %s.",
-        datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
-    }
     Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
 
     String userName = buildUserName(accessToken);

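The two constructors split along the lines described in the javadoc above:
peers that never need to track fallback (DataNode-to-DataNode transfers) use
the two-argument form, while clients hand in the AtomicBoolean maintained by
their NameNode RPC connection. A hedged sketch, assuming dnConf, conf and
nnFallbackToSimpleAuth exist as in the DataNode and DFSClient hunks:

    // Intra-cluster use: fallback tracking is irrelevant between DataNodes.
    SaslDataTransferClient dnClient = new SaslDataTransferClient(
        dnConf.saslPropsResolver, dnConf.trustedChannelResolver);

    // Client side: share the flag set by the NameNode RPC layer so each
    // handshake attempt sees the latest fallback decision.
    SaslDataTransferClient clientSide = new SaslDataTransferClient(
        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
        TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
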
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index 7857057..2b82c82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -112,11 +112,29 @@ public class SaslDataTransferServer {
         "SASL server skipping handshake in unsecured configuration for "
         + "peer = {}, datanodeId = {}", peer, datanodeId);
       return new IOStreamPair(underlyingIn, underlyingOut);
-    } else {
+    } else if (dnConf.getSaslPropsResolver() != null) {
       LOG.debug(
         "SASL server doing general handshake for peer = {}, datanodeId = {}",
         peer, datanodeId);
       return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId);
+    } else if (dnConf.getIgnoreSecurePortsForTesting()) {
+      // It's a secured cluster using non-privileged ports, but no SASL.  The
+      // only way this can happen is if the DataNode has
+      // ignore.secure.ports.for.testing configured, so this is a rare edge case.
+      LOG.debug(
+        "SASL server skipping handshake in secured configuration with no SASL "
+        + "protection configured for peer = {}, datanodeId = {}",
+        peer, datanodeId);
+      return new IOStreamPair(underlyingIn, underlyingOut);
+    } else {
+      // The error message here intentionally does not mention
+      // ignore.secure.ports.for.testing.  That's intended for dev use only.
+      // This code path is not expected to execute ever, because DataNode startup
+      // checks for invalid configuration and aborts.
+      throw new IOException(String.format("Cannot create a secured " +
+        "connection if DataNode listens on unprivileged port (%d) and no " +
+        "protection is defined in configuration property %s.",
+        datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
     }
   }
 
@@ -257,12 +275,6 @@ public class SaslDataTransferServer {
   private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
       InputStream underlyingIn, final DatanodeID datanodeId) throws IOException {
     SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
-    if (saslPropsResolver == null) {
-      throw new IOException(String.format("Cannot create a secured " +
-        "connection if DataNode listens on unprivileged port (%d) and no " +
-        "protection is defined in configuration property %s.",
-        datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
-    }
     Map<String, String> saslProps = saslPropsResolver.getServerProperties(
       getPeerAddress(peer));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index df6aa99..cea1ab7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -48,8 +48,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.StorageType;
@@ -787,12 +785,9 @@ public class Dispatcher {
         : Executors.newFixedThreadPool(dispatcherThreads);
     this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
-    final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
-        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.saslClient = new SaslDataTransferClient(
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),
-        TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
+        TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
   }
 
   public DistributedFileSystem getDistributedFileSystem() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index d27f33f..9162531 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -101,6 +102,7 @@ public class NameNodeConnector implements Closeable {
   private final NamenodeProtocol namenode;
   private final ClientProtocol client;
   private final KeyManager keyManager;
+  final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
 
   private final DistributedFileSystem fs;
   private final Path idPath;
@@ -120,7 +122,7 @@ public class NameNodeConnector implements Closeable {
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
         NamenodeProtocol.class).getProxy();
     this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
-        ClientProtocol.class).getProxy();
+        ClientProtocol.class, fallbackToSimpleAuth).getProxy();
     this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();

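For the Balancer, the NameNodeConnector now owns the flag: it passes it to the
ClientProtocol proxy, and the Dispatcher (in the same package) reuses it for
block moves. A short sketch of that relationship, assuming a constructed
NameNodeConnector nnc and a Configuration conf:

    // nnc.fallbackToSimpleAuth is package-visible and is updated by the
    // ClientProtocol proxy created in the NameNodeConnector constructor.
    SaslDataTransferClient saslClient = new SaslDataTransferClient(
        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
        TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
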
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 4a36472..3127682 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -48,6 +48,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEF
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -90,6 +92,7 @@ public class DNConf {
   final String encryptionAlgorithm;
   final SaslPropertiesResolver saslPropsResolver;
   final TrustedChannelResolver trustedChannelResolver;
+  private final boolean ignoreSecurePortsForTesting;
   
   final long xceiverStopTimeout;
   final long restartReplicaExpiry;
@@ -173,6 +176,9 @@ public class DNConf {
     this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
     this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
       conf);
+    this.ignoreSecurePortsForTesting = conf.getBoolean(
+        IGNORE_SECURE_PORTS_FOR_TESTING_KEY,
+        IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT);
     
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -238,4 +244,15 @@ public class DNConf {
   public TrustedChannelResolver getTrustedChannelResolver() {
     return trustedChannelResolver;
   }
+
+  /**
+   * Returns true if configuration is set to skip checking for proper
+   * port configuration in a secured cluster.  This is only intended for use in
+   * dev testing.
+   *
+   * @return true if configured to skip checking secured port configuration
+   */
+  public boolean getIgnoreSecurePortsForTesting() {
+    return ignoreSecurePortsForTesting;
+  }
 }

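The ignore.secure.ports.for.testing switch behind this getter is only meant for
development clusters that run with Kerberos enabled but without privileged
ports or SASL on DataTransferProtocol. A hypothetical test-only setup (not for
production use):

    Configuration conf = new HdfsConfiguration();
    // Security is on, DataNodes listen on unprivileged ports, and
    // dfs.data.transfer.protection is left unset.
    conf.setBoolean(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);
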
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 57ff2fa..b1ef186 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
@@ -46,9 +44,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.BufferedOutputStream;
@@ -170,6 +171,7 @@ import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -967,8 +969,6 @@ public class DataNode extends ReconfigurableBase
                      SecureResources resources
                      ) throws IOException {
 
-    checkSecureConfig(conf, resources);
-
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
     synchronized (this) {
@@ -976,6 +976,8 @@ public class DataNode extends ReconfigurableBase
     }
     this.conf = conf;
     this.dnConf = new DNConf(conf);
+    checkSecureConfig(dnConf, conf, resources);
+
     this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
 
     if (dnConf.maxLockedMemory > 0) {
@@ -1031,10 +1033,7 @@ public class DataNode extends ReconfigurableBase
     // exit without having to explicitly shutdown its thread pool.
     readaheadPool = ReadaheadPool.getInstance();
     saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
-      dnConf.trustedChannelResolver,
-      conf.getBoolean(
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+      dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
   }
 
@@ -1054,23 +1053,24 @@ public class DataNode extends ReconfigurableBase
    * must check if the target port is a privileged port, and if so, skip the
    * SASL handshake.
    *
+   * @param dnConf DNConf to check
    * @param conf Configuration to check
    * @param resources SecuredResources obtained for DataNode
    * @throws RuntimeException if security enabled, but configuration is insecure
    */
-  private static void checkSecureConfig(Configuration conf,
+  private static void checkSecureConfig(DNConf dnConf, Configuration conf,
       SecureResources resources) throws RuntimeException {
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
-    if (resources != null && dataTransferProtection == null) {
+    SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
+    if (resources != null && saslPropsResolver == null) {
       return;
     }
-    if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+    if (dnConf.getIgnoreSecurePortsForTesting()) {
       return;
     }
-    if (dataTransferProtection != null &&
+    if (saslPropsResolver != null &&
         DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
         resources == null) {
       return;
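
Read together, the revised startup check accepts any one of four conditions and
otherwise refuses to start the secure DataNode. A paraphrased sketch, not the
literal method body; the final error message is inferred from the "Cannot start
secure DataNode" assertions in the new tests further below:

    // Sketch only; names follow the hunk above, the throw is paraphrased.
    static void checkSecureConfigSketch(DNConf dnConf, Configuration conf,
        SecureResources resources) {
      if (!UserGroupInformation.isSecurityEnabled()) {
        return;  // not a secure cluster, nothing to enforce
      }
      SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
      if (resources != null && saslPropsResolver == null) {
        return;  // classic setup: privileged ports, no SASL on DataTransferProtocol
      }
      if (dnConf.getIgnoreSecurePortsForTesting()) {
        return;  // explicit test-only escape hatch
      }
      if (saslPropsResolver != null
          && DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY
          && resources == null) {
        return;  // SASL protects data transfer, HTTPS_ONLY protects the web UI
      }
      throw new RuntimeException("Cannot start secure DataNode ...");  // wording paraphrased
    }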

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 5cc8a47..a187123 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -59,10 +56,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
@@ -161,7 +155,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   private List<String> snapshottableDirs = null;
 
   private final BlockPlacementPolicy bpPolicy;
-  private final SaslDataTransferClient saslClient;
 
   /**
    * Filesystem checker.
@@ -188,12 +181,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         networktopology,
         namenode.getNamesystem().getBlockManager().getDatanodeManager()
         .getHost2DatanodeMap());
-    this.saslClient = new SaslDataTransferClient(
-      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
-      TrustedChannelResolver.getInstance(conf),
-      conf.getBoolean(
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
     
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -594,7 +581,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
    * bad. Both places should be refactored to provide a method to copy blocks
    * around.
    */
-  private void copyBlock(DFSClient dfs, LocatedBlock lblock,
+  private void copyBlock(final DFSClient dfs, LocatedBlock lblock,
                          OutputStream fos) throws Exception {
     int failures = 0;
     InetSocketAddress targetAddr = null;
@@ -647,8 +634,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
                 try {
                   s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
                   s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-                  peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
-                        NamenodeFsck.this, blockToken, datanodeId);
+                  peer = TcpPeerServer.peerFromSocketAndKey(
+                        dfs.getSaslDataTransferClient(), s, NamenodeFsck.this,
+                        blockToken, datanodeId);
                 } finally {
                   if (peer == null) {
                     IOUtils.closeQuietly(s);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 3c0edfd..08e82be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -18,12 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 
 public abstract class AbstractNNFailoverProxyProvider<T> implements
    FailoverProxyProvider <T> {
 
+  protected AtomicBoolean fallbackToSimpleAuth;
+
   /**
    * Inquire whether logical HA URI is used for the implementation. If it is
   * used, a special token handling may be needed to make sure a token acquired 
@@ -32,4 +36,14 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
    * @return true if logical HA URI is used. false, if not used.
    */
   public abstract boolean useLogicalURI(); 
+
+  /**
+   * Set for tracking if a secure client falls back to simple auth.
+   *
+   * @param fallbackToSimpleAuth - set to true or false during this method to
+   *   indicate if a secure client falls back to simple auth
+   */
+  public void setFallbackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth) {
+    this.fallbackToSimpleAuth = fallbackToSimpleAuth;
+  }
 }
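
The AtomicBoolean is owned by whoever builds the proxy provider; the provider
only stores it and passes it along when proxies are created. A sketch of the
caller-side wiring, assuming a provider instance is already at hand (the helper
name is invented for illustration; requires java.util.concurrent.atomic.AtomicBoolean):

    // Register a shared flag before any proxy is created, then read it after
    // RPC setup to learn whether the connection fell back to SIMPLE auth.
    static AtomicBoolean trackFallback(AbstractNNFailoverProxyProvider<?> provider) {
      AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
      provider.setFallbackToSimpleAuth(fallbackToSimpleAuth);  // before proxies are created
      return fallbackToSimpleAuth;  // the RPC layer is expected to set this on fallback
    }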

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index 4d196a2..06aa8fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -122,7 +122,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
     if (current.namenode == null) {
       try {
         current.namenode = NameNodeProxies.createNonHAProxy(conf,
-            current.address, xface, ugi, false).getProxy();
+            current.address, xface, ugi, false, fallbackToSimpleAuth).getProxy();
       } catch (IOException e) {
         LOG.error("Failed to create RPC proxy to NameNode", e);
         throw new RuntimeException(e);
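
Because the same inherited fallbackToSimpleAuth reference is handed to every
lazily created NameNode proxy, a failover to a standby NameNode reuses the flag
rather than starting a new one. A hypothetical downstream consumer (not part of
this patch) would consult it before allowing an unsecured data transfer:

    // Sketch: only permit an unsecured DataTransferProtocol connection when the
    // RPC layer has already been forced to fall back to SIMPLE for this cluster.
    boolean allowUnsecuredDataTransfer =
        fallbackToSimpleAuth != null && fallbackToSimpleAuth.get();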

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
index 7602f44..0d860b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
+
 import static org.junit.Assert.*;
 
 import java.io.IOException;
@@ -29,11 +32,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
 
 public class TestSaslDataTransfer extends SaslDataTransferTestCase {
 
@@ -49,6 +54,9 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
+  @Rule
+  public Timeout timeout = new Timeout(60000);
+
   @After
   public void shutdown() {
     IOUtils.cleanup(null, fs);
@@ -99,17 +107,6 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
   }
 
   @Test
-  public void testClientSaslNoServerSasl() throws Exception {
-    HdfsConfiguration clusterConf = createSecureConfig("");
-    startCluster(clusterConf);
-    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
-    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
-    exception.expect(IOException.class);
-    exception.expectMessage("could only be replicated to 0 nodes");
-    doTest(clientConf);
-  }
-
-  @Test
   public void testServerSaslNoClientSasl() throws Exception {
     HdfsConfiguration clusterConf = createSecureConfig(
       "authentication,integrity,privacy");
@@ -121,6 +118,32 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
     doTest(clientConf);
   }
 
+  @Test
+  public void testDataNodeAbortsIfNoSasl() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig("");
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("Cannot start secure DataNode");
+    startCluster(clusterConf);
+  }
+
+  @Test
+  public void testDataNodeAbortsIfNotHttpsOnly() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig("authentication");
+    clusterConf.set(DFS_HTTP_POLICY_KEY,
+      HttpConfig.Policy.HTTP_AND_HTTPS.name());
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("Cannot start secure DataNode");
+    startCluster(clusterConf);
+  }
+
+  @Test
+  public void testNoSaslAndSecurePortsIgnored() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig("");
+    clusterConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);
+    startCluster(clusterConf);
+    doTest(clusterConf);
+  }
+
   /**
    * Tests DataTransferProtocol with the given client configuration.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f85cc14e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index 899b888..8f7d11a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -194,7 +194,7 @@ public class TestRetryCacheWithHA {
     URI nnUri = dfs.getUri();
     FailoverProxyProvider<ClientProtocol> failoverProxyProvider = 
         NameNodeProxies.createFailoverProxyProvider(conf, 
-            nnUri, ClientProtocol.class, true);
+            nnUri, ClientProtocol.class, true, null);
     InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
         failoverProxyProvider, RetryPolicies
         .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
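
Callers with no interest in fallback tracking simply pass null for the new
parameter, as the test above does. A caller that does want the signal would
supply its own flag instead, along these lines (sketch only; variables as in
the test above, AtomicBoolean from java.util.concurrent.atomic):

    AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
    FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
        NameNodeProxies.createFailoverProxyProvider(conf,
            nnUri, ClientProtocol.class, true, fallbackToSimpleAuth);
    // Once a proxy has actually connected, fallbackToSimpleAuth.get() reports
    // whether the server forced a downgrade to SIMPLE authentication.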
