[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411246#comment-16411246 ]

Deepansh commented on SPARK-23650:
----------------------------------

The R environment inside the thread that applies the UDF is not cached. It is 
created and destroyed with each query.

{code:R}
# Read the stream from Kafka ("subscribe" selects the source topic; the
# "topic" option only applies to the Kafka sink, so it is dropped here).
kafka <- read.stream("kafka", subscribe = "source",
                     kafka.bootstrap.servers = "10.117.172.48:9092")
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)
library(caret)

# Time how long library(caret) takes inside the UDF on each micro-batch.
df4 <- dapply(lines, function(x) {
  print(system.time(library(caret)))
  x
}, schema)

# loc is a checkpoint directory path defined elsewhere.
q2 <- write.stream(df4, "kafka", checkpointLocation = loc, topic = "sink",
                   kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)
{code}

With the above code, every new micro-batch produces the following output:
{code}
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed 
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999 
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s
{code}

PFA: the rest of the log file.

For every incoming micro-batch, the packages are loaded again inside the 
thread, which means the R environment inside the thread is not reused; it is 
created and destroyed every time.
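
For comparison: in a long-lived R session the load cost is paid only once, and 
subsequent library() calls are near-instant. Seeing the full ~2 s on every 
micro-batch therefore points to a fresh R session per batch (a minimal check, 
run in a plain R console):

{code:R}
# In a single persistent R session, only the first library() call is expensive.
system.time(library(caret))  # first load: roughly 2 s (cold)
system.time(library(caret))  # already attached: near 0 s
{code}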

The model I am testing with (an iris model) requires the caret package, so 
when I load it with readRDS the caret package is loaded as well, which adds an 
overhead of ~2 s every time.

The same problem exists with broadcast. Broadcasting the model itself takes no 
time, but deserializing it on the worker loads the caret package, which adds 
the same ~2 s overhead.
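
This can be confirmed by timing the broadcast lookup inside the UDF (an 
instrumentation sketch, assuming randomBr is the broadcast handle created as 
in the issue description below and lines/schema are defined as above; 
SparkR:::value is the same private API the original code uses):

{code:R}
# If deserialization triggers package loading, the ~2 s shows up in this
# timing (and in RRunner's "compute" time), not in its "broadcast" time.
df5 <- dapply(lines, function(x) {
  print(system.time(i_model <- SparkR:::value(randomBr)))
  x
}, schema)
{code}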

Ideally, the packages should not be loaded again for every batch. Is there a 
way around this problem?
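
One candidate workaround would be to cache the loaded package and deserialized 
model in the worker's global environment, as in the sketch below (the model 
path is a placeholder; note this only helps if the worker's R session persists 
across batches, which is exactly what the logs above show is not happening 
today):

{code:R}
# Load the package and model only if this R session has not already done so.
# With the current behavior the check always misses, because the session is
# recreated for each query; if the environment were cached, it would hit
# from the second micro-batch onward.
df6 <- dapply(lines, function(x) {
  if (!exists(".cached_model", envir = globalenv())) {
    library(caret)
    assign(".cached_model", readRDS("/path/to/iris_model.rds"),
           envir = globalenv())
  }
  i_model <- get(".cached_model", envir = globalenv())
  x
}, schema)
{code}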

> Slow SparkR udf (dapply)
> ------------------------
>
>                 Key: SPARK-23650
>                 URL: https://issues.apache.org/jira/browse/SPARK-23650
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Shell, SparkR, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Deepansh
>            Priority: Major
>         Attachments: read_model_in_udf.txt, sparkR_log2.txt, sparkRlag.txt
>
>
> For example, I am reading streams from Kafka and want to apply a model built 
> in R to those streams. For this, I am using dapply.
> My code is:
> {code:R}
> library(jsonlite)  # for fromJSON/toJSON
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source",
>                      kafka.bootstrap.servers = "localhost:9092")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)  # the broadcast handle created above
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     x[row, "value"] <- toJSON(y)
>   }
>   x
> }, schema)
> {code}
> Every time Kafka streams are fetched, the dapply method creates a new runner 
> thread and ships the variables again, which causes a huge lag (~2 s for 
> shipping the model) every time. I also tried without broadcast variables, but 
> shipping the variables takes the same time. Can other techniques be applied 
> to improve its performance?


