I am getting a Spark "Task not serializable" exception when running spark-submit in standalone mode. I am using Spark Streaming with a stream sourced from Kafka queues, but the mapping operations on the RDDs from the stream fail. The code where the serialization exception occurs is shown below. I have a separate class to manage the contexts, with the respective getters and setters:

```java
public class Contexts {
    public RedisContext rc = null;
    public SparkContext sc = null;
    public Gson serializer = new Gson();
    public SparkConf sparkConf = null;       // new SparkConf().setAppName("SparkStreamEventProcessingEngine");
    public JavaStreamingContext jssc = null; // new JavaStreamingContext(sparkConf, new Duration(2000));
    public Producer<String, String> kafkaProducer = null;
    public Tuple2<String, Object> hostTup = null;

    // ... getters and setters for each field ...
}
```
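The getters follow the lazy-initialization pattern hinted at by the commented-out constructors above; a simplified sketch of what they look like (illustrative only, the actual getters also cover the Redis context, the Kafka producer, and so on):

```java
// Simplified sketch of a lazy getter in Contexts (illustrative, not the
// exact code): the object is built on first access and cached in the field.
public JavaStreamingContext getJssc() {
    if (jssc == null) {
        sparkConf = new SparkConf().setAppName("SparkStreamEventProcessingEngine");
        jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    }
    return jssc;
}
```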
The class with the main Spark Streaming processing logic is as follows:

```java
public final class SparkStreamEventProcessingEngine {
    public Contexts contexts = new Contexts();

    public SparkStreamEventProcessingEngine() {
    }

    public static void main(String[] args) {
        SparkStreamEventProcessingEngine temp = new SparkStreamEventProcessingEngine();
        temp.tempfunc();
    }

    private void tempfunc() {
        System.out.println(contexts.getJssc().toString() + "\n"
                + contexts.getRc().toString() + "\n" + contexts.getSc().toString() + "\n");
        createRewardProducer();

        Properties props = new Properties();
        try {
            props.load(SparkStreamEventProcessingEngine.class.getResourceAsStream("/application.properties"));
        } catch (IOException e) {
            System.out.println("Error loading application.properties file");
            return;
        }

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(props.getProperty("kafa.inbound.queue"), 1);
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
                contexts.getJssc(),
                props.getProperty("kafka.zookeeper.quorum"),
                props.getProperty("kafka.consumer.group"),
                topicMap);

        // The exception occurs at this line:
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            // private static final long serialVersionUID = 1L;
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                rdd.foreach(new VoidFunction<String>() {
                    public void call(String stringData) throws Exception {
                        Gson serializer = new Gson();
                        OfferRedeemed event = serializer.fromJson(stringData, OfferRedeemed.class);
                        System.out.println("Incoming Event:" + event.toString());
                        processTactic(event, "51367");
                        processTactic(event, "53740");
                    }
                });
                return null;
            }
        });

        contexts.getJssc().start();
        contexts.getJssc().awaitTermination();
    }

    private void processTactic(OfferRedeemed event, String tacticId) {
        System.out.println(contexts.getRc().toString() + "hi4");
        TacticDefinition tactic = readTacticDefinition(tacticId);
        boolean conditionMet = false;
        if (tactic != null) {
            System.out.println("Evaluating event of type :" + event.getEventType()
                    + " for Tactic : " + tactic.toString());
            // ... and so on, for the respective functionalities ...
```
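My understanding (and I may be wrong) is that an anonymous inner class in Java keeps an implicit `this$0` reference to its enclosing instance, so when Spark serializes the map function it tries to serialize the whole `SparkStreamEventProcessingEngine` along with it. As an experiment I sketched the map function as a static nested class instead, which should carry no outer reference (Spark's Java `Function` interface already extends `java.io.Serializable`):

```java
// Experiment: a static nested class has no implicit this$0 field, so only
// the function object itself needs to be serializable.
private static class ExtractMessage implements Function<Tuple2<String, String>, String> {
    private static final long serialVersionUID = 1L;

    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2(); // keep just the Kafka message payload
    }
}

// ... and in tempfunc():
JavaDStream<String> lines = messages.map(new ExtractMessage());
```

Even if that helps with the map call, the `VoidFunction` inside `foreachRDD` calls `processTactic(...)` on the enclosing instance, so I assume it would hit the same problem there.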
The exception thrown is as follows:

```
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
    at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
    at com.coupons.stream.processing.SparkStreamEventProcessingEngine.tempfunc(SparkStreamEventProcessingEngine.java:366)
    at com.coupons.stream.processing.SparkStreamEventProcessingEngine.main(SparkStreamEventProcessingEngine.java:346)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.coupons.stream.processing.SparkStreamEventProcessingEngine
Serialization stack:
    - object not serializable (class: com.coupons.stream.processing.SparkStreamEventProcessingEngine, value: com.coupons.stream.processing.SparkStreamEventProcessingEngine@6a48a7f3)
    - field (class: com.coupons.stream.processing.SparkStreamEventProcessingEngine$1, name: this$0, type: class com.coupons.stream.processing.SparkStreamEventProcessingEngine)
    - object (class com.coupons.stream.processing.SparkStreamEventProcessingEngine$1, com.coupons.stream.processing.SparkStreamEventProcessingEngine$1@1c6c6f24)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 23 more
```

Any help on the topic is appreciated.
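Update: from the "Serialization stack" section I read the failure as: the anonymous map function (`SparkStreamEventProcessingEngine$1`) holds a `this$0` field pointing at the engine instance, and the engine is not `Serializable`. One workaround I considered (untested sketch, shown only to explain my thinking) is making the engine `Serializable` and marking `contexts` transient; but a transient field is skipped by Java serialization and would come back null on the executors, so anything that reaches `processTactic()` there would still break:

```java
// Untested idea: make the engine serializable but exclude driver-only state.
// 'transient' means the field is skipped during serialization, so on the
// executors 'contexts' deserializes as null -- closures that end up calling
// processTactic() would then NPE unless that logic is restructured.
public final class SparkStreamEventProcessingEngine implements Serializable {
    private static final long serialVersionUID = 1L;
    public transient Contexts contexts = new Contexts();
    // ... rest of the class unchanged ...
}
```

Is one of these approaches the right direction, or is there a cleaner way to structure this?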