[ https://issues.apache.org/jira/browse/SPARK-51450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiaan Geng resolved SPARK-51450.
--------------------------------
Fix Version/s: 3.5.6 (was: 3.5.5)
Resolution: Fixed
Issue resolved by pull request 50223
[https://github.com/apache/spark/pull/50223]
> BarrierCoordinator thread not exiting in Spark standalone mode
> --------------------------------------------------------------
>
> Key: SPARK-51450
> URL: https://issues.apache.org/jira/browse/SPARK-51450
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.3, 3.5.5
> Reporter: Jayadeep Jayaraman
> Assignee: Jayadeep Jayaraman
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.5.6
>
>
> When *xgboost*, or any other framework that relies on a *Barrier* stage, runs in
> standalone mode, the driver JVM does not exit on shutdown. This is because the
> `timer` in `BarrierCoordinator` is not cancelled when its *onStop* method is invoked.
>
> This can be observed with the following reproduction code:
>
> {code:python}
> import random
> from xgboost.spark import SparkXGBClassifier
> from pyspark.sql import SparkSession
> from pyspark.ml.feature import VectorAssembler
> spark = (
>     SparkSession.builder.appName('spark_session')
>     .config(map={
>         "spark.python.worker.reuse": "false",
>         "spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": "false",
>         "spark.sql.pyspark.jvmStacktrace.enabled": "true",
>         "spark.cores.max": "4",
>         "spark.task.cpus": "1",
>         "spark.executor.cores": "2",
>         "spark.dynamicAllocation.enabled": "false",
>         "spark.executor.instances": "2",
>         "spark.driver.memory": "10g",
>         "spark.executor.memory": "10g",
>     })
>     .getOrCreate()
> )
> dataset_size = 100_000
> labels = [0, 1, 2]
> feature_cols = ["feature1", "feature2", "feature3"]
> label_col = "label"
> pred_col_name = "pred"
> all_cols = ["id1", "id2"] + feature_cols + [label_col]
> data = [
>     (i + 1, i + 1001, random.random(), random.random() * 1_000,
>      random.random() * 1_000, random.choice(labels))
>     for i in range(dataset_size)
> ]
> prediction_input = spark.createDataFrame(data, all_cols)
> # VectorAssembler required if you cannot train on GPU
> vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
> prediction_input = vec_assembler.transform(prediction_input)
> prediction_input.show(5)
> # create an xgboost PySpark classifier estimator (training runs on CPU here)
> classifier = SparkXGBClassifier(
>     features_col='features',
>     label_col=label_col,
>     num_workers=spark.sparkContext.defaultParallelism
> )
> # train and return the model
> model = classifier.fit(prediction_input)
> # predict on test data
> predict_df = model.transform(prediction_input)
> predict_df.show(5)
> spark.stop()
> print("Called Stop")
> exit(0)
> {code}
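The failure mode described in the issue can be modelled with a minimal Python sketch, assuming `threading.Timer` as a stand-in for a JVM `java.util.Timer` created without the daemon flag: both run on a non-daemon thread, so the process cannot exit while that thread is alive. `EpochTimerCoordinator`, `on_stop` and `timer_alive` are hypothetical names, not Spark APIs. The analogy is partial. A Java `Timer` thread stays alive even with no scheduled tasks until `cancel()` is called, while a Python `threading.Timer` thread ends after it fires, so the model only holds while the timer is pending.

```python
import threading


class EpochTimerCoordinator:
    """Hypothetical stand-in for a coordinator that owns a background timer.

    The timer thread is non-daemon, so the interpreter waits for it at exit
    unless the stop hook cancels it (the fix the issue describes for onStop).
    """

    def __init__(self, interval_s: float = 60.0) -> None:
        self.ticks = 0
        # threading.Timer threads are non-daemon by default, similar to a
        # java.util.Timer constructed without isDaemon=true.
        self._timer = threading.Timer(interval_s, self._tick)
        self._timer.start()

    def _tick(self) -> None:
        # Placeholder for periodic work (e.g. advancing a barrier epoch).
        self.ticks += 1

    def on_stop(self) -> None:
        # Cancel the pending timer and wait for its thread to finish, so it
        # no longer blocks process exit.
        self._timer.cancel()
        self._timer.join()

    @property
    def timer_alive(self) -> bool:
        return self._timer.is_alive()
```

Creating an `EpochTimerCoordinator()` and never calling `on_stop()` leaves the interpreter waiting at exit until the timer fires. Calling `on_stop()` stops the thread immediately, which is the behaviour the issue expects from `BarrierCoordinator.onStop`.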
--
This message was sent by Atlassian Jira (v8.20.10#820010)