Just bumping this issue in case anyone can point me in the right direction.
I have been stuck on it for a while now.

Thanks,
Conor

On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell <conorapa...@gmail.com> wrote:
> Hi,
>
> I have a memory leak in the Spark driver that is in neither the heap nor
> the non-heap memory.
> Although neither of these is increasing, the Java process's RSS
> memory is, and it eventually takes up all the memory on the machine.
> I am using Spark 1.5.2 and spark-cassandra-connector 1.5.0-M2.
>
> I have reduced the leak to the code below.
> If I remove Cassandra from the code below, the memory leak does not happen.
> Can someone please explain why this is happening, or what I can do to
> investigate it further?
> I have also included pictures from JConsole covering a couple of hours, and
> from Datadog showing the RSS memory increase over the same timeframe.
>
> Thanks,
> Conor
>
>     val ssc = new StreamingContext(sparkConf, Seconds(SparkStreamingBatchInterval))
>
>     ssc.checkpoint(HdfsNameNodeUriPath)
>
>     val kafkaParams = Map[String, String](METADATA_BROKER_LIST -> MetadataBrokerList)
>
>     var kafkaMessages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, KafkaTopics.split(DELIMITER).toSet)
>
>     var eventBuckets = kafkaMessages.map(keyMessage => {
>         implicit val formats = DefaultFormats.lossless
>         val eventBucket = parse(keyMessage._2)
>         val minute = new Date((eventBucket \ MINUTE).extract[Long])
>         val business = (eventBucket \ BUSINESS).extract[String]
>         val account = (eventBucket \ ACCOUNT).extract[String]
>         (minute, business, account)})
>
>     var eventsToBeProcessed = eventBuckets.transform(rdd =>
>       rdd.joinWithCassandraTable(
>           "analytics_events" + '_' + settings.JobEnvironment,
>           "events",
>           SomeColumns(MINUTE, BUSINESS, ACCOUNT, EVENT, JSON),
>           SomeColumns(MINUTE, BUSINESS, ACCOUNT))
>         .filter(entry => {
>           // remove any entries without a result
>           entry._2.length > 0
>         }))
>
>     eventsToBeProcessed.foreachRDD(rdd => {
>       println(rdd.take(1))
>     })
>
>     sys.ShutdownHookThread {
>       System.err.println(s"Gracefully stopping $JobName Spark Streaming Application")
>       ssc.stop(stopSparkContext = true, stopGracefully = true)
>       System.err.println(s"$JobName streaming job stopped")
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
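
For anyone wanting to reproduce the comparison described in the quoted message (heap and non-heap flat while RSS grows), here is a minimal sketch of how the three numbers could be sampled from inside the driver JVM. It is not part of the original job: the names DriverMemorySnapshot, Snapshot, parseVmRssBytes, readRssBytes and take are hypothetical, and reading /proc/self/status assumes the driver runs on Linux. Heap and non-heap usage come from the standard MemoryMXBean.

```scala
import java.lang.management.ManagementFactory
import scala.io.Source
import scala.util.Try

// Hypothetical helper, not from the original job: samples JVM-managed memory and process RSS.
object DriverMemorySnapshot {
  final case class Snapshot(heapUsedBytes: Long, nonHeapUsedBytes: Long, rssBytes: Option[Long])

  // Extracts the "VmRSS:   123456 kB" entry from /proc/<pid>/status lines and converts it to bytes.
  def parseVmRssBytes(statusLines: Iterator[String]): Option[Long] =
    statusLines
      .find(_.startsWith("VmRSS:"))
      .flatMap(line => line.stripPrefix("VmRSS:").trim.split("\\s+").headOption)
      .flatMap(value => Try(value.toLong * 1024L).toOption)

  // Assumption: Linux. Returns None if /proc/self/status is missing or unreadable.
  def readRssBytes(): Option[Long] = Try {
    val src = Source.fromFile("/proc/self/status")
    try parseVmRssBytes(src.getLines()) finally src.close()
  }.getOrElse(None)

  // Heap and non-heap are what the JVM itself accounts for; RSS is what the OS sees.
  def take(): Snapshot = {
    val mem = ManagementFactory.getMemoryMXBean
    Snapshot(mem.getHeapMemoryUsage.getUsed, mem.getNonHeapMemoryUsage.getUsed, readRssBytes())
  }

  def main(args: Array[String]): Unit = {
    val s = take()
    println(s"heap=${s.heapUsedBytes} nonHeap=${s.nonHeapUsedBytes} rss=${s.rssBytes.getOrElse(-1L)}")
  }
}
```

Logging DriverMemorySnapshot.take() once per batch, for example inside foreachRDD, would give a single time series in which an RSS figure that keeps rising while the other two stay level points at memory allocated outside what the JVM reports.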
