Repository: spark Updated Branches: refs/heads/master d9783380f -> 8f3f73abc
http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java new file mode 100644 index 0000000..21609d5 --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.crypto; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.netty.channel.Channel; +import org.junit.After; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.apache.spark.network.TestUtils; +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.client.TransportClientBootstrap; +import org.apache.spark.network.sasl.SaslRpcHandler; +import org.apache.spark.network.sasl.SaslServerBootstrap; +import org.apache.spark.network.sasl.SecretKeyHolder; +import org.apache.spark.network.server.RpcHandler; +import org.apache.spark.network.server.StreamManager; +import org.apache.spark.network.server.TransportServer; +import org.apache.spark.network.server.TransportServerBootstrap; +import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.MapConfigProvider; +import org.apache.spark.network.util.TransportConf; + +public class AuthIntegrationSuite { + + private AuthTestCtx ctx; + + @After + public void cleanUp() throws Exception { + if (ctx != null) { + ctx.close(); + } + ctx = null; + } + + @Test + public void testNewAuth() throws Exception { + ctx = new AuthTestCtx(); + ctx.createServer("secret"); + ctx.createClient("secret"); + + ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000); + assertEquals("Pong", JavaUtils.bytesToString(reply)); + assertTrue(ctx.authRpcHandler.doDelegate); + assertFalse(ctx.authRpcHandler.delegate instanceof SaslRpcHandler); + } + + @Test + public void testAuthFailure() throws Exception { + ctx = new AuthTestCtx(); + ctx.createServer("server"); + + try { + ctx.createClient("client"); + fail("Should have failed to create client."); + } catch (Exception e) { + assertFalse(ctx.authRpcHandler.doDelegate); + assertFalse(ctx.serverChannel.isActive()); + } + } + + @Test + public void testSaslServerFallback() throws Exception { + ctx = new AuthTestCtx(); + ctx.createServer("secret", true); + ctx.createClient("secret", false); + + ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000); + assertEquals("Pong", JavaUtils.bytesToString(reply)); + } + + @Test + public void testSaslClientFallback() throws Exception { + ctx = new AuthTestCtx(); + ctx.createServer("secret", false); + ctx.createClient("secret", true); + + ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000); + assertEquals("Pong", JavaUtils.bytesToString(reply)); + } + + @Test + public void testAuthReplay() throws Exception { + // This test covers the case where an attacker replays a challenge message sniffed from the + // network, but doesn't know the actual secret. The server should close the connection as + // soon as a message is sent after authentication is performed. This is emulated by removing + // the client encryption handler after authentication. + ctx = new AuthTestCtx(); + ctx.createServer("secret"); + ctx.createClient("secret"); + + assertNotNull(ctx.client.getChannel().pipeline() + .remove(TransportCipher.ENCRYPTION_HANDLER_NAME)); + + try { + ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000); + fail("Should have failed unencrypted RPC."); + } catch (Exception e) { + assertTrue(ctx.authRpcHandler.doDelegate); + } + } + + private class AuthTestCtx { + + private final String appId = "testAppId"; + private final TransportConf conf; + private final TransportContext ctx; + + TransportClient client; + TransportServer server; + volatile Channel serverChannel; + volatile AuthRpcHandler authRpcHandler; + + AuthTestCtx() throws Exception { + Map<String, String> testConf = ImmutableMap.of("spark.network.crypto.enabled", "true"); + this.conf = new TransportConf("rpc", new MapConfigProvider(testConf)); + + RpcHandler rpcHandler = new RpcHandler() { + @Override + public void receive( + TransportClient client, + ByteBuffer message, + RpcResponseCallback callback) { + assertEquals("Ping", JavaUtils.bytesToString(message)); + callback.onSuccess(JavaUtils.stringToBytes("Pong")); + } + + @Override + public StreamManager getStreamManager() { + return null; + } + }; + + this.ctx = new TransportContext(conf, rpcHandler); + } + + void createServer(String secret) throws Exception { + createServer(secret, true); + } + + void createServer(String secret, boolean enableAes) throws Exception { + TransportServerBootstrap introspector = new TransportServerBootstrap() { + @Override + public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) { + AuthTestCtx.this.serverChannel = channel; + if (rpcHandler instanceof AuthRpcHandler) { + AuthTestCtx.this.authRpcHandler = (AuthRpcHandler) rpcHandler; + } + return rpcHandler; + } + }; + SecretKeyHolder keyHolder = createKeyHolder(secret); + TransportServerBootstrap auth = enableAes ? new AuthServerBootstrap(conf, keyHolder) + : new SaslServerBootstrap(conf, keyHolder); + this.server = ctx.createServer(Lists.newArrayList(auth, introspector)); + } + + void createClient(String secret) throws Exception { + createClient(secret, true); + } + + void createClient(String secret, boolean enableAes) throws Exception { + TransportConf clientConf = enableAes ? conf + : new TransportConf("rpc", MapConfigProvider.EMPTY); + List<TransportClientBootstrap> bootstraps = Lists.<TransportClientBootstrap>newArrayList( + new AuthClientBootstrap(clientConf, appId, createKeyHolder(secret))); + this.client = ctx.createClientFactory(bootstraps) + .createClient(TestUtils.getLocalHost(), server.getPort()); + } + + void close() { + if (client != null) { + client.close(); + } + if (server != null) { + server.close(); + } + } + + private SecretKeyHolder createKeyHolder(String secret) { + SecretKeyHolder keyHolder = mock(SecretKeyHolder.class); + when(keyHolder.getSaslUser(anyString())).thenReturn(appId); + when(keyHolder.getSecretKey(anyString())).thenReturn(secret); + return keyHolder; + } + + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java new file mode 100644 index 0000000..a90ff24 --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.crypto; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.Test; +import static org.junit.Assert.*; + +import org.apache.spark.network.protocol.Encodable; + +public class AuthMessagesSuite { + + private static int COUNTER = 0; + + private static String string() { + return String.valueOf(COUNTER++); + } + + private static byte[] byteArray() { + byte[] bytes = new byte[COUNTER++]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) COUNTER; + } return bytes; + } + + private static int integer() { + return COUNTER++; + } + + @Test + public void testClientChallenge() { + ClientChallenge msg = new ClientChallenge(string(), string(), integer(), string(), integer(), + byteArray(), byteArray()); + ClientChallenge decoded = ClientChallenge.decodeMessage(encode(msg)); + + assertEquals(msg.appId, decoded.appId); + assertEquals(msg.kdf, decoded.kdf); + assertEquals(msg.iterations, decoded.iterations); + assertEquals(msg.cipher, decoded.cipher); + assertEquals(msg.keyLength, decoded.keyLength); + assertTrue(Arrays.equals(msg.nonce, decoded.nonce)); + assertTrue(Arrays.equals(msg.challenge, decoded.challenge)); + } + + @Test + public void testServerResponse() { + ServerResponse msg = new ServerResponse(byteArray(), byteArray(), byteArray(), byteArray()); + ServerResponse decoded = ServerResponse.decodeMessage(encode(msg)); + assertTrue(Arrays.equals(msg.response, decoded.response)); + assertTrue(Arrays.equals(msg.nonce, decoded.nonce)); + assertTrue(Arrays.equals(msg.inputIv, decoded.inputIv)); + assertTrue(Arrays.equals(msg.outputIv, decoded.outputIv)); + } + + private ByteBuffer encode(Encodable msg) { + ByteBuf buf = Unpooled.buffer(); + msg.encode(buf); + return buf.nioBuffer(); + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index e27301f..87129b9 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -56,7 +56,6 @@ import org.apache.spark.network.client.ChunkReceivedCallback; import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientBootstrap; -import org.apache.spark.network.sasl.aes.AesCipher; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.StreamManager; import org.apache.spark.network.server.TransportServer; @@ -153,7 +152,7 @@ public class SparkSaslSuite { .when(rpcHandler) .receive(any(TransportClient.class), any(ByteBuffer.class), any(RpcResponseCallback.class)); - SaslTestCtx ctx = new SaslTestCtx(rpcHandler, encrypt, false, false); + SaslTestCtx ctx = new SaslTestCtx(rpcHandler, encrypt, false); try { ByteBuffer response = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), TimeUnit.SECONDS.toMillis(10)); @@ -279,7 +278,7 @@ public class SparkSaslSuite { new Random().nextBytes(data); Files.write(data, file); - ctx = new SaslTestCtx(rpcHandler, true, false, false, testConf); + ctx = new SaslTestCtx(rpcHandler, true, false, testConf); final CountDownLatch lock = new CountDownLatch(1); @@ -317,7 +316,7 @@ public class SparkSaslSuite { public void testServerAlwaysEncrypt() throws Exception { SaslTestCtx ctx = null; try { - ctx = new SaslTestCtx(mock(RpcHandler.class), false, false, false, + ctx = new SaslTestCtx(mock(RpcHandler.class), false, false, ImmutableMap.of("spark.network.sasl.serverAlwaysEncrypt", "true")); fail("Should have failed to connect without encryption."); } catch (Exception e) { @@ -336,7 +335,7 @@ public class SparkSaslSuite { // able to understand RPCs sent to it and thus close the connection. SaslTestCtx ctx = null; try { - ctx = new SaslTestCtx(mock(RpcHandler.class), true, true, false); + ctx = new SaslTestCtx(mock(RpcHandler.class), true, true); ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), TimeUnit.SECONDS.toMillis(10)); fail("Should have failed to send RPC to server."); @@ -374,69 +373,6 @@ public class SparkSaslSuite { } } - @Test - public void testAesEncryption() throws Exception { - final AtomicReference<ManagedBuffer> response = new AtomicReference<>(); - final File file = File.createTempFile("sasltest", ".txt"); - SaslTestCtx ctx = null; - try { - final TransportConf conf = new TransportConf("rpc", MapConfigProvider.EMPTY); - final TransportConf spyConf = spy(conf); - doReturn(true).when(spyConf).aesEncryptionEnabled(); - - StreamManager sm = mock(StreamManager.class); - when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() { - @Override - public ManagedBuffer answer(InvocationOnMock invocation) { - return new FileSegmentManagedBuffer(spyConf, file, 0, file.length()); - } - }); - - RpcHandler rpcHandler = mock(RpcHandler.class); - when(rpcHandler.getStreamManager()).thenReturn(sm); - - byte[] data = new byte[256 * 1024 * 1024]; - new Random().nextBytes(data); - Files.write(data, file); - - ctx = new SaslTestCtx(rpcHandler, true, false, true); - - final Object lock = new Object(); - - ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - response.set((ManagedBuffer) invocation.getArguments()[1]); - response.get().retain(); - synchronized (lock) { - lock.notifyAll(); - } - return null; - } - }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class)); - - synchronized (lock) { - ctx.client.fetchChunk(0, 0, callback); - lock.wait(10 * 1000); - } - - verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class)); - verify(callback, never()).onFailure(anyInt(), any(Throwable.class)); - - byte[] received = ByteStreams.toByteArray(response.get().createInputStream()); - assertTrue(Arrays.equals(data, received)); - } finally { - file.delete(); - if (ctx != null) { - ctx.close(); - } - if (response.get() != null) { - response.get().release(); - } - } - } - private static class SaslTestCtx { final TransportClient client; @@ -449,46 +385,39 @@ public class SparkSaslSuite { SaslTestCtx( RpcHandler rpcHandler, boolean encrypt, - boolean disableClientEncryption, - boolean aesEnable) + boolean disableClientEncryption) throws Exception { - this(rpcHandler, encrypt, disableClientEncryption, aesEnable, - Collections.<String, String>emptyMap()); + this(rpcHandler, encrypt, disableClientEncryption, Collections.<String, String>emptyMap()); } SaslTestCtx( RpcHandler rpcHandler, boolean encrypt, boolean disableClientEncryption, - boolean aesEnable, - Map<String, String> testConf) + Map<String, String> extraConf) throws Exception { + Map<String, String> testConf = ImmutableMap.<String, String>builder() + .putAll(extraConf) + .put("spark.authenticate.enableSaslEncryption", String.valueOf(encrypt)) + .build(); TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf)); - if (aesEnable) { - conf = spy(conf); - doReturn(true).when(conf).aesEncryptionEnabled(); - } - SecretKeyHolder keyHolder = mock(SecretKeyHolder.class); when(keyHolder.getSaslUser(anyString())).thenReturn("user"); when(keyHolder.getSecretKey(anyString())).thenReturn("secret"); TransportContext ctx = new TransportContext(conf, rpcHandler); - String encryptHandlerName = aesEnable ? AesCipher.ENCRYPTION_HANDLER_NAME : - SaslEncryption.ENCRYPTION_HANDLER_NAME; - - this.checker = new EncryptionCheckerBootstrap(encryptHandlerName); + this.checker = new EncryptionCheckerBootstrap(SaslEncryption.ENCRYPTION_HANDLER_NAME); this.server = ctx.createServer(Arrays.asList(new SaslServerBootstrap(conf, keyHolder), checker)); try { List<TransportClientBootstrap> clientBootstraps = Lists.newArrayList(); - clientBootstraps.add(new SaslClientBootstrap(conf, "user", keyHolder, encrypt)); + clientBootstraps.add(new SaslClientBootstrap(conf, "user", keyHolder)); if (disableClientEncryption) { clientBootstraps.add(new EncryptionDisablerBootstrap()); } http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 772fb88..616505d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +29,7 @@ import org.apache.spark.network.TransportContext; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientBootstrap; import org.apache.spark.network.client.TransportClientFactory; -import org.apache.spark.network.sasl.SaslClientBootstrap; +import org.apache.spark.network.crypto.AuthClientBootstrap; import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.server.NoOpRpcHandler; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; @@ -47,8 +46,7 @@ public class ExternalShuffleClient extends ShuffleClient { private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class); private final TransportConf conf; - private final boolean saslEnabled; - private final boolean saslEncryptionEnabled; + private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; protected TransportClientFactory clientFactory; @@ -61,15 +59,10 @@ public class ExternalShuffleClient extends ShuffleClient { public ExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, - boolean saslEnabled, - boolean saslEncryptionEnabled) { - Preconditions.checkArgument( - !saslEncryptionEnabled || saslEnabled, - "SASL encryption can only be enabled if SASL is also enabled."); + boolean authEnabled) { this.conf = conf; this.secretKeyHolder = secretKeyHolder; - this.saslEnabled = saslEnabled; - this.saslEncryptionEnabled = saslEncryptionEnabled; + this.authEnabled = authEnabled; } protected void checkInit() { @@ -81,8 +74,8 @@ public class ExternalShuffleClient extends ShuffleClient { this.appId = appId; TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true); List<TransportClientBootstrap> bootstraps = Lists.newArrayList(); - if (saslEnabled) { - bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder, saslEncryptionEnabled)); + if (authEnabled) { + bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder)); } clientFactory = context.createClientFactory(bootstraps); } http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 42cedd9..ab49b1c 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -60,9 +60,8 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient { public MesosExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, - boolean saslEnabled, - boolean saslEncryptionEnabled) { - super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled); + boolean authEnabled) { + super(conf, secretKeyHolder, authEnabled); } public void registerDriverWithShuffleService( http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 8dd97b2..9248ef3 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -133,7 +133,7 @@ public class ExternalShuffleIntegrationSuite { final Semaphore requestsRemaining = new Semaphore(0); - ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, false); + ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false); client.init(APP_ID); client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, new BlockFetchingListener() { @@ -243,7 +243,7 @@ public class ExternalShuffleIntegrationSuite { private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) throws IOException { - ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false); + ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false); client.init(APP_ID); client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), executorId, executorInfo); http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index aed25a1..4ae75a1 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -20,6 +20,7 @@ package org.apache.spark.network.shuffle; import java.io.IOException; import java.util.Arrays; +import com.google.common.collect.ImmutableMap; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -88,8 +89,14 @@ public class ExternalShuffleSecuritySuite { /** Creates an ExternalShuffleClient and attempts to register with the server. */ private void validate(String appId, String secretKey, boolean encrypt) throws IOException { + TransportConf testConf = conf; + if (encrypt) { + testConf = new TransportConf("shuffle", new MapConfigProvider( + ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true"))); + } + ExternalShuffleClient client = - new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt); + new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true); client.init(appId); // Registration either succeeds or throws an exception. client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0", http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java ---------------------------------------------------------------------- diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index ea726e3..c7620d0 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -45,7 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.TransportContext; -import org.apache.spark.network.sasl.SaslServerBootstrap; +import org.apache.spark.network.crypto.AuthServerBootstrap; import org.apache.spark.network.sasl.ShuffleSecretManager; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; @@ -172,7 +172,7 @@ public class YarnShuffleService extends AuxiliaryService { boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); if (authEnabled) { createSecretManager(); - bootstraps.add(new SaslServerBootstrap(transportConf, secretManager)); + bootstraps.add(new AuthServerBootstrap(transportConf, secretManager)); } int port = conf.getInt( http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/SecurityManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 9bdc509..cde7682 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.io.Text import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.network.sasl.SecretKeyHolder import org.apache.spark.util.Utils @@ -191,7 +192,7 @@ private[spark] class SecurityManager( // allow all users/groups to have view/modify permissions private val WILDCARD_ACL = "*" - private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false) + private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED) // keep spark.ui.acls.enable for backwards compatibility with 1.0 private var aclsOn = sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false)) @@ -516,11 +517,11 @@ private[spark] class SecurityManager( def isAuthenticationEnabled(): Boolean = authOn /** - * Checks whether SASL encryption should be enabled. - * @return Whether to enable SASL encryption when connecting to services that support it. + * Checks whether network encryption should be enabled. + * @return Whether to enable encryption when connecting to services that support it. */ - def isSaslEncryptionEnabled(): Boolean = { - sparkConf.getBoolean("spark.authenticate.enableSaslEncryption", false) + def isEncryptionEnabled(): Boolean = { + sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/SparkConf.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 601d241..308a1ed 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -607,6 +607,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria "\"client\".") } } + + val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) + require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), + s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") } /** @@ -726,6 +730,7 @@ private[spark] object SparkConf extends Logging { (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) || name.startsWith("spark.ssl") || name.startsWith("spark.rpc") || + name.startsWith("spark.network") || isSparkPortConf(name) } http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 1296386..539dbb5 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -235,7 +235,7 @@ object SparkEnv extends Logging { val securityManager = new SecurityManager(conf, ioEncryptionKey) ioEncryptionKey.foreach { _ => - if (!securityManager.isSaslEncryptionEnabled()) { + if (!securityManager.isEncryptionEnabled()) { logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " + "wire.") } http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 13eadbe..8d491dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -25,8 +25,8 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.TransportContext +import org.apache.spark.network.crypto.AuthServerBootstrap import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.sasl.SaslServerBootstrap import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.util.TransportConf @@ -47,7 +47,6 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false) private val port = sparkConf.getInt("spark.shuffle.service.port", 7337) - private val useSasl: Boolean = securityManager.isAuthenticationEnabled() private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0) @@ -74,10 +73,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana /** Start the external shuffle service */ def start() { require(server == null, "Shuffle server already started") - logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl") + val authEnabled = securityManager.isAuthenticationEnabled() + logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)") val bootstraps: Seq[TransportServerBootstrap] = - if (useSasl) { - Seq(new SaslServerBootstrap(transportConf, securityManager)) + if (authEnabled) { + Seq(new AuthServerBootstrap(transportConf, securityManager)) } else { Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index aba429b..536f493 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -243,4 +243,20 @@ package object config { "and event logs.") .stringConf .createWithDefault("(?i)secret|password") + + private[spark] val NETWORK_AUTH_ENABLED = + ConfigBuilder("spark.authenticate") + .booleanConf + .createWithDefault(false) + + private[spark] val SASL_ENCRYPTION_ENABLED = + ConfigBuilder("spark.authenticate.enableSaslEncryption") + .booleanConf + .createWithDefault(false) + + private[spark] val NETWORK_ENCRYPTION_ENABLED = + ConfigBuilder("spark.network.crypto.enabled") + .booleanConf + .createWithDefault(false) + } http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 3d4ea3c..b75e91b 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -27,7 +27,7 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.network._ import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory} -import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap} +import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap} import org.apache.spark.network.server._ import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher} import org.apache.spark.network.shuffle.protocol.UploadBlock @@ -63,9 +63,8 @@ private[spark] class NettyBlockTransferService( var serverBootstrap: Option[TransportServerBootstrap] = None var clientBootstrap: Option[TransportClientBootstrap] = None if (authEnabled) { - serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager)) - clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager, - securityManager.isSaslEncryptionEnabled())) + serverBootstrap = Some(new AuthServerBootstrap(transportConf, securityManager)) + clientBootstrap = Some(new AuthClientBootstrap(transportConf, conf.getAppId, securityManager)) } transportContext = new TransportContext(transportConf, rpcHandler) clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava) http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index e56943d..1e448b2 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -33,8 +33,8 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.network.TransportContext import org.apache.spark.network.client._ +import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap} import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap} import org.apache.spark.network.server._ import org.apache.spark.rpc._ import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance} @@ -60,8 +60,8 @@ private[netty] class NettyRpcEnv( private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = { if (securityManager.isAuthenticationEnabled()) { - java.util.Arrays.asList(new SaslClientBootstrap(transportConf, "", securityManager, - securityManager.isSaslEncryptionEnabled())) + java.util.Arrays.asList(new AuthClientBootstrap(transportConf, + securityManager.getSaslUser(), securityManager)) } else { java.util.Collections.emptyList[TransportClientBootstrap] } @@ -111,7 +111,7 @@ private[netty] class NettyRpcEnv( def startServer(bindAddress: String, port: Int): Unit = { val bootstraps: java.util.List[TransportServerBootstrap] = if (securityManager.isAuthenticationEnabled()) { - java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager)) + java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager)) } else { java.util.Collections.emptyList() } http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 04521c9..c401867 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -125,8 +125,7 @@ private[spark] class BlockManager( // standard BlockTransferService to directly connect to other Executors. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) - new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(), - securityManager.isSaslEncryptionEnabled()) + new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled()) } else { blockTransferService } http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 83906cf..0897891 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -303,6 +303,25 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst } } + test("encryption requires authentication") { + val conf = new SparkConf() + conf.validateSettings() + + conf.set(NETWORK_ENCRYPTION_ENABLED, true) + intercept[IllegalArgumentException] { + conf.validateSettings() + } + + conf.set(NETWORK_ENCRYPTION_ENABLED, false) + conf.set(SASL_ENCRYPTION_ENABLED, true) + intercept[IllegalArgumentException] { + conf.validateSettings() + } + + conf.set(NETWORK_AUTH_ENABLED, true) + conf.validateSettings() + } + } class Class1 {} http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 022fe91..fe89558 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -94,6 +94,20 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi } } + test("security with aes encryption") { + val conf = new SparkConf() + .set("spark.authenticate", "true") + .set("spark.authenticate.secret", "good") + .set("spark.app.id", "app-id") + .set("spark.network.crypto.enabled", "true") + .set("spark.network.crypto.saslFallback", "false") + testConnection(conf, conf) match { + case Success(_) => // expected + case Failure(t) => fail(t) + } + } + + /** * Creates two servers with different configurations and sees if they can talk. * Returns Success() if they can transfer a block, and Failure() if the block transfer was failed http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index acdf21d..b4037d7 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -637,11 +637,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { assert(anotherEnv.address.port != env.address.port) } - test("send with authentication") { - val conf = new SparkConf - conf.set("spark.authenticate", "true") - conf.set("spark.authenticate.secret", "good") - + private def testSend(conf: SparkConf): Unit = { val localEnv = createRpcEnv(conf, "authentication-local", 0) val remoteEnv = createRpcEnv(conf, "authentication-remote", 0, clientMode = true) @@ -667,11 +663,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } } - test("ask with authentication") { - val conf = new SparkConf - conf.set("spark.authenticate", "true") - conf.set("spark.authenticate.secret", "good") - + private def testAsk(conf: SparkConf): Unit = { val localEnv = createRpcEnv(conf, "authentication-local", 0) val remoteEnv = createRpcEnv(conf, "authentication-remote", 0, clientMode = true) @@ -695,6 +687,48 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } } + test("send with authentication") { + testSend(new SparkConf() + .set("spark.authenticate", "true") + .set("spark.authenticate.secret", "good")) + } + + test("send with SASL encryption") { + testSend(new SparkConf() + .set("spark.authenticate", "true") + .set("spark.authenticate.secret", "good") + .set("spark.authenticate.enableSaslEncryption", "true")) + } + + test("send with AES encryption") { + testSend(new SparkConf() + .set("spark.authenticate", "true") + .set("spark.authenticate.secret", "good") + .set("spark.network.crypto.enabled", "true") + .set("spark.network.crypto.saslFallback", "false")) + } + + test("ask with authentication") { + testAsk(new SparkConf() + .set("spark.authenticate", "true") + .set("spark.authenticate.secret", "good")) + } + + test("ask with SASL encryption") { + testAsk(new SparkConf() + .set("spark.authenticate", "true") + .set("spark.authenticate.secret", "good") + .set("spark.authenticate.enableSaslEncryption", "true")) + } + + test("ask with AES encryption") { + testAsk(new SparkConf() + .set("spark.authenticate", "true") + .set("spark.authenticate.secret", "good") + .set("spark.network.crypto.enabled", "true") + .set("spark.network.crypto.saslFallback", "false")) + } + test("construct RpcTimeout with conf property") { val conf = new SparkConf http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index b7f10e6..7c04033 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1639,40 +1639,40 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td><code>spark.authenticate.enableSaslEncryption</code></td> + <td><code>spark.network.crypto.enabled</code></td> <td>false</td> <td> - Enable encrypted communication when authentication is - enabled. This is supported by the block transfer service and the - RPC endpoints. + Enable encryption using the commons-crypto library for RPC and block transfer service. + Requires <code>spark.authenticate</code> to be enabled. </td> </tr> <tr> - <td><code>spark.network.sasl.serverAlwaysEncrypt</code></td> - <td>false</td> + <td><code>spark.network.crypto.keyLength</code></td> + <td>128</td> <td> - Disable unencrypted connections for services that support SASL authentication. This is - currently supported by the external shuffle service. + The length in bits of the encryption key to generate. Valid values are 128, 192 and 256. </td> </tr> <tr> - <td><code>spark.network.aes.enabled</code></td> - <td>false</td> + <td><code>spark.network.crypto.keyFactoryAlgorithm</code></td> + <td>PBKDF2WithHmacSHA1</td> <td> - Enable AES for over-the-wire encryption. This is supported for RPC and the block transfer service. - This option has precedence over SASL-based encryption if both are enabled. + The key factory algorithm to use when generating encryption keys. Should be one of the + algorithms supported by the javax.crypto.SecretKeyFactory class in the JRE being used. </td> </tr> <tr> - <td><code>spark.network.aes.keySize</code></td> - <td>16</td> + <td><code>spark.network.crypto.saslFallback</code></td> + <td>true</td> <td> - The bytes of AES cipher key which is effective when AES cipher is enabled. AES - works with 16, 24 and 32 bytes keys. + Whether to fall back to SASL authentication if authentication fails using Spark's internal + mechanism. This is useful when the application is connecting to old shuffle services that + do not support the internal Spark authentication protocol. On the server side, this can be + used to block older clients from authenticating against a new shuffle service. </td> </tr> <tr> - <td><code>spark.network.aes.config.*</code></td> + <td><code>spark.network.crypto.config.*</code></td> <td>None</td> <td> Configuration values for the commons-crypto library, such as which cipher implementations to @@ -1681,6 +1681,22 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> + <td><code>spark.authenticate.enableSaslEncryption</code></td> + <td>false</td> + <td> + Enable encrypted communication when authentication is + enabled. This is supported by the block transfer service and the + RPC endpoints. + </td> +</tr> +<tr> + <td><code>spark.network.sasl.serverAlwaysEncrypt</code></td> + <td>false</td> + <td> + Disable unencrypted connections for services that support SASL authentication. + </td> +</tr> +<tr> <td><code>spark.core.connection.ack.wait.timeout</code></td> <td><code>spark.network.timeout</code></td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 3258b09..f555072 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -136,8 +136,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( new MesosExternalShuffleClient( SparkTransportConf.fromSparkConf(conf, "shuffle"), securityManager, - securityManager.isAuthenticationEnabled(), - securityManager.isSaslEncryptionEnabled()) + securityManager.isAuthenticationEnabled()) } var nextMesosTaskId = 0 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
