The default behavior of the consumer is to consume from the tail of the partition log. Did you continue producing data after the consumers created their streams?
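To make a new consumer group read messages that were produced before it connected, the 0.8 high-level consumer's auto.offset.reset property can be set to "smallest" instead of its default "largest" (tail). A minimal sketch of that configuration, using only java.util.Properties — the ZooKeeper address and group id here are placeholder values, not ones from this thread:

```java
import java.util.Properties;

public class ConsumerOffsetConfig {
    public static void main(String[] args) {
        // With the 0.8 high-level consumer, auto.offset.reset controls where a
        // consumer group with no committed offset starts reading. The default,
        // "largest", begins at the tail of the log, so messages produced before
        // the consumer connected are never seen. "smallest" starts from the
        // beginning of the partition instead.
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "example-group");           // placeholder group id
        props.put("auto.offset.reset", "smallest");

        // These Properties would then be passed to a ConsumerConfig when
        // creating the ConsumerConnector.
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

Note that this only applies when the group has no offset already committed in ZooKeeper; an existing group resumes from its committed offset regardless of this setting.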
Guozhang

On Tue, Oct 29, 2013 at 10:52 AM, Shafaq <s.abdullah...@gmail.com> wrote:
> Hi,
> I see the following scenario:
>
> 1. I send messages under some topic X and can see the log folder in the
> Kafka broker with name X-0 (zeroth partition), containing xxx.log and
> xxx.index files. So I guess this part is fine.
>
> 2. Then I fire up the consumer for topic X; it is able to find two streams
> (mapping to the two partitions I have defined):
>
>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>             consumer.createMessageStreams(topicCountMap);
>     List<KafkaStream<byte[], byte[]>> streams = null;
>     Iterator<String> it = topicCountMap.keySet().iterator();
>     int threadNumber = 0;
>     while (it.hasNext()) {
>         String topic = it.next();
>         streams = consumerMap.get(topic);
>         for (KafkaStream stream : streams) {
>             System.out.println("threadNo = " + threadNumber
>                     + " for topic = " + topic);
>             new Thread(new ConsumerThreadRunnable(stream, threadNumber, topic)).start();
>             threadNumber++;
>         }
>     }
>
> However, I don't get any messages in the ConsumerThreadRunnable here:
>
>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
>     while (it.hasNext()) {
>         byte[] nextMessageByteArray = it.next().message();
>     }
>
> If I start the consumer first and then restart the producer thread, sending
> the messages for topic X, then the consumer is able to receive the messages.
>
> From the Kafka docs, the high-level consumer thread does long polling until
> a message is available.
>
> What am I doing wrong? Any idea how to get around the problem?
>
> thanks!
>
> --
> Kind Regards,
> Shafaq

--
-- Guozhang