This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0671395 [SPARK-27989][CORE] Added retries on the connection to the
driver for k8s
0671395 is described below
commit 06713959668c6d8015bd79b79f22cdb5fb5a32e1
Author: Jose Luis Pedrosa <[email protected]>
AuthorDate: Mon Jun 24 09:25:43 2019 -0500
[SPARK-27989][CORE] Added retries on the connection to the driver for k8s
Disabled negative dns caching for docker images
Improved logging on DNS resolution, convenient for slow k8s clusters
## What changes were proposed in this pull request?
Added retries when building the connection to the driver in K8s.
In some scenarios DNS resolution can take longer than the timeout.
Also openjdk-8 by default has negative dns caching enabled, which means
even retries may not help depending on the times.
## How was this patch tested?
This patch was tested against a specific k8s cluster with slow DNS response
times to ensure it works.
Closes #24702 from jlpedrosa/feature/kuberetries.
Authored-by: Jose Luis Pedrosa <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../spark/network/client/TransportClientFactory.java | 5 +++--
.../spark/executor/CoarseGrainedExecutorBackend.scala | 14 +++++++++++++-
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index a8e2715..3bc8729 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -172,10 +172,11 @@ public class TransportClientFactory implements Closeable {
final long preResolveHost = System.nanoTime();
final InetSocketAddress resolvedAddress = new
InetSocketAddress(remoteHost, remotePort);
final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) /
1000000;
+ final String resolvMsg = resolvedAddress.isUnresolved() ? "failed" :
"succeed";
if (hostResolveTimeMs > 2000) {
- logger.warn("DNS resolution for {} took {} ms", resolvedAddress,
hostResolveTimeMs);
+ logger.warn("DNS resolution {} for {} took {} ms", resolvMsg,
resolvedAddress, hostResolveTimeMs);
} else {
- logger.trace("DNS resolution for {} took {} ms", resolvedAddress,
hostResolveTimeMs);
+ logger.trace("DNS resolution {} for {} took {} ms", resolvMsg,
resolvedAddress, hostResolveTimeMs);
}
synchronized (clientPool.locks[clientIndex]) {
diff --git
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 2f4fc0e..98e5aa6 100644
---
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -262,7 +262,19 @@ private[spark] object CoarseGrainedExecutorBackend extends
Logging {
executorConf,
new SecurityManager(executorConf),
clientMode = true)
- val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
+
+ var driver: RpcEndpointRef = null
+ val nTries = 3
+ for (i <- 0 until nTries if driver == null) {
+ try {
+ driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
+ } catch {
+ case e: Throwable => if (i == nTries - 1) {
+ throw e
+ }
+ }
+ }
+
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
val props = cfg.sparkProperties ++ Seq[(String,
String)](("spark.app.id", arguments.appId))
fetcher.shutdown()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]