Hello all,
Can we invoke JavaRDD while processing stream from Kafka for example.
Following code is throwing some serialization exception. Not sure if this is
feasible.
JavaStreamingContext jssc = new JavaStreamingContext(jsc,
Durations.seconds(5));
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String,
String>, String>() {
public String call(Tuple2<String, String> tuple2) { return tuple2._2();
}
});
JavaPairDStream<String, String> wordCounts = lines.mapToPair( new
PairFunction<String, String, String>() {
public Tuple2<String, String> call(String urlString) {
String propertiesFile =
"/home/cloudera/Desktop/sample/input/featurelist.properties";
JavaRDD<String> propertiesFileRDD =
jsc.textFile(propertiesFile);
JavaPairRDD<String, String> featureKeyClassPair =
propertiesFileRDD.mapToPair(
new PairFunction<String, String,
String>() {
public Tuple2<String, String>
call(String property) {
return new
Tuple2(property.split("=")[0], property.split("=")[1]);
}
});
featureKeyClassPair.count();
return new Tuple2<String, String>(urlString, featureScore);
}
});