This issue was fixed in https://issues.apache.org/jira/browse/SPARK-18991.

--Prashant
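For anyone hitting this on an older release: the "run more frequently / make it non-blocking" experiments mentioned in the thread below map onto existing Spark cleaner settings. A minimal sketch with illustrative values (this is not the SPARK-18991 fix itself, which changed ContextCleaner internals):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("cleaner-tuning-sketch")
      // Don't block the cleaning thread on each cleanup task (default: true).
      .config("spark.cleaner.referenceTracking.blocking", "false")
      // Trigger the periodic GC (and hence reference processing) more often than the 30min default.
      .config("spark.cleaner.periodicGC.interval", "5min")
      .getOrCreate()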
On Tue, Dec 20, 2016 at 6:16 PM, Prashant Sharma <scrapco...@gmail.com> wrote:

Hi Shixiong,

Thanks for taking a look. I am trying to see whether making ContextCleaner run more frequently and/or making it non-blocking will help.

--Prashant

On Tue, Dec 20, 2016 at 4:05 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

Hey Prashant. Thanks for your code. I did some investigation and it turned out that ContextCleaner is too slow and its "referenceQueue" keeps growing. My hunch is that cleaning broadcasts is very slow, since it is a blocking call.

On Mon, Dec 19, 2016 at 12:50 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

Hey, Prashant. Could you track the GC root of the byte arrays in the heap?

On Sat, Dec 17, 2016 at 10:04 PM, Prashant Sharma <scrapco...@gmail.com> wrote:

Furthermore, I ran the same thing with 26 GB of memory, which works out to about 1.3 GB per thread. My jmap <https://github.com/ScrapCodes/KafkaProducer/blob/master/data/26GB/t11_jmap-histo> and jstat <https://github.com/ScrapCodes/KafkaProducer/blob/master/data/26GB/t11_jstat> results, collected after running the job for more than 11 hours, again show a memory constraint: the same gradual slowdown, only a bit more gradual since memory is considerably larger than in the previous runs.

This looks like a memory leak: the byte array objects amount to more than 13 GB and are not garbage collected.

--Prashant

On Sun, Dec 18, 2016 at 8:49 AM, Prashant Sharma <scrapco...@gmail.com> wrote:

Hi,

The goal of my benchmark is to reach an end-to-end latency below 100ms and sustain it over time, by consuming from one Kafka topic and writing back to another Kafka topic using Spark. Since the job does no aggregation and does constant-time processing on each message, this seemed an achievable target. But there are some surprising and interesting patterns to observe.

The setup has four components:
1) Kafka
2) A long-running Kafka producer, rate limited to 1000 msgs/sec, with each message about 1KB in size.
3) A Spark job subscribed to the `test` topic, writing out to another topic `output`.
4) A Kafka consumer reading from the `output` topic.

How was the latency measured?

While sending messages from the Kafka producer, each message has embedded in it the timestamp at which it was pushed to the `test` topic. Spark receives each message and writes it out to the `output` topic as is. When these messages arrive at the Kafka consumer, the embedded time is subtracted from the time of arrival at the consumer, and a scatter plot of the results is attached.

The scatter plots sample only 10 minutes of data received during the initial one hour, and then again 10 minutes of data received after 2 hours of running.

These plots indicate a significant slowdown: in the later plot almost all messages were received with a delay larger than 2 seconds, whereas the first plot shows most messages arriving with less than 100ms latency. The two samples were taken roughly 2 hours apart.
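For concreteness, the consumer-side measurement described above amounts to something like the following (a minimal sketch; the payload layout, group id, and broker address are assumptions, not taken from the actual benchmark code):

    import java.util.{Collections, Properties}
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "latency-check")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("output"))

    while (true) {
      val records = consumer.poll(100L)
      for (r <- records.asScala) {
        // Assume the producer prefixed the payload with System.currentTimeMillis() and a comma.
        val sentAt = r.value().split(",", 2)(0).toLong
        val latencyMs = System.currentTimeMillis() - sentAt
        println(s"end-to-end latency: $latencyMs ms")
      }
    }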
After running the test for 24 hours, the jstat <https://raw.githubusercontent.com/ScrapCodes/KafkaProducer/master/data/jstat_output.txt> and jmap <https://raw.githubusercontent.com/ScrapCodes/KafkaProducer/master/data/jmap_output.txt> output for the job indicates a possible memory constraint. To be clear, the job was run with local[20] and 5GB of memory (spark.driver.memory). The job is straightforward and is located here: https://github.com/ScrapCodes/KafkaProducer/blob/master/src/main/scala/com/github/scrapcodes/kafka/SparkSQLKafkaConsumer.scala

What is causing the gradual slowdown? I need help diagnosing the problem.

Thanks,

--Prashant
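The linked job is not reproduced here, but a Kafka-to-Kafka pass-through in Structured Streaming has roughly this shape (a sketch only: the broker address and checkpoint path are made up, and the built-in Kafka sink shown here only exists in later Spark releases, so the actual SparkSQLKafkaConsumer.scala may write to Kafka differently):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[20]")
      .appName("kafka-passthrough-sketch")
      .getOrCreate()

    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .load()

    // The Kafka sink expects a `value` column; pass the payload through unchanged.
    val query = input.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "output")
      .option("checkpointLocation", "/tmp/kafka-passthrough-checkpoint")
      .start()

    query.awaitTermination()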