This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 67651a7d5ac [SPARK-42186][R] Make SparkR be able to stop properly when
the connection is timed-out
67651a7d5ac is described below
commit 67651a7d5acb733273550ec9a01f45a9988845d5
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Jan 25 09:13:14 2023 -0800
[SPARK-42186][R] Make SparkR be able to stop properly when the connection
is timed-out
### What changes were proposed in this pull request?
This PR proposes to allow stopping the SparkR session even when the connection
has timed out.
### Why are the changes needed?
Previously, when the connection to the R backend timed out, the session could
not be stopped and restarted to recover, as shown below:
```bash
./bin/sparkR --conf spark.r.backendConnectionTimeout=10
```
After waiting 10 seconds:
```R
...
23/01/25 21:24:53 WARN RBackendHandler: Ignoring read timeout in
RBackendHandler
> sparkR.session()
Error in handleErrors(returnStatus, conn) :
No status is returned. Java SparkR backend might have failed.
```
```R
> sparkR.session.stop()
Error in writeBin(requestMessage, conn) : ignoring SIGPIPE signal
```
```R
> sparkR.session()
Error in handleErrors(returnStatus, conn) :
No status is returned. Java SparkR backend might have failed.
In addition: Warning message:
In writeBin(requestMessage, conn) : problem writing to connection
```
After this change, we can stop and restart the session to continue using SparkR:
```R
...
23/01/25 21:35:21 WARN RBackendHandler: Ignoring read timeout in
RBackendHandler
> sparkR.session()
Error in handleErrors(returnStatus, conn) :
No status is returned. Java SparkR backend might have failed.
```
```R
> sparkR.session.stop()
Warning messages:
1: In writeBin(requestMessage, conn) : ignoring SIGPIPE signal
2: In writeBin(requestMessage, conn) : problem writing to connection
3: In handleErrors(returnStatus, conn) :
No status is returned. Java SparkR backend might have failed.
```
```R
> sparkR.session()
Launching java with spark-submit command /.../spark/bin/spark-submit
"--conf" "spark.r.backendConnectionTimeout=10" "sparkr-shell"
/var/folders/0c/q8y15ybd3tn7sr2_jmbmftr80000gp/T//Rtmp7rDIwy/backend_port138dd1da2739e
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
23/01/25 21:34:53 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Java ref type org.apache.spark.sql.SparkSession id 1
```
which restarts the backend and allows running SparkR afterwards:
```R
> head(createDataFrame(iris))
Sepal_Length Sepal_Width Petal_Length Petal_Width Species
1 5.1 3.5 1.4 0.2 setosa
2 4.9 3.0 1.4 0.2 setosa
3 4.7 3.2 1.3 0.2 setosa
4 4.6 3.1 1.5 0.2 setosa
5 5.0 3.6 1.4 0.2 setosa
6 5.4 3.9 1.7 0.4 setosa
```
In addition, I manually checked that the JVM is terminated properly after
stopping.
### Does this PR introduce _any_ user-facing change?
Yes, it allows stopping the SparkR session properly when the connection to the
R backend has timed out.
### How was this patch tested?
Manually tested as described above; it is difficult to write a unit test for
this scenario.
Closes #39742 from HyukjinKwon/SPARK-42186.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
R/pkg/R/sparkR.R | 36 +++++++++++++++++++++++++++++-------
1 file changed, 29 insertions(+), 7 deletions(-)
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index f18a6c7e25f..e2ab5747177 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -40,8 +40,15 @@ sparkR.session.stop <- function() {
env <- .sparkREnv
if (exists(".sparkRCon", envir = env)) {
if (exists(".sparkRjsc", envir = env)) {
- sc <- get(".sparkRjsc", envir = env)
- callJMethod(sc, "stop")
+ # Should try catch for every use of the connection in case
+ # the connection is timed-out, see also SPARK-42186.
+ tryCatch({
+ sc <- get(".sparkRjsc", envir = env)
+ callJMethod(sc, "stop")
+ },
+ error = function(err) {
+ warning(err)
+ })
rm(".sparkRjsc", envir = env)
if (exists(".sparkRsession", envir = env)) {
@@ -56,20 +63,35 @@ sparkR.session.stop <- function() {
}
if (exists(".backendLaunched", envir = env)) {
- callJStatic("SparkRHandler", "stopBackend")
+ tryCatch({
+ callJStatic("SparkRHandler", "stopBackend")
+ },
+ error = function(err) {
+ warning(err)
+ })
}
# Also close the connection and remove it from our env
- conn <- get(".sparkRCon", envir = env)
- close(conn)
+ tryCatch({
+ conn <- get(".sparkRCon", envir = env)
+ close(conn)
+ },
+ error = function(err) {
+ warning(err)
+ })
rm(".sparkRCon", envir = env)
rm(".scStartTime", envir = env)
}
if (exists(".monitorConn", envir = env)) {
- conn <- get(".monitorConn", envir = env)
- close(conn)
+ tryCatch({
+ conn <- get(".monitorConn", envir = env)
+ close(conn)
+ },
+ error = function(err) {
+ warning(err)
+ })
rm(".monitorConn", envir = env)
}
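Each of the `tryCatch` blocks in the patch above applies the same pattern: attempt one cleanup step, and downgrade any error (for example, a write to a dead connection) to a warning so the remaining teardown steps still run. A minimal standalone sketch of that pattern is shown below; the `cleanup_step` helper name is illustrative and not part of the patch itself.

```r
# Run a cleanup expression, turning errors into warnings so that one
# failed step (e.g. a timed-out connection) does not abort the rest of
# the teardown. Illustrative helper; the actual patch inlines tryCatch.
cleanup_step <- function(expr) {
  tryCatch(expr,
    error = function(err) {
      warning(err)      # surface the failure, as the patch does
      invisible(NULL)   # but keep going
    })
}

# Even if the first step fails, the later steps still execute.
done <- character(0)
cleanup_step(stop("connection timed out"))  # error becomes a warning
done <- c(done, "closed connection")
done <- c(done, "removed env entries")
print(done)
```

Because `warning(err)` is used rather than re-signalling the error, callers of `sparkR.session.stop()` see the same warning-style messages shown in the example session above, while the function still removes `.sparkRjsc`, `.sparkRCon`, and `.monitorConn` from the environment so a fresh `sparkR.session()` can relaunch the backend.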