Spark checkpoint problem for python api
Hi, My code is below: from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils def test(record_list): print(list(record_list)) return record_list def functionToCreateContext(): conf = SparkConf().setAppName("model_event").setMaster("spark://172.22.9.181:7077") \ .set("spark.executor.memory", '6g') \ .set("spark.executor.cores", '8') \ .set("spark.deploy.defaultCores", '8') \ .set("spark.cores.max", '16') \ .set("spark.streaming.kafka.maxRatePerPartition", 1) \ .set("spark.streaming.blockInterval", 1) \ .set("spark.default.parallelism", 8) \ .set("spark.driver.host", '172.22.9.181') \ sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 5) ssc.checkpoint("/spark/checkpoints/model_event_spark") return ssc if __name__ == '__main__': ssc = StreamingContext.getOrCreate("/spark/checkpoints/model_event_spark", functionToCreateContext) record_dstream = KafkaUtils.createDirectStream(ssc,topics=["installmentdb_t_bill"], kafkaParams={"bootstrap.servers":"xxx:9092", "auto.offset.reset":"smallest", }, ) record_dstream.checkpoint(5).mapPartitions(test).pprint() ssc.start() ssc.awaitTermination() When the scripts starts at the first time,it work well. But second time started from checkpointDirectory,it has problem like: 2019-07-30 02:48:50,290 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped org.apache.spark.SparkException: org.apache.spark.streaming.api.python.PythonTransformedDStream@319b7bed has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at
repartitionByRange and number of tasks
Hi, *Hardware and Spark Details:* * Spark 2.4.3 * EMR 30 node cluster with each executor having 4 cores and 15 GB RAM. At 100% allocation 4 executors are running in each node *Question:* when I am executing the following code then the around 60 partitions being written out using only 20 tasks which runs in parallel. Its late in the night here and I am failing to understand what needs to be done in order to increasing the number of tasks while writing. We can clearly write with more than 20 tasks in parallel. The reads of around 50,000 historical files happens in parallel but the issue is with the write, which has only 20 tasks for around 60 partitions. *Code:* spark.read.csv("s3://bucket_name/file_initials_*.gz").repartitionByRange(60, "partition_field").write.partitionBy("partition_field").parquet("s3://bucket_name/key/") Thanks and Regards, Gourav Sengupta
Number of tasks...
Hello there, I have a basic question with how the number of tasks are determined per spark job. Let's say the scope of this discussion around parquet and Spark 2.x. 1. I thought that, the number of jobs is proportional to the number of part files that exist. Is this correct? 2. I noticed that for a 25 core job, the number of tasks scheduled was around 250. But the same job, when executed with 75 cores had around 460 tasks. Are the number of tasks proportional to cores used? Note, the number of tasks, I refer to here are the tasks count during ` spark.read.parquet("")` operation. I do understand that, during join / reduce operation, the shuffle takes control of the number of tasks for the next stage (from "spark.sql.shuffle.partitions" which defaults to 200 -- https://spark.apache.org/docs/latest/sql-performance-tuning.html). 3. Also for "spark.sql.shuffle.partitions" is there anyway I can provide a computed value based on input data / join / UDAF functions used? Ideally if the tasks are around 200, I might see OutOfMemory issue (depending on the data size). Too large of a number would create many small tasks. The right balance may be based on input data size + shuffle operation. Please advice Muthu
Re: Logistic Regression Iterations causing High GC in Spark 2.3
Actually the original data is around ~120 GB. If we provide higher memory then we might require an even bigger cluster to finish training the whole model within planned time. And this will affect the cost of operations. Please correct me if I am wrong here. Nevertheless, can you point out how much memory should be sufficient for each executor. I already gave 9GB of Memory with 20 executors to process the data. On Mon, Jul 29, 2019 at 7:42 PM Sean Owen wrote: > Could be lots of things. Implementations change, caching may have > changed, etc. The size of the input doesn't really directly translate > to heap usage. Here you just need a bit more memory. > > On Mon, Jul 29, 2019 at 9:03 AM Dhrubajyoti Hati > wrote: > > > > Hi Sean, > > > > Yeah I checked the heap, its almost full. I checked the GC logs in the > executors where I found that GC cycles are kicking in frequently. The > Executors tab shows red in the "Total Time/GC Time". > > > > Also the data which I am dealing with is quite small(~4 GB) and the > cluster is quite big for that high GC. > > > > But what's troubling me is this issue doesn't occur in Spark 2.2 at all. > What could be the reason behind such a behaviour? > > > > Regards, > > Dhrub > > > > On Mon, Jul 29, 2019 at 6:45 PM Sean Owen wrote: > >> > >> -dev@ > >> > >> Yep, high GC activity means '(almost) out of memory'. I don't see that > >> you've checked heap usage - is it nearly full? > >> The answer isn't tuning but more heap. > >> (Sometimes with really big heaps the problem is big pauses, but that's > >> not the case here.) > >> > >> On Mon, Jul 29, 2019 at 1:26 AM Dhrubajyoti Hati > wrote: > >> > > >> > Hi, > >> > > >> > We were running Logistic Regression in Spark 2.2.X and then we tried > to see how does it do in Spark 2.3.X. Now we are facing an issue while > running a Logistic Regression Model in Spark 2.3.X on top of > Yarn(GCP-Dataproc). In the TreeAggregate method it takes a huge time due to > very High GC Activity. I have tuned the GC, created different sized > clusters, higher spark version(2.4.X), smaller data but nothing helps. The > GC time is 100 - 1000 times of the processing time in avg for iterations. > >> > > >> > The strange part is in Spark 2.2 this doesn't happen at all. Same > code, same cluster sizing, same data in both the cases. > >> > > >> > I was wondering if someone can explain this behaviour and help me to > resolve this. How can the same code has so different behaviour in two Spark > version, especially the higher ones? > >> > > >> > Here are the config which I used: > >> > > >> > > >> > spark.serializer=org.apache.spark.serializer.KryoSerializer > >> > > >> > #GC Tuning > >> > > >> > spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal > -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy > -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms9000m > -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5 > >> > > >> > > >> > spark.executor.instances=20 > >> > > >> > spark.executor.cores=1 > >> > > >> > spark.executor.memory=9010m > >> > > >> > > >> > > >> > Regards, > >> > Dhrub > >> > >
Re: Logistic Regression Iterations causing High GC in Spark 2.3
Could be lots of things. Implementations change, caching may have changed, etc. The size of the input doesn't really directly translate to heap usage. Here you just need a bit more memory. On Mon, Jul 29, 2019 at 9:03 AM Dhrubajyoti Hati wrote: > > Hi Sean, > > Yeah I checked the heap, its almost full. I checked the GC logs in the > executors where I found that GC cycles are kicking in frequently. The > Executors tab shows red in the "Total Time/GC Time". > > Also the data which I am dealing with is quite small(~4 GB) and the cluster > is quite big for that high GC. > > But what's troubling me is this issue doesn't occur in Spark 2.2 at all. What > could be the reason behind such a behaviour? > > Regards, > Dhrub > > On Mon, Jul 29, 2019 at 6:45 PM Sean Owen wrote: >> >> -dev@ >> >> Yep, high GC activity means '(almost) out of memory'. I don't see that >> you've checked heap usage - is it nearly full? >> The answer isn't tuning but more heap. >> (Sometimes with really big heaps the problem is big pauses, but that's >> not the case here.) >> >> On Mon, Jul 29, 2019 at 1:26 AM Dhrubajyoti Hati >> wrote: >> > >> > Hi, >> > >> > We were running Logistic Regression in Spark 2.2.X and then we tried to >> > see how does it do in Spark 2.3.X. Now we are facing an issue while >> > running a Logistic Regression Model in Spark 2.3.X on top of >> > Yarn(GCP-Dataproc). In the TreeAggregate method it takes a huge time due >> > to very High GC Activity. I have tuned the GC, created different sized >> > clusters, higher spark version(2.4.X), smaller data but nothing helps. The >> > GC time is 100 - 1000 times of the processing time in avg for iterations. >> > >> > The strange part is in Spark 2.2 this doesn't happen at all. Same code, >> > same cluster sizing, same data in both the cases. >> > >> > I was wondering if someone can explain this behaviour and help me to >> > resolve this. How can the same code has so different behaviour in two >> > Spark version, especially the higher ones? >> > >> > Here are the config which I used: >> > >> > >> > spark.serializer=org.apache.spark.serializer.KryoSerializer >> > >> > #GC Tuning >> > >> > spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal >> > -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails >> > -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy >> > -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms9000m >> > -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5 >> > >> > >> > spark.executor.instances=20 >> > >> > spark.executor.cores=1 >> > >> > spark.executor.memory=9010m >> > >> > >> > >> > Regards, >> > Dhrub >> > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Logistic Regression Iterations causing High GC in Spark 2.3
Hi Sean, Yeah I checked the heap, its almost full. I checked the GC logs in the executors where I found that GC cycles are kicking in frequently. The Executors tab shows red in the "Total Time/GC Time". Also the data which I am dealing with is quite small(~4 GB) and the cluster is quite big for that high GC. But what's troubling me is this issue doesn't occur in Spark 2.2 at all. What could be the reason behind such a behaviour? Regards, Dhrub On Mon, Jul 29, 2019 at 6:45 PM Sean Owen wrote: > -dev@ > > Yep, high GC activity means '(almost) out of memory'. I don't see that > you've checked heap usage - is it nearly full? > The answer isn't tuning but more heap. > (Sometimes with really big heaps the problem is big pauses, but that's > not the case here.) > > On Mon, Jul 29, 2019 at 1:26 AM Dhrubajyoti Hati > wrote: > > > > Hi, > > > > We were running Logistic Regression in Spark 2.2.X and then we tried to > see how does it do in Spark 2.3.X. Now we are facing an issue while running > a Logistic Regression Model in Spark 2.3.X on top of Yarn(GCP-Dataproc). In > the TreeAggregate method it takes a huge time due to very High GC Activity. > I have tuned the GC, created different sized clusters, higher spark > version(2.4.X), smaller data but nothing helps. The GC time is 100 - 1000 > times of the processing time in avg for iterations. > > > > The strange part is in Spark 2.2 this doesn't happen at all. Same code, > same cluster sizing, same data in both the cases. > > > > I was wondering if someone can explain this behaviour and help me to > resolve this. How can the same code has so different behaviour in two Spark > version, especially the higher ones? > > > > Here are the config which I used: > > > > > > spark.serializer=org.apache.spark.serializer.KryoSerializer > > > > #GC Tuning > > > > spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal > -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy > -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms9000m > -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5 > > > > > > spark.executor.instances=20 > > > > spark.executor.cores=1 > > > > spark.executor.memory=9010m > > > > > > > > Regards, > > Dhrub > > >
Re: Logistic Regression Iterations causing High GC in Spark 2.3
-dev@ Yep, high GC activity means '(almost) out of memory'. I don't see that you've checked heap usage - is it nearly full? The answer isn't tuning but more heap. (Sometimes with really big heaps the problem is big pauses, but that's not the case here.) On Mon, Jul 29, 2019 at 1:26 AM Dhrubajyoti Hati wrote: > > Hi, > > We were running Logistic Regression in Spark 2.2.X and then we tried to see > how does it do in Spark 2.3.X. Now we are facing an issue while running a > Logistic Regression Model in Spark 2.3.X on top of Yarn(GCP-Dataproc). In the > TreeAggregate method it takes a huge time due to very High GC Activity. I > have tuned the GC, created different sized clusters, higher spark > version(2.4.X), smaller data but nothing helps. The GC time is 100 - 1000 > times of the processing time in avg for iterations. > > The strange part is in Spark 2.2 this doesn't happen at all. Same code, same > cluster sizing, same data in both the cases. > > I was wondering if someone can explain this behaviour and help me to resolve > this. How can the same code has so different behaviour in two Spark version, > especially the higher ones? > > Here are the config which I used: > > > spark.serializer=org.apache.spark.serializer.KryoSerializer > > #GC Tuning > > spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal > -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps > -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions > -XX:+G1SummarizeConcMark -Xms9000m -XX:ParallelGCThreads=20 > -XX:ConcGCThreads=5 > > > spark.executor.instances=20 > > spark.executor.cores=1 > > spark.executor.memory=9010m > > > > Regards, > Dhrub > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Logistic Regression Iterations causing High GC in Spark 2.3
Actually I didn't have any of the GC tuning in the beginning and then adding them also didn't made any difference. As mentioned earlier I tried low number executors of higher configuration and vice versa. Nothing helps. About the code its simple logistic regression nothing with explicit broadcast or anything. The data is stored as parquet files in GCS bucket. *The question is how does it work fine with Spark 2.2 and has this issue with Spark 2.3 or higher version. As I mentioned before same code, same cluster configuration and size, same data in both the cases.* Regards, Dhrub On Mon, Jul 29, 2019 at 12:37 PM Jörn Franke wrote: > I would remove the all GC tuning and add it later once you found the > underlying root cause. Usually more GC means you need to provide more > memory, because something has changed (your application, spark Version etc.) > > We don’t have your full code to give exact advise, but you may want to > rethink the one code / executor approach and have less executors but more > cores / executor. That sometimes can lead to more heap usage (especially if > you broadcast). Keep in mind that if you use more cores/executor it usually > also requires more memory for the executor, but less executors. Similarly > the executor instances might be too many and they may not have enough heap. > You can also increase the memory of the executor. > > Am 29.07.2019 um 08:22 schrieb Dhrubajyoti Hati : > > Hi, > > We were running Logistic Regression in Spark 2.2.X and then we tried to > see how does it do in Spark 2.3.X. Now we are facing an issue while running > a Logistic Regression Model in Spark 2.3.X on top of Yarn(GCP-Dataproc). In > the TreeAggregate method it takes a huge time due to very High GC Activity. > I have tuned the GC, created different sized clusters, higher spark > version(2.4.X), smaller data but nothing helps. The GC time is 100 - 1000 > times of the processing time in avg for iterations. > > The strange part is in *Spark 2.2 this doesn't happen at all*. Same code, > same cluster sizing, same data in both the cases. > > I was wondering if someone can explain this behaviour and help me to > resolve this. How can the same code has so different behaviour in two Spark > version, especially the higher ones? > > Here are the config which I used: > > > spark.serializer=org.apache.spark.serializer.KryoSerializer > > #GC Tuning > > spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal > -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy > -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms9000m > -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5 > > > spark.executor.instances=20 > > spark.executor.cores=1 > > spark.executor.memory=9010m > > > Regards, > Dhrub > >
Re: Logistic Regression Iterations causing High GC in Spark 2.3
I would remove the all GC tuning and add it later once you found the underlying root cause. Usually more GC means you need to provide more memory, because something has changed (your application, spark Version etc.) We don’t have your full code to give exact advise, but you may want to rethink the one code / executor approach and have less executors but more cores / executor. That sometimes can lead to more heap usage (especially if you broadcast). Keep in mind that if you use more cores/executor it usually also requires more memory for the executor, but less executors. Similarly the executor instances might be too many and they may not have enough heap. You can also increase the memory of the executor. > Am 29.07.2019 um 08:22 schrieb Dhrubajyoti Hati : > > Hi, > > We were running Logistic Regression in Spark 2.2.X and then we tried to see > how does it do in Spark 2.3.X. Now we are facing an issue while running a > Logistic Regression Model in Spark 2.3.X on top of Yarn(GCP-Dataproc). In the > TreeAggregate method it takes a huge time due to very High GC Activity. I > have tuned the GC, created different sized clusters, higher spark > version(2.4.X), smaller data but nothing helps. The GC time is 100 - 1000 > times of the processing time in avg for iterations. > > The strange part is in Spark 2.2 this doesn't happen at all. Same code, same > cluster sizing, same data in both the cases. > > I was wondering if someone can explain this behaviour and help me to resolve > this. How can the same code has so different behaviour in two Spark version, > especially the higher ones? > > Here are the config which I used: > > spark.serializer=org.apache.spark.serializer.KryoSerializer > #GC Tuning > spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal > -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps > -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions > -XX:+G1SummarizeConcMark -Xms9000m -XX:ParallelGCThreads=20 > -XX:ConcGCThreads=5 > > spark.executor.instances=20 > spark.executor.cores=1 > spark.executor.memory=9010m > > > Regards, > Dhrub >
Logistic Regression Iterations causing High GC in Spark 2.3
Hi, We were running Logistic Regression in Spark 2.2.X and then we tried to see how does it do in Spark 2.3.X. Now we are facing an issue while running a Logistic Regression Model in Spark 2.3.X on top of Yarn(GCP-Dataproc). In the TreeAggregate method it takes a huge time due to very High GC Activity. I have tuned the GC, created different sized clusters, higher spark version(2.4.X), smaller data but nothing helps. The GC time is 100 - 1000 times of the processing time in avg for iterations. The strange part is in *Spark 2.2 this doesn't happen at all*. Same code, same cluster sizing, same data in both the cases. I was wondering if someone can explain this behaviour and help me to resolve this. How can the same code has so different behaviour in two Spark version, especially the higher ones? Here are the config which I used: spark.serializer=org.apache.spark.serializer.KryoSerializer #GC Tuning spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms9000m -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5 spark.executor.instances=20 spark.executor.cores=1 spark.executor.memory=9010m Regards, Dhrub