Thanks for your reply.
I invoked my program with the broker IP and port, and the job started as expected, but I see the error below:
./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:9092 TestTopic
15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([TestTopic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
When I ran the command below to check the offsets, I got this:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic TestTopic --group test-consumer-group --zookeeper localhost:2181
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/TestTopic/0.
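(Note: ConsumerOffsetChecker only reports offsets that a consumer group has committed to ZooKeeper, and the direct stream API never commits offsets there, so a NoNode error for an unused group is expected. The "Couldn't find leader offsets" error above is about broker metadata instead; assuming a default Kafka install layout, the partition leader can be checked with

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TestTopic

which should report a leader for partition 0 on a healthy topic.)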
I also just added the configs below to my kafka/config/consumer.properties and restarted Kafka:
auto.offset.reset=smallest
offsets.storage=zookeeper
offsets.channel.backoff.ms=1000
offsets.channel.socket.timeout.ms=10000
offsets.commit.max.retries=5
dual.commit.enabled=true
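(Note: createDirectStream does not read kafka/config/consumer.properties; that file only affects consumers explicitly pointed at it, so restarting Kafka will not change the job's behavior. The equivalent setting goes into the kafkaParams map passed to createDirectStream. A minimal sketch against the program below, adding only the offset-reset policy:

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers); // broker host:port, e.g. 172.28.161.32:9092
kafkaParams.put("auto.offset.reset", "smallest"); // start from the earliest available offset

The offsets.* and dual.commit.* settings above apply only to the old ZooKeeper-based consumer and are ignored by the direct stream.)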
From: Cody Koeninger [mailto:[email protected]]
Sent: Monday, September 28, 2015 7:56 PM
To: Ratika Prasad <[email protected]>
Cc: [email protected]
Subject: Re: Spark-Kafka Connector issue
This is a user list question, not a dev list question.
Looks like your driver is having trouble communicating with the Kafka brokers. Make sure the broker host and port are reachable from the driver host (using nc or telnet); make sure that you're providing the _broker_ host and port to createDirectStream, not the ZooKeeper host; and make sure the topics in question actually exist on Kafka and the names match what you're providing to createDirectStream.
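For example, from the driver host:

nc -vz 172.28.161.32 9092

and, assuming a default Kafka install layout on the broker VM, confirm the topic exists and has a leader:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic redemption_inbound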
On Sat, Sep 26, 2015 at 11:50 PM, Ratika Prasad <[email protected]> wrote:
Hi All,
I am trying out Spark Streaming by reading messages from Kafka topics and turning them into streams, as below. I have Kafka set up on a VM and the topics created; however, when I run the program below from my Spark VM, I get an error even though the Kafka server and ZooKeeper are up and running.
./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:2181 redemption_inbound
Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Program
package org.stream.processing;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.collect.Lists;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class JavaKafkaStreamEventProcessing {

  // Delimiter used to split each line into words
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n"
          + "  <brokers> is a list of one or more Kafka brokers\n"
          + "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with a 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create a direct Kafka stream with the given brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet);

    // Get the lines, split them into words, count the words, and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();
    // Note: toString() on a DStream prints the object reference, not the counts;
    // wordCounts.print() above already emits each batch's counts.
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
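For reference, a submit command consistent with the usage string above would pass the broker host and port (9092) rather than the ZooKeeper port (2181), and a master with more than one core to avoid the StreamingContext warning:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master "local[2]" spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:9092 TestTopic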