Hi All,
I am trying out Spark Streaming by reading messages from Kafka topics, which
are then turned into DStreams, as shown below. Kafka is set up on a VM and the
topics are created, but when I run the program below from my Spark VM with the
command shown, I get an error even though the Kafka server and ZooKeeper are
both up and running:
./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing \
  --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  172.28.161.32:2181 redemption_inbound
Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
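
One thing I am unsure about while writing this up: the first argument I pass is
172.28.161.32:2181, which is ZooKeeper's port, but as far as I understand
metadata.broker.list for the direct stream should point at the Kafka brokers
themselves (port 9092 by default), since createDirectStream connects to the
brokers directly rather than through ZooKeeper. If that is the cause, I assume
the invocation should look like this (assuming my broker is listening on the
default port):

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing \
  --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  172.28.161.32:9092 redemption_inbound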
Program:
package org.stream.processing;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Pattern;

import com.google.common.collect.Lists;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class JavaKafkaStreamEventProcessing {

  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaKafkaStreamEventProcessing <brokers> <topics>\n"
          + "  <brokers> is a list of one or more Kafka brokers\n"
          + "  <topics> is a list of one or more Kafka topics to consume from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create the streaming context with a 2 second batch interval.
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    // The direct stream talks to the Kafka brokers themselves, not to ZooKeeper.
    kafkaParams.put("metadata.broker.list", brokers);

    // Create a direct Kafka stream over the given brokers and topics.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet);

    // Get the lines, split them into words, count the words, and print.
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();
    // Note: toString() on a DStream only prints the object reference, not the counts.
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation.
    jssc.start();
    jssc.awaitTermination();
  }
}
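
For reference, here is a minimal metadata probe (adapted from the Kafka 0.8
SimpleConsumer example; the host, port, and client id are assumptions) that
should show whether a given address actually speaks the Kafka wire protocol,
i.e. whether it is a broker and not ZooKeeper:

import java.util.Collections;

import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class BrokerProbe {
  public static void main(String[] args) {
    // Assumed broker address: swap in whatever host:port you pass to spark-submit.
    SimpleConsumer consumer =
        new SimpleConsumer("172.28.161.32", 9092, 100000, 64 * 1024, "broker-probe");
    try {
      // Ask for metadata about the topic; a real broker answers, while ZooKeeper
      // just closes the connection (which matches the EOFException above).
      TopicMetadataRequest request =
          new TopicMetadataRequest(Collections.singletonList("redemption_inbound"));
      TopicMetadataResponse response = consumer.send(request);
      for (TopicMetadata metadata : response.topicsMetadata()) {
        System.out.println("Topic: " + metadata.topic()
            + ", partitions: " + metadata.partitionsMetadata().size());
      }
    } finally {
      consumer.close();
    }
  }
}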