Jahar created SPARK-36534:
-----------------------------
Summary: No way to check If Spark Session is created successfully with No Exceptions and ready to execute Tasks
Key: SPARK-36534
URL: https://issues.apache.org/jira/browse/SPARK-36534
Project: Spark
Issue Type: Improvement
Components: Java API, Kubernetes, Scheduler
Affects Versions: 3.0.1
Environment: *Spark 3.0.1*
Reporter: Jahar
I am running Spark on Kubernetes in client mode. The Spark driver is started programmatically (no spark-submit). Below is simplified code that builds the SparkSession with the Kubernetes API server as master.
{code:java}
import org.apache.spark.sql.SparkSession;

// Fields of the enclosing class (declarations were not shown in the original snippet)
private static SparkSession.Builder mySparkSessionBuilder;
private static SparkSession mySession;

private static SparkSession getSparkSession()
{
    mySparkSessionBuilder = SparkSession.builder()
        .master("k8s://http://<some_IP>:6443")
        .appName("spark-K8sDemo")
        .config("spark.kubernetes.container.image", "spark:3.0")
        .config("spark.jars",
            "/tmp/jt/database-0.0.1-SNAPSHOT-jar-with-dependencies.jar")
        .config("spark.kubernetes.executor.podTemplateFile", "/tmp/jt/sparkExecutorPodTemplate.yaml")
        .config("spark.kubernetes.container.image.pullPolicy", "Always")
        // Kubernetes namespace names may not contain underscores
        .config("spark.kubernetes.namespace", "my-namespace")
        .config("spark.driver.host", "spark-driver-example")
        .config("spark.driver.port", "29413")
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
        // Listener registered by class name; its implementation is not shown
        .config("spark.extraListeners", "K8sPoc.MyHealthCheckListener");
    setAditionalConfig(); // applies further settings to mySparkSessionBuilder (not shown)
    mySession = mySparkSessionBuilder.getOrCreate();
    return mySession;
}
{code}
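The problem is that in certain scenarios, for example when the K8s master is not reachable, the master URL is incorrect, or the spark.kubernetes.container.image config is missing, Spark throws the exceptions below (*Exception 1* and *Exception 2*).
These exceptions are never propagated to the Spark driver program, so the Spark application stays stuck forever. As the traces show, they are thrown on the background thread kubernetes-executor-snapshots-subscribers-1 and are only logged via Utils.tryLogNonFatalError, while the driver keeps waiting for executors ("Initial job has not accepted any resources").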
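To illustrate why the caller never sees the error, here is a simplified, standard-library-only model of the pattern visible in the traces: a task running on a ScheduledThreadPoolExecutor thread that catches and logs its own error, much as Utils.tryLogNonFatalError does. This is not Spark's code, and the class and method names are hypothetical. The model also shows one way a background failure could be surfaced instead: completing a CompletableFuture exceptionally, so that the waiting caller fails fast rather than timing out.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified model of a background subscriber task; NOT Spark's actual code.
final class SwallowedErrorDemo {

    // Schedules a task that fails on every run. Like a "try and log" wrapper, it
    // catches its own error and logs it. The error reaches the returned future
    // only when forwardErrors is true.
    static CompletableFuture<Void> startAllocator(ScheduledExecutorService pool, boolean forwardErrors) {
        CompletableFuture<Void> ready = new CompletableFuture<>();
        pool.scheduleWithFixedDelay(() -> {
            try {
                throw new IllegalStateException("Must specify the executor container image");
            } catch (RuntimeException e) {
                System.err.println("ERROR Uncaught exception in subscriber thread: " + e.getMessage());
                if (forwardErrors) {
                    ready.completeExceptionally(e); // surface the failure to the waiting caller
                }
            }
        }, 0, 100, TimeUnit.MILLISECONDS);
        return ready;
    }

    // Waits up to timeoutMs and describes what the caller observed.
    static String waitFor(CompletableFuture<Void> ready, long timeoutMs) {
        try {
            ready.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "ready";
        } catch (TimeoutException e) {
            return "timed out"; // error was only logged, so the caller learns nothing
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage(); // error was propagated
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try {
            System.out.println(waitFor(startAllocator(pool, false), 500)); // timed out
            System.out.println(waitFor(startAllocator(pool, true), 500));  // failed: Must specify the executor container image
        } finally {
            pool.shutdownNow();
        }
    }
}
{code}

The first call behaves like the reported hang: the caller only ever sees a timeout. The second shows how forwarding the error turns the hang into an immediate, descriptive failure.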
There should be a way to tell, via the SparkSession or SparkContext object, whether the session was created successfully without any such exceptions and is able to run Spark tasks.
I have looked at the SparkSession and SparkContext API documentation and at SparkListeners, but didn't find any way to check whether the SparkSession is ready to run tasks, or, if it is not, to avoid leaving the Spark application hanging and instead return a proper error/warning message to the calling API.
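Until Spark offers such a check, one user-side workaround is to poll a readiness condition against a deadline and fail instead of waiting indefinitely. The sketch below is a hypothetical helper: awaitReady is not a Spark API. In a driver, the condition might be spark.sparkContext().statusTracker().getExecutorInfos().length > 1. This relies on the documented behavior that getExecutorInfos() lists the driver as well as the executors, which is an assumption you should verify against your Spark version.

{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Spark API): wait for a readiness condition, but give up at a deadline.
final class ReadinessCheck {

    // Returns true as soon as isReady holds, or false once the timeout has elapsed
    // (or the thread is interrupted), so the caller can report an error instead of hanging.
    static boolean awaitReady(BooleanSupplier isReady, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!isReady.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // In a Spark driver the condition might be (assumption: the array also lists the driver):
        //   () -> spark.sparkContext().statusTracker().getExecutorInfos().length > 1
        // Here a stand-in condition becomes true after about 200 ms.
        long start = System.nanoTime();
        BooleanSupplier executorsRegistered = () -> System.nanoTime() - start > 200_000_000L;
        boolean ready = awaitReady(executorsRegistered, Duration.ofSeconds(2), Duration.ofMillis(50));
        System.out.println(ready ? "ready to run tasks" : "not ready: fail instead of hanging");
    }
}
{code}

Returning a boolean rather than throwing leaves the caller free to choose how to fail, for example by stopping the session and returning an error to the calling API.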
*Exception 1 (if the _spark.kubernetes.container.image_ config is missing):*
{noformat}
21/08/16 16:27:07 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/08/16 16:27:07 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes.
21/08/16 16:27:07 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
org.apache.spark.SparkException: Must specify the executor container image
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.$anonfun$executorContainerImage$1(BasicExecutorFeatureStep.scala:41)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:41)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$16(ExecutorPodsAllocator.scala:216)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:208)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:82)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1$adapted(ExecutorPodsAllocator.scala:82)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$callSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:110)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:107)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:71)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
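Because this failure only surfaces after executors are requested, a fail-fast option is to validate the settings before calling getOrCreate(). The sketch assumes the settings are kept in a Map and applied to the builder afterwards with builder.config(key, value). PreflightConfigCheck and its set of checks are hypothetical and cover only the cases from this report. The check also accepts spark.kubernetes.executor.container.image, which Spark uses for executors in preference to spark.kubernetes.container.image.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check to run BEFORE getOrCreate(); not part of Spark's API.
final class PreflightConfigCheck {

    // Returns human-readable problems; an empty list means no obvious misconfiguration.
    static List<String> validate(String master, Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        if (master == null || !master.startsWith("k8s://")) {
            problems.add("master must start with k8s://, got: " + master);
        }
        // Executors fall back to spark.kubernetes.container.image when the
        // executor-specific key is absent, so either key satisfies the check.
        if (isBlank(conf.get("spark.kubernetes.executor.container.image"))
                && isBlank(conf.get("spark.kubernetes.container.image"))) {
            problems.add("no executor container image: set spark.kubernetes.container.image");
        }
        return problems;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.kubernetes.namespace", "my-namespace");
        System.out.println(validate("k8s://https://10.0.0.1:6443", conf)); // reports the missing image
        conf.put("spark.kubernetes.container.image", "spark:3.0");
        System.out.println(validate("k8s://https://10.0.0.1:6443", conf)); // []
    }
}
{code}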
*Exception 2 (if the _K8s master_ is not reachable or the URL is wrong):*
{noformat}
21/08/16 16:45:07 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [create] for kind: [Pod] with name: [null] in namespace: [default] failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:338)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$16(ExecutorPodsAllocator.scala:222)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:208)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:82)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1$adapted(ExecutorPodsAllocator.scala:82)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$callSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:110)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:107)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:71)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
    at java.net.Socket.connect(Socket.java:606)
    at okhttp3.internal.platform.Platform.connectSocket(Platform.java:129)
    at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:247)
    at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:167)
    at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:258)
    at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
    at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:127)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:134)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createHttpClient$3(HttpClientUtils.java:111)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:257)
    at okhttp3.RealCall.execute(RealCall.java:93)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:469)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:430)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:251)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:815)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:333)
    ... 16 more
21/08/16 16:45:08 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes.
21/08/16 16:45:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/08/16 16:45:11 WARN WatchConnectionManager: Exec Failure
{noformat}
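For the unreachable-master case, whose root cause above is a SocketTimeoutException, a pre-flight TCP probe with a short connect timeout could turn the silent hang into an immediate error. This is a hypothetical helper, not a Spark API. It only checks that a TCP connection to the API server's host and port can be opened, so it will not catch TLS, authentication, or RBAC problems. Following Spark's documented convention, a k8s:// master without a scheme is treated as https.

{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical pre-flight probe (not a Spark API): is the K8s API server reachable over TCP?
final class ApiServerProbe {

    // Converts a Spark master such as "k8s://https://10.0.0.1:6443" into the API server URI.
    // A k8s:// master without a scheme is treated as https, following Spark's convention.
    static URI apiServerUri(String master) {
        if (master == null || !master.startsWith("k8s://")) {
            throw new IllegalArgumentException("Not a k8s master URL: " + master);
        }
        String rest = master.substring("k8s://".length());
        return URI.create(rest.contains("://") ? rest : "https://" + rest);
    }

    // Explicit port from the URI, else the scheme default (80 for http, 443 otherwise).
    static int portOf(URI uri) {
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        return "http".equalsIgnoreCase(uri.getScheme()) ? 80 : 443;
    }

    // True if a TCP connection to host:port opens within timeoutMs. This does not
    // verify TLS, credentials, or RBAC permissions.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host could not be resolved
        }
    }

    public static void main(String[] args) {
        URI api = apiServerUri(args.length > 0 ? args[0] : "k8s://https://127.0.0.1:6443");
        if (!isReachable(api.getHost(), portOf(api), 3000)) {
            System.err.println("K8s API server " + api + " is not reachable; not creating the SparkSession");
            System.exit(1);
        }
        System.out.println("K8s API server " + api + " is reachable");
    }
}
{code}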
--
This message was sent by Atlassian Jira
(v8.3.4#803005)