This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 14f714f  [SPARK-26420][K8S] Generate more unique IDs when creating k8s resource names.
14f714f is described below

commit 14f714fb304d82842336e0a772173fc9da1f6e22
Author: Marcelo Vanzin <van...@cloudera.com>
AuthorDate: Thu Feb 28 20:39:13 2019 -0800

    [SPARK-26420][K8S] Generate more unique IDs when creating k8s resource names.
    
    Using the current time as an ID is more prone to clashes than people generally
    realize, so try to make things a bit more unique without necessarily using a
    UUID, which would eat too much space in the names otherwise.
    
    The implemented approach uses some bits from the current time, plus some random
    bits, which should be more resistant to clashes.
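
    As a sketch, the new scheme boils down to the following (a standalone
    illustration of the uniqueID() helper added below; the local names are
    illustrative only):

        import java.security.SecureRandom
        import org.apache.commons.codec.binary.Hex

        val rng = new SecureRandom()
        val randomBits = new Array[Byte](3)       // 24 bits from a strong RNG
        rng.nextBytes(randomBits)
        // Keep the 40 low bits of the timestamp (~30 years of millis).
        val timeBits = java.lang.Long.toHexString(
          System.currentTimeMillis() & 0xFFFFFFFFFFL)
        // Hex-encoded: 6 random chars + up to 10 time chars.
        val id = Hex.encodeHexString(randomBits) + timeBits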
    
    Closes #23805 from vanzin/SPARK-26420.
    
    Authored-by: Marcelo Vanzin <van...@cloudera.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/deploy/k8s/KubernetesConf.scala   |  4 +-
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  | 26 +++++++++-
 .../k8s/features/DriverServiceFeatureStep.scala    |  6 +--
 .../features/DriverServiceFeatureStepSuite.scala   | 56 ++++++++++++----------
 4 files changed, 61 insertions(+), 31 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 4a63ea9..5e74111 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -190,8 +190,8 @@ private[spark] object KubernetesConf {
   }
 
   def getResourceNamePrefix(appName: String): String = {
-    val launchTime = System.currentTimeMillis()
-    s"$appName-$launchTime"
+    val id = KubernetesUtils.uniqueID()
+    s"$appName-$id"
       .trim
       .toLowerCase(Locale.ROOT)
       .replaceAll("\\s+", "-")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index b3f58b0..3f7fcec 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -17,18 +17,23 @@
 package org.apache.spark.deploy.k8s
 
 import java.io.File
+import java.security.SecureRandom
 
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.commons.codec.binary.Hex
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 private[spark] object KubernetesUtils extends Logging {
 
+  private val systemClock = new SystemClock()
+  private lazy val RNG = new SecureRandom()
+
   /**
    * Extract and parse Spark configuration properties with a given name prefix and
    * return the result as a Map. Keys must not have more than one value.
@@ -185,4 +190,23 @@ private[spark] object KubernetesUtils extends Logging {
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
+
+  /**
+   * Generates a unique ID to be used as part of identifiers. The returned ID is a hex string
+   * of a 64-bit value containing the 40 LSBs from the current time + 24 random bits from a
+   * cryptographically strong RNG. (40 bits gives about 30 years worth of "unique" timestamps.)
+   *
+   * This avoids using a UUID for uniqueness (too long), and relying solely on the current time
+   * (not unique enough).
+   */
+  def uniqueID(clock: Clock = systemClock): String = {
+    val random = new Array[Byte](3)
+    synchronized {
+      RNG.nextBytes(random)
+    }
+
+    val time = java.lang.Long.toHexString(clock.getTimeMillis() & 0xFFFFFFFFFFL)
+    Hex.encodeHexString(random) + time
+  }
+
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index 1567117..cec8769 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -20,14 +20,14 @@ import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.{Clock, SystemClock}
 
 private[spark] class DriverServiceFeatureStep(
     kubernetesConf: KubernetesDriverConf,
-    clock: Clock = new SystemClock)
+    clock: Clock = new SystemClock())
   extends KubernetesFeatureConfigStep with Logging {
   import DriverServiceFeatureStep._
 
@@ -42,7 +42,7 @@ private[spark] class DriverServiceFeatureStep(
   private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
     preferredServiceName
   } else {
-    val randomServiceId = clock.getTimeMillis()
+    val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
     val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
     logWarning(s"Driver's hostname would preferably be $preferredServiceName, 
but this is " +
       s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling 
back to use " +
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 822f1e3..fbd99b7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
+import com.google.common.net.InternetDomainName
 import io.fabric8.kubernetes.api.model.Service
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -71,7 +72,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     val expectedServiceName = kconf.resourceNamePrefix + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
     val expectedHostName = s"$expectedServiceName.my-namespace.svc"
     val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    verifySparkConfHostNames(additionalProps, expectedHostName)
+    assert(additionalProps(DRIVER_HOST_ADDRESS.key) === expectedHostName)
   }
 
   test("Ports should resolve to defaults in SparkConf and in the service.") {
@@ -91,26 +92,37 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString)
   }
 
-  test("Long prefixes should switch to using a generated name.") {
-    val clock = new ManualClock()
-    clock.setTime(10000)
+  test("Long prefixes should switch to using a generated unique name.") {
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_NAMESPACE, "my-namespace")
-    val configurationStep = new DriverServiceFeatureStep(
-      KubernetesTestConf.createDriverConf(
-        sparkConf = sparkConf,
-        resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
-        labels = DRIVER_LABELS),
-      clock)
-    val driverService = configurationStep
-      .getAdditionalKubernetesResources()
-      .head
-      .asInstanceOf[Service]
-    val expectedServiceName = s"spark-10000${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}"
-    assert(driverService.getMetadata.getName === expectedServiceName)
-    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
-    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    verifySparkConfHostNames(additionalProps, expectedHostName)
+    val kconf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
+      labels = DRIVER_LABELS)
+    val clock = new ManualClock()
+
+    // Ensure that multiple services created at the same time generate unique names.
+    val services = (1 to 10).map { _ =>
+      val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock)
+      val serviceName = configurationStep
+        .getAdditionalKubernetesResources()
+        .head
+        .asInstanceOf[Service]
+        .getMetadata
+        .getName
+
+      val hostAddress = configurationStep
+        .getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)
+
+      (serviceName -> hostAddress)
+    }.toMap
+
+    assert(services.size === 10)
+    services.foreach { case (name, address) =>
+      assert(!name.startsWith(kconf.resourceNamePrefix))
+      assert(!address.startsWith(kconf.resourceNamePrefix))
+      assert(InternetDomainName.isValid(address))
+    }
   }
 
   test("Disallow bind address and driver host to be set explicitly.") {
@@ -156,10 +168,4 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
     assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
   }
-
-  private def verifySparkConfHostNames(
-      driverSparkConf: Map[String, String], expectedHostName: String): Unit = {
-    assert(driverSparkConf(
-      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName)
-  }
 }
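
Usage note: with the KubernetesConf change above, the resource-name prefix now
embeds this ID in place of the raw launch timestamp. A hypothetical call
(example digits only; the actual hex value varies per run):

    val prefix = KubernetesConf.getResourceNamePrefix("Spark Pi")
    // e.g. "spark-pi-a3f2c1016d0b2e98" -- 6 random + ~10 timestamp hex chars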

