[ https://issues.apache.org/jira/browse/SPARK-38438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502633#comment-17502633 ]

Rafal Wojdyla edited comment on SPARK-38438 at 3/8/22, 4:33 AM:
----------------------------------------------------------------

The workaround doesn't actually stop the existing JVM. It does stop most of the
threads in the JVM (including the Spark context related threads and the py4j
gateway), but it turns out the only non-daemon thread left is the `main` thread:

{noformat}
"main" #1 prio=5 os_prio=31 cpu=1381.53ms elapsed=67.25s tid=0x00007fc478809000 nid=0x2703 runnable  [0x000070000c094000]
   java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.readBytes(java.base@11.0.9.1/Native Method)
        at java.io.FileInputStream.read(java.base@11.0.9.1/FileInputStream.java:279)
        at java.io.BufferedInputStream.fill(java.base@11.0.9.1/BufferedInputStream.java:252)
        at java.io.BufferedInputStream.read(java.base@11.0.9.1/BufferedInputStream.java:271)
        - locked <0x00000007c1012ca0> (a java.io.BufferedInputStream)
        at org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:68)
        at org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.9.1/Native Method)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.9.1/NativeMethodAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.9.1/DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(java.base@11.0.9.1/Method.java:566)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{noformat}

The `main` thread is blocked here, waiting for the Python process to stop:
https://github.com/apache/spark/blob/71991f75ff441e80a52cb71f66f46bfebdb05671/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala#L68-L70
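
For reference, the fact that the JVM survives can be checked from the Python side. A minimal sketch, assuming the gateway was launched by this Python process, so `s._sc._gateway.proc` is the JVM's `subprocess.Popen` handle:

{code:python}
# Sketch only (assumption: the gateway was launched by this Python process,
# so s._sc._gateway.proc is the JVM's subprocess.Popen handle).
proc = s._sc._gateway.proc

s.stop()
s._sc._gateway.shutdown()

# poll() returns None while a process is still running, i.e. the JVM
# survived the workaround even though the py4j gateway was shut down.
print(proc.poll())
{code}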

Would it make sense to just close the JVM's stdin to trigger its shutdown? In
that case the hard reset would be:

{code:python}
s.stop()
s._sc._gateway.shutdown()
s._sc._gateway.proc.stdin.close()
SparkContext._gateway = None
SparkContext._jvm = None
{code}
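
If closing stdin does the trick, the whole dance could be wrapped in a small helper. This is a sketch only, based on the snippet above; `hard_reset_session` is a hypothetical name, not an existing PySpark API:

{code:python}
from pyspark import SparkContext
from pyspark.sql import SparkSession

def hard_reset_session(spark):
    # Hypothetical helper: stop the session, shut down the py4j gateway,
    # close the JVM's stdin so PythonGatewayServer's main thread can exit,
    # and clear the cached gateway/JVM so the next builder call starts a
    # fresh JVM that honors the new configuration.
    spark.stop()
    gateway = spark._sc._gateway
    gateway.shutdown()
    gateway.proc.stdin.close()
    SparkContext._gateway = None
    SparkContext._jvm = None

# Usage: after the reset, a new spark.jars.packages value is actually
# downloaded and added to the classpath.
s = SparkSession.builder.getOrCreate()
hard_reset_session(s)
s = (SparkSession.builder
     .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
     .getOrCreate())
{code}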


> Can't update spark.jars.packages on existing global/default context
> -------------------------------------------------------------------
>
>                 Key: SPARK-38438
>                 URL: https://issues.apache.org/jira/browse/SPARK-38438
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.2.1
>         Environment: py: 3.9
> spark: 3.2.1
>            Reporter: Rafal Wojdyla
>            Priority: Major
>
> Reproduction:
> {code:python}
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> # later on we want to update jars.packages, here's e.g. spark-hats
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # the line below returns None, the config was not propagated:
> s._sc._conf.get("spark.jars.packages")
> {code}
> Stopping the context doesn't help; in fact it's even more confusing, because
> the configuration is updated but has no effect:
> {code:python}
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> s.stop()
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # now this line returns 'za.co.absa:spark-hats_2.12:0.2.2', but the context
> # doesn't download the jar/package, as it would if there were no global
> # context, so the extra package is unusable: it's not downloaded or added to
> # the classpath.
> s._sc._conf.get("spark.jars.packages")
> {code}
> One workaround is to stop the context AND kill the JVM gateway, which seems 
> to be a kind of hard reset:
> {code:python}
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> # Hard reset:
> s.stop()
> s._sc._gateway.shutdown()
> SparkContext._gateway = None
> SparkContext._jvm = None
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # Now we are guaranteed there's a new spark session, and packages
> # are downloaded, added to the classpath etc.
> {code}


