[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411246#comment-16411246 ]
Deepansh commented on SPARK-23650:
----------------------------------

The R environment inside the thread used for applying the UDF is not being cached; it is created and destroyed with each query.

{code:R}
kafka <- read.stream("kafka", subscribe = "source",
                     kafka.bootstrap.servers = "10.117.172.48:9092",
                     topic = "source")
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)
library(caret)
df4 <- dapply(lines, function(x) {
  print(system.time(library(caret)))
  x
}, schema)
q2 <- write.stream(df4, "kafka", checkpointLocation = loc,
                   topic = "sink",
                   kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)
{code}

For the above code, my output for every new stream is:

{code}
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s
{code}

PFA: rest of the log file.

For every incoming stream, the packages are loaded again inside the thread, which means the R environment inside the thread is not reused; it is created and destroyed every time. The model (an iris model) I am testing against requires the caret package, so when I call readRDS, caret is loaded as well, which adds ~2s of overhead every time. The same problem occurs with broadcast.
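A quick way to confirm that the worker's R environment is rebuilt per batch (rather than the package load merely being slow) is to guard the library() call. A minimal diagnostic sketch, assuming the same `lines` and `schema` as in the snippet above:

```r
# Diagnostic sketch (assumes the SparkR session, 'lines', and 'schema'
# from the snippet above). If the worker R process were reused across
# batches, 'cached' would be 1 (TRUE) and 'elapsed' near zero from the
# second batch onward; a cold ~2s load on every batch confirms the R
# environment is created and destroyed with each query.
df4 <- dapply(lines, function(x) {
  cached <- "caret" %in% loadedNamespaces()
  t <- system.time(if (!cached) suppressMessages(library(caret)))
  print(c(cached = cached, elapsed = t[["elapsed"]]))
  x
}, schema)
```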
Broadcasting the model doesn't take time, but deserializing it loads the caret package, which adds the ~2s overhead. Ideally, the packages shouldn't be loaded again. Is there a way around this problem?

> Slow SparkR udf (dapply)
> ------------------------
>
>                 Key: SPARK-23650
>                 URL: https://issues.apache.org/jira/browse/SPARK-23650
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Shell, SparkR, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Deepansh
>            Priority: Major
>         Attachments: read_model_in_udf.txt, sparkR_log2.txt, sparkRlag.txt
>
> For example, I am getting streams from Kafka and I want to apply a model built in R to those streams. For this, I am using dapply.
> My code is:
> {code:R}
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source",
>                      kafka.bootstrap.servers = "localhost:9092",
>                      topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   # the original snippet read 'randomMatBr' here; 'randomBr' is the
>   # broadcast variable actually created above
>   i_model <- SparkR:::value(randomBr)
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     x[row, "value"] <- toJSON(y)
>   }
>   x
> }, schema)
> {code}
> Every time Kafka streams are fetched, the dapply method creates a new runner thread and ships the variables again, which causes a huge lag (~2s for shipping the model) every time. I even tried without broadcast variables, but shipping the variables takes the same time. Can some other technique be applied to improve its performance?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
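One variant worth measuring (the attachment name read_model_in_udf.txt suggests it was tried) is shipping only a file path and reading the model inside the UDF. A hedged sketch, assuming the .rds file is readable from every executor and reusing `lines` and `schema` from above:

```r
# Sketch only: ships just a short path string per task instead of the
# serialized model. Reading the model still triggers the caret package
# load inside the worker, so the ~2s per-batch overhead remains as long
# as the worker R environment is not reused between queries.
model_path <- "./iris_model.rds"  # assumed to exist on every executor
df1 <- dapply(lines, function(x) {
  i_model <- readRDS(model_path)  # loads caret via the model's class
  for (row in 1:nrow(x)) {
    y <- jsonlite::fromJSON(as.character(x[row, "value"]))
    y$predict <- predict(i_model, y)
    x[row, "value"] <- jsonlite::toJSON(y)
  }
  x
}, schema)
```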