Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c91191bed -> 1bfd1a83b

[SPARK-21494][NETWORK] Use correct app id when authenticating to external service.

There was some code based on the old SASL handler in the new auth client that was incorrectly using the SASL user as the user to authenticate against the external shuffle service. This caused the external service to not be able to find the correct secret to authenticate the connection, failing the connection.

In the course of debugging, I found that some log messages from the YARN shuffle service were a little noisy, so I silenced some of them, and also added a couple of new ones that helped find this issue. On top of that, I found that a check in the code that records app secrets was wrong, causing more log spam and also using an O(n) operation instead of an O(1) call.

Also added a new integration suite for the YARN shuffle service with auth on, and verified it failed before, and passes now.

Author: Marcelo Vanzin <[email protected]>

Closes #18706 from vanzin/SPARK-21494.

(cherry picked from commit 300807c6e3011e4d78c6cf750201d0ab8e5bdaf5)
Signed-off-by: Marcelo Vanzin <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bfd1a83
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bfd1a83
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bfd1a83

Branch: refs/heads/branch-2.2
Commit: 1bfd1a83b5e18f42bf76c1d72cd0347ff578e9cd
Parents: c91191b
Author: Marcelo Vanzin <[email protected]>
Authored: Tue Jul 25 17:57:26 2017 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Jul 25 17:57:35 2017 -0700

----------------------------------------------------------------------
 .../network/crypto/AuthClientBootstrap.java | 6 +--
 .../spark/network/crypto/AuthRpcHandler.java | 7 +++-
 .../network/sasl/ShuffleSecretManager.java | 4 +-
 .../spark/network/yarn/YarnShuffleService.java | 2 -
 .../yarn/YarnShuffleIntegrationSuite.scala | 42 +++++++++++++++++---
 5 files changed, 47 insertions(+), 
14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1bfd1a83/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java index 799f454..3c26378 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java @@ -50,7 +50,6 @@ public class AuthClientBootstrap implements TransportClientBootstrap { private final TransportConf conf; private final String appId; - private final String authUser; private final SecretKeyHolder secretKeyHolder; public AuthClientBootstrap( @@ -65,7 +64,6 @@ public class AuthClientBootstrap implements TransportClientBootstrap { // required by the protocol. At some point, though, it would be better for the actual app ID // to be provided here. 
this.appId = appId; - this.authUser = secretKeyHolder.getSaslUser(appId); this.secretKeyHolder = secretKeyHolder; } @@ -97,8 +95,8 @@ public class AuthClientBootstrap implements TransportClientBootstrap { private void doSparkAuth(TransportClient client, Channel channel) throws GeneralSecurityException, IOException { - String secretKey = secretKeyHolder.getSecretKey(authUser); - try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) { + String secretKey = secretKeyHolder.getSecretKey(appId); + try (AuthEngine engine = new AuthEngine(appId, secretKey, conf)) { ClientChallenge challenge = engine.challenge(); ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength()); challenge.encode(challengeData); http://git-wip-us.apache.org/repos/asf/spark/blob/1bfd1a83/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java index 0a5c029..8a6e385 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java @@ -20,6 +20,7 @@ package org.apache.spark.network.crypto; import java.nio.ByteBuffer; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -113,7 +114,11 @@ class AuthRpcHandler extends RpcHandler { // Here we have the client challenge, so perform the new auth protocol and set up the channel. 
AuthEngine engine = null; try { - engine = new AuthEngine(challenge.appId, secretKeyHolder.getSecretKey(challenge.appId), conf); + String secret = secretKeyHolder.getSecretKey(challenge.appId); + Preconditions.checkState(secret != null, + "Trying to authenticate non-registered app %s.", challenge.appId); + LOG.debug("Authenticating challenge for app {}.", challenge.appId); + engine = new AuthEngine(challenge.appId, secret, conf); ServerResponse response = engine.respond(challenge); ByteBuf responseData = Unpooled.buffer(response.encodedLength()); response.encode(responseData); http://git-wip-us.apache.org/repos/asf/spark/blob/1bfd1a83/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java index 426a604..d2d008f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java @@ -47,7 +47,7 @@ public class ShuffleSecretManager implements SecretKeyHolder { * fetching shuffle files written by other executors in this application. */ public void registerApp(String appId, String shuffleSecret) { - if (!shuffleSecretMap.contains(appId)) { + if (!shuffleSecretMap.containsKey(appId)) { shuffleSecretMap.put(appId, shuffleSecret); logger.info("Registered shuffle secret for application {}", appId); } else { @@ -67,7 +67,7 @@ public class ShuffleSecretManager implements SecretKeyHolder { * This is called when the application terminates. 
*/ public void unregisterApp(String appId) { - if (shuffleSecretMap.contains(appId)) { + if (shuffleSecretMap.containsKey(appId)) { shuffleSecretMap.remove(appId); logger.info("Unregistered shuffle secret for application {}", appId); } else { http://git-wip-us.apache.org/repos/asf/spark/blob/1bfd1a83/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java ---------------------------------------------------------------------- diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index fd50e3a..cd67eb2 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -243,7 +243,6 @@ public class YarnShuffleService extends AuxiliaryService { String appId = context.getApplicationId().toString(); try { ByteBuffer shuffleSecret = context.getApplicationDataForService(); - logger.info("Initializing application {}", appId); if (isAuthenticationEnabled()) { AppId fullId = new AppId(appId); if (db != null) { @@ -262,7 +261,6 @@ public class YarnShuffleService extends AuxiliaryService { public void stopApplication(ApplicationTerminationContext context) { String appId = context.getApplicationId().toString(); try { - logger.info("Stopping application {}", appId); if (isAuthenticationEnabled()) { AppId fullId = new AppId(appId); if (db != null) { http://git-wip-us.apache.org/repos/asf/spark/blob/1bfd1a83/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index 950ebd9..75427b4 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -26,7 +26,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.Matchers import org.apache.spark._ +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.ShuffleTestAccessor import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} import org.apache.spark.tags.ExtendedYarnTest @@ -46,28 +48,58 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { yarnConfig } + protected def extraSparkConf(): Map[String, String] = { + val shuffleServicePort = YarnTestAccessor.getShuffleServicePort + val shuffleService = YarnTestAccessor.getShuffleServiceInstance + logInfo("Shuffle service port = " + shuffleServicePort) + + Map( + "spark.shuffle.service.enabled" -> "true", + "spark.shuffle.service.port" -> shuffleServicePort.toString, + MAX_EXECUTOR_FAILURES.key -> "1" + ) + } + test("external shuffle service") { val shuffleServicePort = YarnTestAccessor.getShuffleServicePort val shuffleService = YarnTestAccessor.getShuffleServiceInstance val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService) - logInfo("Shuffle service port = " + shuffleServicePort) val result = File.createTempFile("result", null, tempDir) val finalState = runSpark( false, mainClassName(YarnExternalShuffleDriver.getClass), appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath), - extraConf = Map( - "spark.shuffle.service.enabled" -> "true", - "spark.shuffle.service.port" -> shuffleServicePort.toString - ) + extraConf = extraSparkConf() ) 
checkResult(finalState, result) assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists()) } } +/** + * Integration test for the external shuffle service with auth on. + */ +@ExtendedYarnTest +class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite { + + override def newYarnConfig(): YarnConfiguration = { + val yarnConfig = super.newYarnConfig() + yarnConfig.set(NETWORK_AUTH_ENABLED.key, "true") + yarnConfig.set(NETWORK_ENCRYPTION_ENABLED.key, "true") + yarnConfig + } + + override protected def extraSparkConf(): Map[String, String] = { + super.extraSparkConf() ++ Map( + NETWORK_AUTH_ENABLED.key -> "true", + NETWORK_ENCRYPTION_ENABLED.key -> "true" + ) + } + +} + private object YarnExternalShuffleDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
