I am getting a Spark "Task not serializable" exception when running spark-submit in
standalone mode. I am using Spark Streaming, consuming its stream from Kafka queues,
but it is not able to process the mapping actions on the RDDs from the stream. The
code where the serialization exception occurs is below. I have a separate class to
manage the contexts, with the respective getters and setters:
public class Contexts {
    public RedisContext rc = null;
    public SparkContext sc = null;
    public Gson serializer = new Gson();
    public SparkConf sparkConf = null; // new SparkConf().setAppName("SparkStreamEventProcessingEngine");
    public JavaStreamingContext jssc = null; // new JavaStreamingContext(sparkConf, new Duration(2000));
    public Producer<String, String> kafkaProducer = null;
    public Tuple2<String, Object> hostTup = null;
    // ... getters and setters ...
}
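The getters and setters are the usual bean accessors; for example (sketching just one pair, the rest follow the same pattern):

    public JavaStreamingContext getJssc() {
        return jssc;
    }

    public void setJssc(JavaStreamingContext jssc) {
        this.jssc = jssc;
    }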
The class with the main Spark Streaming processing logic is as follows:
public final class SparkStreamEventProcessingEngine {

    public Contexts contexts = new Contexts();

    public SparkStreamEventProcessingEngine() {
    }

    public static void main(String[] args) {
        SparkStreamEventProcessingEngine temp = new SparkStreamEventProcessingEngine();
        temp.tempfunc();
    }

    private void tempfunc() {
        System.out.println(contexts.getJssc().toString() + "\n"
                + contexts.getRc().toString() + "\n" + contexts.getSc().toString() + "\n");
        createRewardProducer();

        Properties props = new Properties();
        try {
            props.load(SparkStreamEventProcessingEngine.class.getResourceAsStream("/application.properties"));
        } catch (IOException e) {
            System.out.println("Error loading application.properties file");
            return;
        }

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(props.getProperty("kafa.inbound.queue"), 1);

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(contexts.getJssc(),
                        props.getProperty("kafka.zookeeper.quorum"),
                        props.getProperty("kafka.consumer.group"), topicMap);

        // The exception occurs at this line:
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            // private static final long serialVersionUID = 1L;
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                rdd.foreach(new VoidFunction<String>() {
                    public void call(String stringData) throws Exception {
                        Gson serializer = new Gson();
                        OfferRedeemed event = serializer.fromJson(stringData, OfferRedeemed.class);
                        System.out.println("Incoming Event:" + event.toString());
                        processTactic(event, "51367");
                        processTactic(event, "53740");
                    }
                });
                return null;
            }
        });

        contexts.getJssc().start();
        contexts.getJssc().awaitTermination();
    }

    private void processTactic(OfferRedeemed event, String tacticId) {
        System.out.println(contexts.getRc().toString() + "hi4");
        TacticDefinition tactic = readTacticDefinition(tacticId);
        boolean conditionMet = false;
        if (tactic != null) {
            System.out.println("Evaluating event of type :" + event.getEventType()
                    + " for Tactic : " + tactic.toString());
            // ... and so on, for the respective functionalities ...
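My understanding (which may be wrong) is that the anonymous Function passed to map() keeps an implicit this$0 reference to the enclosing SparkStreamEventProcessingEngine instance, so Spark tries to serialize the whole engine object, Contexts and all. One variant I am considering is pulling the function out into a static nested class so that nothing from the outer class is captured; a minimal sketch (MessageExtractor is just a name I made up):

import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// Static nested class inside SparkStreamEventProcessingEngine: it has no
// implicit this$0 reference to the enclosing instance, so only this small,
// stateless object needs to be serialized and shipped to the executors.
private static class MessageExtractor
        implements Function<Tuple2<String, String>, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2(); // same body as the anonymous version above
    }
}

// Usage in tempfunc():
// JavaDStream<String> lines = messages.map(new MessageExtractor());

I suspect the foreachRDD and VoidFunction closures would hit the same problem next, since they call the instance method processTactic().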
The exception thrown is as follows:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
    at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
    at com.coupons.stream.processing.SparkStreamEventProcessingEngine.tempfunc(SparkStreamEventProcessingEngine.java:366)
    at com.coupons.stream.processing.SparkStreamEventProcessingEngine.main(SparkStreamEventProcessingEngine.java:346)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.coupons.stream.processing.SparkStreamEventProcessingEngine
Serialization stack:
    - object not serializable (class: com.coupons.stream.processing.SparkStreamEventProcessingEngine, value: com.coupons.stream.processing.SparkStreamEventProcessingEngine@6a48a7f3)
    - field (class: com.coupons.stream.processing.SparkStreamEventProcessingEngine$1, name: this$0, type: class com.coupons.stream.processing.SparkStreamEventProcessingEngine)
    - object (class com.coupons.stream.processing.SparkStreamEventProcessingEngine$1, com.coupons.stream.processing.SparkStreamEventProcessingEngine$1@1c6c6f24)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 23 more
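Reading the serialization stack, the object that cannot be serialized is SparkStreamEventProcessingEngine itself, dragged in through the this$0 field of the anonymous class ($1) created for map(). I am not sure whether the right fix is the static nested class sketched above, or making the outer class Serializable and keeping the contexts out of the closure with transient; a sketch of the latter approach (an assumption on my part, not something I have verified):

import java.io.Serializable;

public final class SparkStreamEventProcessingEngine implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: SparkContext, JavaStreamingContext, the Kafka producer, etc.
    // are not serializable and should not be shipped to the executors. The
    // catch is that 'contexts' would then be null inside any closure that
    // actually runs on a worker, e.g. the VoidFunction calling processTactic().
    private transient Contexts contexts = new Contexts();

    // ... rest of the class unchanged ...
}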
Any help on this topic is appreciated.