How can I repartition the Kafka ConsumerRecord RDD of a Spark DStream? The 
Kafka topics I am reading are uneven in size, so I want to repartition the 
input RDD based on some logic.
But when I try to apply the repartition I get an "object not serializable 
(class: org.apache.kafka.clients.consumer.ConsumerRecord" error. I found the 
following workaround:
 
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
Call rdd.forEachPartition and create the NotSerializable object in there like 
this:

    rdd.forEachPartition(iter -> {
      NotSerializable notSerializable = new NotSerializable();
      // ...Now process iter
    });
Applied here:

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParam)
    ).map(_.value())

    stream.foreachRDD { rdd =>
      val repartitionRDD = flow.repartitionRDD(rdd, 1)
      println("&&&&&&&&&&&&&& repartitionRDD " + repartitionRDD.count())

      val modifiedRDD = rdd.mapPartitions { iter =>
        val customerRecords: List[ConsumerRecord[String, String]] =
          List[ConsumerRecord[String, String]]()
        while (iter.hasNext) {
          val consumerRecord: ConsumerRecord[String, String] = iter.next()
          customerRecords :+ consumerRecord
        }
        customerRecords.iterator
      }

      val r = modifiedRDD.repartition(1)
      println("************* after repartition " + r.count())
    }

But I am still getting the same "object not serializable" error. Any help is 
greatly appreciated.
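
A minimal sketch of one possible way around this, not a confirmed fix. 
repartition() shuffles the records, and with the default Java serializer a 
shuffle has to serialize every element, so building a list inside 
mapPartitions does not help while the RDD elements are still ConsumerRecord 
objects. Assuming only the key and value of each record are needed after the 
repartition, the serializable fields could be extracted before the shuffle. 
The object name, the toPair helper, the broker address, group id, topic name, 
and partition count below are all hypothetical.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object RepartitionSketch {

  // Hypothetical helper: copy the fields we need out of the non-serializable
  // ConsumerRecord into a plain tuple, which Spark can serialize.
  def toPair(record: ConsumerRecord[String, String]): (String, String) =
    (record.key(), record.value())

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repartition-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Assumed settings; replace with the real broker, group id, and topics.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "repartition-sketch",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Seq("example-topic")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    stream.foreachRDD { rdd =>
      // map is a narrow transformation: it runs inside each Kafka partition,
      // so the ConsumerRecord objects themselves are never shuffled.
      val pairs = rdd.map(toPair)
      // Only the (key, value) tuples cross the shuffle boundary here.
      val repartitioned = pairs.repartition(4)
      println("after repartition: " + repartitioned.count())
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The key point of the sketch is the order of operations: the map to a 
serializable type comes before repartition(), not after it.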
