Repository: spark
Updated Branches:
  refs/heads/master d9783380f -> 8f3f73abc


http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java
new file mode 100644
index 0000000..21609d5
--- /dev/null
+++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.crypto;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.netty.channel.Channel;
+import org.junit.After;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.spark.network.TestUtils;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
+import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.sasl.SaslServerBootstrap;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.MapConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class AuthIntegrationSuite {
+
+  private AuthTestCtx ctx;
+
+  @After
+  public void cleanUp() throws Exception {
+    if (ctx != null) {
+      ctx.close();
+    }
+    ctx = null;
+  }
+
+  @Test
+  public void testNewAuth() throws Exception {
+    ctx = new AuthTestCtx();
+    ctx.createServer("secret");
+    ctx.createClient("secret");
+
+    ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000);
+    assertEquals("Pong", JavaUtils.bytesToString(reply));
+    assertTrue(ctx.authRpcHandler.doDelegate);
+    assertFalse(ctx.authRpcHandler.delegate instanceof SaslRpcHandler);
+  }
+
+  @Test
+  public void testAuthFailure() throws Exception {
+    ctx = new AuthTestCtx();
+    ctx.createServer("server");
+
+    try {
+      ctx.createClient("client");
+      fail("Should have failed to create client.");
+    } catch (Exception e) {
+      assertFalse(ctx.authRpcHandler.doDelegate);
+      assertFalse(ctx.serverChannel.isActive());
+    }
+  }
+
+  @Test
+  public void testSaslServerFallback() throws Exception {
+    ctx = new AuthTestCtx();
+    ctx.createServer("secret", true);
+    ctx.createClient("secret", false);
+
+    ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000);
+    assertEquals("Pong", JavaUtils.bytesToString(reply));
+  }
+
+  @Test
+  public void testSaslClientFallback() throws Exception {
+    ctx = new AuthTestCtx();
+    ctx.createServer("secret", false);
+    ctx.createClient("secret", true);
+
+    ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000);
+    assertEquals("Pong", JavaUtils.bytesToString(reply));
+  }
+
+  @Test
+  public void testAuthReplay() throws Exception {
+    // This test covers the case where an attacker replays a challenge message sniffed from the
+    // network, but doesn't know the actual secret. The server should close the connection as
+    // soon as a message is sent after authentication is performed. This is emulated by removing
+    // the client encryption handler after authentication.
+    ctx = new AuthTestCtx();
+    ctx.createServer("secret");
+    ctx.createClient("secret");
+
+    assertNotNull(ctx.client.getChannel().pipeline()
+      .remove(TransportCipher.ENCRYPTION_HANDLER_NAME));
+
+    try {
+      ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000);
+      fail("Should have failed unencrypted RPC.");
+    } catch (Exception e) {
+      assertTrue(ctx.authRpcHandler.doDelegate);
+    }
+  }
+
+  private class AuthTestCtx {
+
+    private final String appId = "testAppId";
+    private final TransportConf conf;
+    private final TransportContext ctx;
+
+    TransportClient client;
+    TransportServer server;
+    volatile Channel serverChannel;
+    volatile AuthRpcHandler authRpcHandler;
+
+    AuthTestCtx() throws Exception {
+      Map<String, String> testConf = ImmutableMap.of("spark.network.crypto.enabled", "true");
+      this.conf = new TransportConf("rpc", new MapConfigProvider(testConf));
+
+      RpcHandler rpcHandler = new RpcHandler() {
+        @Override
+        public void receive(
+            TransportClient client,
+            ByteBuffer message,
+            RpcResponseCallback callback) {
+          assertEquals("Ping", JavaUtils.bytesToString(message));
+          callback.onSuccess(JavaUtils.stringToBytes("Pong"));
+        }
+
+        @Override
+        public StreamManager getStreamManager() {
+          return null;
+        }
+      };
+
+      this.ctx = new TransportContext(conf, rpcHandler);
+    }
+
+    void createServer(String secret) throws Exception {
+      createServer(secret, true);
+    }
+
+    void createServer(String secret, boolean enableAes) throws Exception {
+      TransportServerBootstrap introspector = new TransportServerBootstrap() {
+        @Override
+        public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
+          AuthTestCtx.this.serverChannel = channel;
+          if (rpcHandler instanceof AuthRpcHandler) {
+            AuthTestCtx.this.authRpcHandler = (AuthRpcHandler) rpcHandler;
+          }
+          return rpcHandler;
+        }
+      };
+      SecretKeyHolder keyHolder = createKeyHolder(secret);
+      TransportServerBootstrap auth = enableAes ? new AuthServerBootstrap(conf, keyHolder)
+        : new SaslServerBootstrap(conf, keyHolder);
+      this.server = ctx.createServer(Lists.newArrayList(auth, introspector));
+    }
+
+    void createClient(String secret) throws Exception {
+      createClient(secret, true);
+    }
+
+    void createClient(String secret, boolean enableAes) throws Exception {
+      TransportConf clientConf = enableAes ? conf
+        : new TransportConf("rpc", MapConfigProvider.EMPTY);
+      List<TransportClientBootstrap> bootstraps = Lists.<TransportClientBootstrap>newArrayList(
+        new AuthClientBootstrap(clientConf, appId, createKeyHolder(secret)));
+      this.client = ctx.createClientFactory(bootstraps)
+        .createClient(TestUtils.getLocalHost(), server.getPort());
+    }
+
+    void close() {
+      if (client != null) {
+        client.close();
+      }
+      if (server != null) {
+        server.close();
+      }
+    }
+
+    private SecretKeyHolder createKeyHolder(String secret) {
+      SecretKeyHolder keyHolder = mock(SecretKeyHolder.class);
+      when(keyHolder.getSaslUser(anyString())).thenReturn(appId);
+      when(keyHolder.getSecretKey(anyString())).thenReturn(secret);
+      return keyHolder;
+    }
+
+  }
+
+}
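
The suite above exercises the new AuthClientBootstrap/AuthServerBootstrap pair end to end. For readers who want to see the wiring outside of a test harness, here is a minimal, hedged sketch in Scala; `conf`, `rpcHandler`, `keyHolder`, `appId` and `host` are assumed to be set up the same way the suite does, and this snippet is illustrative only, not part of the patch.

  import java.util.Arrays
  import org.apache.spark.network.TransportContext
  import org.apache.spark.network.client.TransportClientBootstrap
  import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
  import org.apache.spark.network.server.TransportServerBootstrap

  val context = new TransportContext(conf, rpcHandler)

  // Server side: run the auth handshake (with optional SASL fallback) before delegating RPCs.
  val serverBootstraps: java.util.List[TransportServerBootstrap] =
    Arrays.asList(new AuthServerBootstrap(conf, keyHolder))
  val server = context.createServer(serverBootstraps)

  // Client side: authenticate on connect, then encrypt the channel with the negotiated key.
  val clientBootstraps: java.util.List[TransportClientBootstrap] =
    Arrays.asList(new AuthClientBootstrap(conf, appId, keyHolder))
  val client = context.createClientFactory(clientBootstraps)
    .createClient(host, server.getPort)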

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java
new file mode 100644
index 0000000..a90ff24
--- /dev/null
+++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.crypto;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.spark.network.protocol.Encodable;
+
+public class AuthMessagesSuite {
+
+  private static int COUNTER = 0;
+
+  private static String string() {
+    return String.valueOf(COUNTER++);
+  }
+
+  private static byte[] byteArray() {
+    byte[] bytes = new byte[COUNTER++];
+    for (int i = 0; i < bytes.length; i++) {
+      bytes[i] = (byte) COUNTER;
+    } return bytes;
+  }
+
+  private static int integer() {
+    return COUNTER++;
+  }
+
+  @Test
+  public void testClientChallenge() {
+    ClientChallenge msg = new ClientChallenge(string(), string(), integer(), string(), integer(),
+      byteArray(), byteArray());
+    ClientChallenge decoded = ClientChallenge.decodeMessage(encode(msg));
+
+    assertEquals(msg.appId, decoded.appId);
+    assertEquals(msg.kdf, decoded.kdf);
+    assertEquals(msg.iterations, decoded.iterations);
+    assertEquals(msg.cipher, decoded.cipher);
+    assertEquals(msg.keyLength, decoded.keyLength);
+    assertTrue(Arrays.equals(msg.nonce, decoded.nonce));
+    assertTrue(Arrays.equals(msg.challenge, decoded.challenge));
+  }
+
+  @Test
+  public void testServerResponse() {
+    ServerResponse msg = new ServerResponse(byteArray(), byteArray(), byteArray(), byteArray());
+    ServerResponse decoded = ServerResponse.decodeMessage(encode(msg));
+    assertTrue(Arrays.equals(msg.response, decoded.response));
+    assertTrue(Arrays.equals(msg.nonce, decoded.nonce));
+    assertTrue(Arrays.equals(msg.inputIv, decoded.inputIv));
+    assertTrue(Arrays.equals(msg.outputIv, decoded.outputIv));
+  }
+
+  private ByteBuffer encode(Encodable msg) {
+    ByteBuf buf = Unpooled.buffer();
+    msg.encode(buf);
+    return buf.nioBuffer();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index e27301f..87129b9 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -56,7 +56,6 @@ import org.apache.spark.network.client.ChunkReceivedCallback;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientBootstrap;
-import org.apache.spark.network.sasl.aes.AesCipher;
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.server.TransportServer;
@@ -153,7 +152,7 @@ public class SparkSaslSuite {
       .when(rpcHandler)
       .receive(any(TransportClient.class), any(ByteBuffer.class), any(RpcResponseCallback.class));
 
-    SaslTestCtx ctx = new SaslTestCtx(rpcHandler, encrypt, false, false);
+    SaslTestCtx ctx = new SaslTestCtx(rpcHandler, encrypt, false);
     try {
       ByteBuffer response = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"),
         TimeUnit.SECONDS.toMillis(10));
@@ -279,7 +278,7 @@ public class SparkSaslSuite {
       new Random().nextBytes(data);
       Files.write(data, file);
 
-      ctx = new SaslTestCtx(rpcHandler, true, false, false, testConf);
+      ctx = new SaslTestCtx(rpcHandler, true, false, testConf);
 
       final CountDownLatch lock = new CountDownLatch(1);
 
@@ -317,7 +316,7 @@ public class SparkSaslSuite {
   public void testServerAlwaysEncrypt() throws Exception {
     SaslTestCtx ctx = null;
     try {
-      ctx = new SaslTestCtx(mock(RpcHandler.class), false, false, false,
+      ctx = new SaslTestCtx(mock(RpcHandler.class), false, false,
         ImmutableMap.of("spark.network.sasl.serverAlwaysEncrypt", "true"));
       fail("Should have failed to connect without encryption.");
     } catch (Exception e) {
@@ -336,7 +335,7 @@ public class SparkSaslSuite {
     // able to understand RPCs sent to it and thus close the connection.
     SaslTestCtx ctx = null;
     try {
-      ctx = new SaslTestCtx(mock(RpcHandler.class), true, true, false);
+      ctx = new SaslTestCtx(mock(RpcHandler.class), true, true);
       ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"),
         TimeUnit.SECONDS.toMillis(10));
       fail("Should have failed to send RPC to server.");
@@ -374,69 +373,6 @@ public class SparkSaslSuite {
     }
   }
 
-  @Test
-  public void testAesEncryption() throws Exception {
-    final AtomicReference<ManagedBuffer> response = new AtomicReference<>();
-    final File file = File.createTempFile("sasltest", ".txt");
-    SaslTestCtx ctx = null;
-    try {
-      final TransportConf conf = new TransportConf("rpc", MapConfigProvider.EMPTY);
-      final TransportConf spyConf = spy(conf);
-      doReturn(true).when(spyConf).aesEncryptionEnabled();
-
-      StreamManager sm = mock(StreamManager.class);
-      when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() {
-        @Override
-        public ManagedBuffer answer(InvocationOnMock invocation) {
-          return new FileSegmentManagedBuffer(spyConf, file, 0, file.length());
-        }
-      });
-
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      when(rpcHandler.getStreamManager()).thenReturn(sm);
-
-      byte[] data = new byte[256 * 1024 * 1024];
-      new Random().nextBytes(data);
-      Files.write(data, file);
-
-      ctx = new SaslTestCtx(rpcHandler, true, false, true);
-
-      final Object lock = new Object();
-
-      ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-      doAnswer(new Answer<Void>() {
-        @Override
-        public Void answer(InvocationOnMock invocation) {
-          response.set((ManagedBuffer) invocation.getArguments()[1]);
-          response.get().retain();
-          synchronized (lock) {
-            lock.notifyAll();
-          }
-          return null;
-        }
-      }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class));
-
-      synchronized (lock) {
-        ctx.client.fetchChunk(0, 0, callback);
-        lock.wait(10 * 1000);
-      }
-
-      verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class));
-      verify(callback, never()).onFailure(anyInt(), any(Throwable.class));
-
-      byte[] received = ByteStreams.toByteArray(response.get().createInputStream());
-      assertTrue(Arrays.equals(data, received));
-    } finally {
-      file.delete();
-      if (ctx != null) {
-        ctx.close();
-      }
-      if (response.get() != null) {
-        response.get().release();
-      }
-    }
-  }
-
   private static class SaslTestCtx {
 
     final TransportClient client;
@@ -449,46 +385,39 @@ public class SparkSaslSuite {
     SaslTestCtx(
         RpcHandler rpcHandler,
         boolean encrypt,
-        boolean disableClientEncryption,
-        boolean aesEnable)
+        boolean disableClientEncryption)
       throws Exception {
 
-      this(rpcHandler, encrypt, disableClientEncryption, aesEnable,
-        Collections.<String, String>emptyMap());
+      this(rpcHandler, encrypt, disableClientEncryption, Collections.<String, String>emptyMap());
     }
 
     SaslTestCtx(
         RpcHandler rpcHandler,
         boolean encrypt,
         boolean disableClientEncryption,
-        boolean aesEnable,
-        Map<String, String> testConf)
+        Map<String, String> extraConf)
       throws Exception {
 
+      Map<String, String> testConf = ImmutableMap.<String, String>builder()
+        .putAll(extraConf)
+        .put("spark.authenticate.enableSaslEncryption", 
String.valueOf(encrypt))
+        .build();
       TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf));
 
-      if (aesEnable) {
-        conf = spy(conf);
-        doReturn(true).when(conf).aesEncryptionEnabled();
-      }
-
       SecretKeyHolder keyHolder = mock(SecretKeyHolder.class);
       when(keyHolder.getSaslUser(anyString())).thenReturn("user");
       when(keyHolder.getSecretKey(anyString())).thenReturn("secret");
 
       TransportContext ctx = new TransportContext(conf, rpcHandler);
 
-      String encryptHandlerName = aesEnable ? AesCipher.ENCRYPTION_HANDLER_NAME :
-        SaslEncryption.ENCRYPTION_HANDLER_NAME;
-
-      this.checker = new EncryptionCheckerBootstrap(encryptHandlerName);
+      this.checker = new EncryptionCheckerBootstrap(SaslEncryption.ENCRYPTION_HANDLER_NAME);
 
       this.server = ctx.createServer(Arrays.asList(new SaslServerBootstrap(conf, keyHolder),
         checker));
 
       try {
         List<TransportClientBootstrap> clientBootstraps = Lists.newArrayList();
-        clientBootstraps.add(new SaslClientBootstrap(conf, "user", keyHolder, encrypt));
+        clientBootstraps.add(new SaslClientBootstrap(conf, "user", keyHolder));
         if (disableClientEncryption) {
           clientBootstraps.add(new EncryptionDisablerBootstrap());
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 772fb88..616505d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +29,7 @@ import org.apache.spark.network.TransportContext;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientBootstrap;
 import org.apache.spark.network.client.TransportClientFactory;
-import org.apache.spark.network.sasl.SaslClientBootstrap;
+import org.apache.spark.network.crypto.AuthClientBootstrap;
 import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
@@ -47,8 +46,7 @@ public class ExternalShuffleClient extends ShuffleClient {
   private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class);
 
   private final TransportConf conf;
-  private final boolean saslEnabled;
-  private final boolean saslEncryptionEnabled;
+  private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
 
   protected TransportClientFactory clientFactory;
@@ -61,15 +59,10 @@ public class ExternalShuffleClient extends ShuffleClient {
   public ExternalShuffleClient(
       TransportConf conf,
       SecretKeyHolder secretKeyHolder,
-      boolean saslEnabled,
-      boolean saslEncryptionEnabled) {
-    Preconditions.checkArgument(
-      !saslEncryptionEnabled || saslEnabled,
-      "SASL encryption can only be enabled if SASL is also enabled.");
+      boolean authEnabled) {
     this.conf = conf;
     this.secretKeyHolder = secretKeyHolder;
-    this.saslEnabled = saslEnabled;
-    this.saslEncryptionEnabled = saslEncryptionEnabled;
+    this.authEnabled = authEnabled;
   }
 
   protected void checkInit() {
@@ -81,8 +74,8 @@ public class ExternalShuffleClient extends ShuffleClient {
     this.appId = appId;
     TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
     List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
-    if (saslEnabled) {
-      bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder, saslEncryptionEnabled));
+    if (authEnabled) {
+      bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder));
     }
     clientFactory = context.createClientFactory(bootstraps);
   }
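
With this change the shuffle client takes a single authEnabled flag; whether the connection is encrypted, and whether the new AES-based protocol or SASL is used, is now driven entirely by the TransportConf. A hedged sketch of a caller after the change, assuming a SparkConf-backed `conf`, `securityManager` and `appId` as in BlockManager; illustrative only, not part of the patch:

  import org.apache.spark.network.netty.SparkTransportConf
  import org.apache.spark.network.shuffle.ExternalShuffleClient

  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
  // The old (conf, keyHolder, saslEnabled, saslEncryptionEnabled) constructor collapses to:
  val shuffleClient = new ExternalShuffleClient(transConf, securityManager,
    securityManager.isAuthenticationEnabled())
  shuffleClient.init(appId)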

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index 42cedd9..ab49b1c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -60,9 +60,8 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient {
   public MesosExternalShuffleClient(
       TransportConf conf,
       SecretKeyHolder secretKeyHolder,
-      boolean saslEnabled,
-      boolean saslEncryptionEnabled) {
-    super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
+      boolean authEnabled) {
+    super(conf, secretKeyHolder, authEnabled);
   }
 
   public void registerDriverWithShuffleService(

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 8dd97b2..9248ef3 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -133,7 +133,7 @@ public class ExternalShuffleIntegrationSuite {
 
     final Semaphore requestsRemaining = new Semaphore(0);
 
-    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, false);
+    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false);
     client.init(APP_ID);
     client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
       new BlockFetchingListener() {
@@ -243,7 +243,7 @@ public class ExternalShuffleIntegrationSuite {
 
   private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
       throws IOException {
-    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
       executorId, executorInfo);

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index aed25a1..4ae75a1 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.network.shuffle;
 import java.io.IOException;
 import java.util.Arrays;
 
+import com.google.common.collect.ImmutableMap;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -88,8 +89,14 @@ public class ExternalShuffleSecuritySuite {
 
   /** Creates an ExternalShuffleClient and attempts to register with the server. */
   private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
+    TransportConf testConf = conf;
+    if (encrypt) {
+      testConf = new TransportConf("shuffle", new MapConfigProvider(
+        ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
+    }
+
     ExternalShuffleClient client =
-      new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
+      new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true);
     client.init(appId);
     // Registration either succeeds or throws an exception.
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index ea726e3..c7620d0 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -45,7 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.sasl.SaslServerBootstrap;
+import org.apache.spark.network.crypto.AuthServerBootstrap;
 import org.apache.spark.network.sasl.ShuffleSecretManager;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
@@ -172,7 +172,7 @@ public class YarnShuffleService extends AuxiliaryService {
       boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
       if (authEnabled) {
         createSecretManager();
-        bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
+        bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
       }
 
       int port = conf.getInt(

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 9bdc509..cde7682 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.network.sasl.SecretKeyHolder
 import org.apache.spark.util.Utils
 
@@ -191,7 +192,7 @@ private[spark] class SecurityManager(
   // allow all users/groups to have view/modify permissions
   private val WILDCARD_ACL = "*"
 
-  private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+  private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
   // keep spark.ui.acls.enable for backwards compatibility with 1.0
   private var aclsOn =
     sparkConf.getBoolean("spark.acls.enable", 
sparkConf.getBoolean("spark.ui.acls.enable", false))
@@ -516,11 +517,11 @@ private[spark] class SecurityManager(
   def isAuthenticationEnabled(): Boolean = authOn
 
   /**
-   * Checks whether SASL encryption should be enabled.
-   * @return Whether to enable SASL encryption when connecting to services that support it.
+   * Checks whether network encryption should be enabled.
+   * @return Whether to enable encryption when connecting to services that support it.
    */
-  def isSaslEncryptionEnabled(): Boolean = {
-    sparkConf.getBoolean("spark.authenticate.enableSaslEncryption", false)
+  def isEncryptionEnabled(): Boolean = {
+    sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 601d241..308a1ed 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -607,6 +607,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
           "\"client\".")
       }
     }
+
+    val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
+    require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
+      s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
   }
 
   /**
@@ -726,6 +730,7 @@ private[spark] object SparkConf extends Logging {
     (name.startsWith("spark.auth") && name != 
SecurityManager.SPARK_AUTH_SECRET_CONF) ||
     name.startsWith("spark.ssl") ||
     name.startsWith("spark.rpc") ||
+    name.startsWith("spark.network") ||
     isSparkPortConf(name)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 1296386..539dbb5 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -235,7 +235,7 @@ object SparkEnv extends Logging {
 
     val securityManager = new SecurityManager(conf, ioEncryptionKey)
     ioEncryptionKey.foreach { _ =>
-      if (!securityManager.isSaslEncryptionEnabled()) {
+      if (!securityManager.isEncryptionEnabled()) {
         logWarning("I/O encryption enabled without RPC encryption: keys will 
be visible on the " +
           "wire.")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 13eadbe..8d491dd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -25,8 +25,8 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.TransportContext
+import org.apache.spark.network.crypto.AuthServerBootstrap
 import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.sasl.SaslServerBootstrap
 import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.util.TransportConf
@@ -47,7 +47,6 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
 
   private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
-  private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -74,10 +73,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   /** Start the external shuffle service */
   def start() {
     require(server == null, "Shuffle server already started")
-    logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+    val authEnabled = securityManager.isAuthenticationEnabled()
+    logInfo(s"Starting shuffle service on port $port (auth enabled = 
$authEnabled)")
     val bootstraps: Seq[TransportServerBootstrap] =
-      if (useSasl) {
-        Seq(new SaslServerBootstrap(transportConf, securityManager))
+      if (authEnabled) {
+        Seq(new AuthServerBootstrap(transportConf, securityManager))
       } else {
         Nil
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index aba429b..536f493 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -243,4 +243,20 @@ package object config {
         "and event logs.")
       .stringConf
       .createWithDefault("(?i)secret|password")
+
+  private[spark] val NETWORK_AUTH_ENABLED =
+    ConfigBuilder("spark.authenticate")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SASL_ENCRYPTION_ENABLED =
+    ConfigBuilder("spark.authenticate.enableSaslEncryption")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val NETWORK_ENCRYPTION_ENABLED =
+    ConfigBuilder("spark.network.crypto.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
 }
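
These entries are read through the typed config API (SecurityManager and SparkConf.validateSettings above use them). A hedged sketch of the resulting behaviour from the user's side, assuming a local-mode SparkConf; the secret value is a placeholder and the snippet is not part of the patch:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setMaster("local[*]").setAppName("crypto-demo")
    .set("spark.authenticate", "true")                   // NETWORK_AUTH_ENABLED
    .set("spark.authenticate.secret", "placeholder")     // required when authentication is on
    .set("spark.network.crypto.enabled", "true")         // NETWORK_ENCRYPTION_ENABLED

  // Turning either encryption flag on without spark.authenticate now fails validation
  // when the SparkContext is created, per the new check in SparkConf.validateSettings:
  //   "spark.authenticate must be enabled when enabling encryption."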

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 3d4ea3c..b75e91b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
-import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap}
+import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.server._
 import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher}
 import org.apache.spark.network.shuffle.protocol.UploadBlock
@@ -63,9 +63,8 @@ private[spark] class NettyBlockTransferService(
     var serverBootstrap: Option[TransportServerBootstrap] = None
     var clientBootstrap: Option[TransportClientBootstrap] = None
     if (authEnabled) {
-      serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
-      clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
-        securityManager.isSaslEncryptionEnabled()))
+      serverBootstrap = Some(new AuthServerBootstrap(transportConf, securityManager))
+      clientBootstrap = Some(new AuthClientBootstrap(transportConf, conf.getAppId, securityManager))
     }
     transportContext = new TransportContext(transportConf, rpcHandler)
     clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e56943d..1e448b2 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -33,8 +33,8 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.client._
+import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap}
 import org.apache.spark.network.server._
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance}
@@ -60,8 +60,8 @@ private[netty] class NettyRpcEnv(
 
   private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = {
     if (securityManager.isAuthenticationEnabled()) {
-      java.util.Arrays.asList(new SaslClientBootstrap(transportConf, "", securityManager,
-        securityManager.isSaslEncryptionEnabled()))
+      java.util.Arrays.asList(new AuthClientBootstrap(transportConf,
+        securityManager.getSaslUser(), securityManager))
     } else {
       java.util.Collections.emptyList[TransportClientBootstrap]
     }
@@ -111,7 +111,7 @@ private[netty] class NettyRpcEnv(
   def startServer(bindAddress: String, port: Int): Unit = {
     val bootstraps: java.util.List[TransportServerBootstrap] =
       if (securityManager.isAuthenticationEnabled()) {
-        java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
+        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
       } else {
         java.util.Collections.emptyList()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 04521c9..c401867 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -125,8 +125,7 @@ private[spark] class BlockManager(
   // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
     val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
-    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
-      securityManager.isSaslEncryptionEnabled())
+    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
   } else {
     blockTransferService
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 83906cf..0897891 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -303,6 +303,25 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     }
   }
 
+  test("encryption requires authentication") {
+    val conf = new SparkConf()
+    conf.validateSettings()
+
+    conf.set(NETWORK_ENCRYPTION_ENABLED, true)
+    intercept[IllegalArgumentException] {
+      conf.validateSettings()
+    }
+
+    conf.set(NETWORK_ENCRYPTION_ENABLED, false)
+    conf.set(SASL_ENCRYPTION_ENABLED, true)
+    intercept[IllegalArgumentException] {
+      conf.validateSettings()
+    }
+
+    conf.set(NETWORK_AUTH_ENABLED, true)
+    conf.validateSettings()
+  }
+
 }
 
 class Class1 {}

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 022fe91..fe89558 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -94,6 +94,20 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
     }
   }
 
+  test("security with aes encryption") {
+    val conf = new SparkConf()
+      .set("spark.authenticate", "true")
+      .set("spark.authenticate.secret", "good")
+      .set("spark.app.id", "app-id")
+      .set("spark.network.crypto.enabled", "true")
+      .set("spark.network.crypto.saslFallback", "false")
+    testConnection(conf, conf) match {
+      case Success(_) => // expected
+      case Failure(t) => fail(t)
+    }
+  }
+
+
   /**
    * Creates two servers with different configurations and sees if they can talk.
    * Returns Success() if they can transfer a block, and Failure() if the block transfer was failed

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index acdf21d..b4037d7 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -637,11 +637,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(anotherEnv.address.port != env.address.port)
   }
 
-  test("send with authentication") {
-    val conf = new SparkConf
-    conf.set("spark.authenticate", "true")
-    conf.set("spark.authenticate.secret", "good")
-
+  private def testSend(conf: SparkConf): Unit = {
     val localEnv = createRpcEnv(conf, "authentication-local", 0)
     val remoteEnv = createRpcEnv(conf, "authentication-remote", 0, clientMode = true)
 
@@ -667,11 +663,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     }
   }
 
-  test("ask with authentication") {
-    val conf = new SparkConf
-    conf.set("spark.authenticate", "true")
-    conf.set("spark.authenticate.secret", "good")
-
+  private def testAsk(conf: SparkConf): Unit = {
     val localEnv = createRpcEnv(conf, "authentication-local", 0)
     val remoteEnv = createRpcEnv(conf, "authentication-remote", 0, clientMode = true)
 
@@ -695,6 +687,48 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("send with authentication") {
+    testSend(new SparkConf()
+      .set("spark.authenticate", "true")
+      .set("spark.authenticate.secret", "good"))
+  }
+
+  test("send with SASL encryption") {
+    testSend(new SparkConf()
+      .set("spark.authenticate", "true")
+      .set("spark.authenticate.secret", "good")
+      .set("spark.authenticate.enableSaslEncryption", "true"))
+  }
+
+  test("send with AES encryption") {
+    testSend(new SparkConf()
+      .set("spark.authenticate", "true")
+      .set("spark.authenticate.secret", "good")
+      .set("spark.network.crypto.enabled", "true")
+      .set("spark.network.crypto.saslFallback", "false"))
+  }
+
+  test("ask with authentication") {
+    testAsk(new SparkConf()
+      .set("spark.authenticate", "true")
+      .set("spark.authenticate.secret", "good"))
+  }
+
+  test("ask with SASL encryption") {
+    testAsk(new SparkConf()
+      .set("spark.authenticate", "true")
+      .set("spark.authenticate.secret", "good")
+      .set("spark.authenticate.enableSaslEncryption", "true"))
+  }
+
+  test("ask with AES encryption") {
+    testAsk(new SparkConf()
+      .set("spark.authenticate", "true")
+      .set("spark.authenticate.secret", "good")
+      .set("spark.network.crypto.enabled", "true")
+      .set("spark.network.crypto.saslFallback", "false"))
+  }
+
   test("construct RpcTimeout with conf property") {
     val conf = new SparkConf
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index b7f10e6..7c04033 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1639,40 +1639,40 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.authenticate.enableSaslEncryption</code></td>
+  <td><code>spark.network.crypto.enabled</code></td>
   <td>false</td>
   <td>
-    Enable encrypted communication when authentication is
-    enabled. This is supported by the block transfer service and the
-    RPC endpoints.
+    Enable encryption using the commons-crypto library for RPC and block transfer service.
+    Requires <code>spark.authenticate</code> to be enabled.
   </td>
 </tr>
 <tr>
-  <td><code>spark.network.sasl.serverAlwaysEncrypt</code></td>
-  <td>false</td>
+  <td><code>spark.network.crypto.keyLength</code></td>
+  <td>128</td>
   <td>
-    Disable unencrypted connections for services that support SASL authentication. This is
-    currently supported by the external shuffle service.
+    The length in bits of the encryption key to generate. Valid values are 128, 192 and 256.
   </td>
 </tr>
 <tr>
-  <td><code>spark.network.aes.enabled</code></td>
-  <td>false</td>
+  <td><code>spark.network.crypto.keyFactoryAlgorithm</code></td>
+  <td>PBKDF2WithHmacSHA1</td>
   <td>
-    Enable AES for over-the-wire encryption. This is supported for RPC and the block transfer service.
-    This option has precedence over SASL-based encryption if both are enabled.
+    The key factory algorithm to use when generating encryption keys. Should be one of the
+    algorithms supported by the javax.crypto.SecretKeyFactory class in the JRE being used.
   </td>
 </tr>
 <tr>
-  <td><code>spark.network.aes.keySize</code></td>
-  <td>16</td>
+  <td><code>spark.network.crypto.saslFallback</code></td>
+  <td>true</td>
   <td>
-    The bytes of AES cipher key which is effective when AES cipher is enabled. AES
-    works with 16, 24 and 32 bytes keys.
+    Whether to fall back to SASL authentication if authentication fails using Spark's internal
+    mechanism. This is useful when the application is connecting to old shuffle services that
+    do not support the internal Spark authentication protocol. On the server side, this can be
+    used to block older clients from authenticating against a new shuffle service.
   </td>
 </tr>
 <tr>
-  <td><code>spark.network.aes.config.*</code></td>
+  <td><code>spark.network.crypto.config.*</code></td>
   <td>None</td>
   <td>
     Configuration values for the commons-crypto library, such as which cipher implementations to
@@ -1681,6 +1681,22 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.authenticate.enableSaslEncryption</code></td>
+  <td>false</td>
+  <td>
+    Enable encrypted communication when authentication is
+    enabled. This is supported by the block transfer service and the
+    RPC endpoints.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.network.sasl.serverAlwaysEncrypt</code></td>
+  <td>false</td>
+  <td>
+    Disable unencrypted connections for services that support SASL authentication.
+  </td>
+</tr>
+<tr>
   <td><code>spark.core.connection.ack.wait.timeout</code></td>
   <td><code>spark.network.timeout</code></td>
   <td>
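
For reference, a hedged example of a configuration that exercises the new options documented above (values chosen purely for illustration; any spark.network.crypto.* entry can be omitted to keep its default):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.authenticate", "true")                  // required by spark.network.crypto.enabled
    .set("spark.network.crypto.enabled", "true")        // use the commons-crypto based protocol
    .set("spark.network.crypto.keyLength", "256")       // 128 (default), 192 or 256 bits
    .set("spark.network.crypto.saslFallback", "false")  // do not fall back to SASL if the handshake fails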

http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 3258b09..f555072 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -136,8 +136,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     new MesosExternalShuffleClient(
       SparkTransportConf.fromSparkConf(conf, "shuffle"),
       securityManager,
-      securityManager.isAuthenticationEnabled(),
-      securityManager.isSaslEncryptionEnabled())
+      securityManager.isAuthenticationEnabled())
   }
 
   var nextMesosTaskId = 0

