Hi,
I am trying to run the following code for a Kafka consumer:
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
/**
 * Minimal Kafka consumer demo that runs on its own thread.
 *
 * <p>The constructor builds a {@link ConsumerConnector} from a ZooKeeper address and a consumer
 * group id; {@link #run()} opens a single stream for {@link #TOPIC} and prints every message
 * payload to standard output.
 */
public class HelloKafkaConsumer extends Thread {
    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "test";

    /** Charset used to decode message payloads, so both print paths agree. */
    private static final String PAYLOAD_CHARSET = "UTF-8";

    /** Connector created by the constructor; stays {@code null} if creation failed. */
    ConsumerConnector consumerConnector;

    /**
     * Creates the consumer and starts its thread.
     *
     * @param argv command-line arguments (unused)
     * @throws UnsupportedEncodingException declared for compatibility; not thrown by this method
     */
    public static void main(String[] argv) throws UnsupportedEncodingException {
        HelloKafkaConsumer helloKafkaConsumer = new HelloKafkaConsumer();
        helloKafkaConsumer.start();
    }

    /**
     * Builds the consumer configuration and creates the connector.
     *
     * <p>A failure is reported on standard error instead of being silently dropped; the connector
     * is then left {@code null} and {@link #run()} exits without consuming.
     */
    public HelloKafkaConsumer() {
        try {
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "localhost:2181");
            System.out.println("Hi"+properties);
            properties.put("group.id", "test-group");
            System.out.println("Hi1"+properties);
            ConsumerConfig consumerConfig = new ConsumerConfig(properties);
            System.out.println("hi2"+consumerConfig);
            consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
            System.out.println("hi3"+consumerConnector);
        } catch (Exception e) {
            // Previously swallowed: the root cause was lost and run() failed later with an
            // unexplained NullPointerException.
            System.err.println("Failed to create the Kafka consumer connector: " + e);
            e.printStackTrace();
        }
    }

    /**
     * Opens one stream for {@link #TOPIC} and prints each message payload decoded as UTF-8.
     *
     * <p>The connector is shut down when the loop ends, whether normally or through an exception.
     */
    @Override
    public void run() {
        if (consumerConnector == null) {
            // Constructor failed and has already reported why.
            System.err.println("Consumer connector was not created; nothing to consume.");
            return;
        }
        try {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            System.out.println("topic count map "+topicCountMap);
            // Ask for exactly one stream for the topic.
            topicCountMap.put(TOPIC, Integer.valueOf(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumerConnector.createMessageStreams(topicCountMap);
            System.out.println("------------------------->");
            KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                // Decode explicitly as UTF-8 rather than with the platform-default charset.
                System.out.println(new String(it.next().message(), PAYLOAD_CHARSET));
            }
        } catch (Exception e) {
            System.out.print("Inside run"+e);
            e.printStackTrace();
        } finally {
            // Release the connector's resources once consumption is over.
            consumerConnector.shutdown();
        }
    }

    /**
     * Prints the payload of every message in {@code messageSet} as UTF-8 text.
     *
     * <p>A message that cannot be printed is reported on standard error and skipped, so the
     * remaining messages are still processed.
     *
     * @param messageSet messages to print
     * @throws UnsupportedEncodingException declared for compatibility; per-message failures are
     *     handled inside the loop
     */
    private static void printMessages(ByteBufferMessageSet messageSet)
            throws UnsupportedEncodingException {
        for (MessageAndOffset messageAndOffset : messageSet) {
            try {
                ByteBuffer payload = messageAndOffset.message().payload();
                // remaining(), not limit(): get(byte[]) underflows if the array is longer than
                // the bytes left between position and limit.
                byte[] bytes = new byte[payload.remaining()];
                payload.get(bytes);
                System.out.println(new String(bytes, PAYLOAD_CHARSET));
            } catch (Exception e) {
                System.err.println("Could not print message: " + e);
            }
        }
    }
}
When I run it, I get an EndOfStreamException in ZooKeeper:
[image: Inline image 2]
I need help with this.
The producer works fine.
Regards,
Vineet Salian