I suspect kafka.common.OffsetOutOfRangeException happening for your consumer if incrementing offset + 1 works. Try grep the server.log for that exception. I am not sure of any known issues with simpleconsumer that might causing this. Also copy this thread to us...@kafka.apache.org . Other users might have experience this issue before.
On Thu, Aug 7, 2014, at 06:54 AM, pankaj ojha wrote: > Hi Harsha, > > Thank you for replying. > There is no error in the kafka logs. The output I had pasted in my > previous > mail is from kafka logs only. > But when I restart the kafka process after incrementing the offset by 1, > then it runs fine. > Is there any known issue with the SimpleConsumer code? > > Thanks, > Pankaj > > > On Thu, Aug 7, 2014 at 7:17 PM, Harsha <ka...@harsha.io> wrote: > > > Hi Pankaj, > > Do you notice any errors in kafka logs when your consumer > > stops reading data. Attaching those logs would be helpful in > > finding the issue. > > -Harsha > > > > On Thu, Aug 7, 2014, at 06:12 AM, pankaj ojha wrote: > > > Hi Team, > > > > > > Can you please provide any information on my above problem? > > > > > > Thanks & Regards, > > > Pankaj Ojha > > > > > > > > > On Wed, Aug 6, 2014 at 8:04 PM, pankaj ojha <pankajojh...@gmail.com> > > > wrote: > > > > > > > Hi Team, > > > > > > > > I have a requirement of reading real time data using kafka and write to > > > > cassandra. > > > > For this I am using SimpleConsumer to read data from Kafka topics and > > > > writing into Cassandra. > > > > I am maintaining offsets of topics in my log files. > > > > The issue is that after few days like 3-4 days my cosumer code does not > > > > read data from kafka topics and produce below log output : > > > > > > > > 20:01:17,068 INFO NposKafkaConsumer:48 - Taking partition from > > > > application properties > > > > 20:01:17,482 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT > > = > > > > 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), > > SO_SNDBUF = > > > > 64512 (requested -1). > > > > 20:01:17,545 DEBUG SimpleConsumer:52 - Disconnecting from <IP > > address>:9092 > > > > 20:01:17,578 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() > > method > > > > Inside while loop :: Value of max_reads::1 > > > > 20:01:17,662 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT > > = > > > > 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), > > SO_SNDBUF = > > > > 64512 (requested -1). > > > > 20:01:17,804 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping > > > > 20:01:18,804 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() > > method > > > > Inside while loop :: Value of max_reads::1 > > > > 20:01:18,826 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping > > > > 20:01:19,827 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() > > method > > > > Inside while loop :: Value of max_reads::1 > > > > 20:01:19,852 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping > > > > > > > > NposKafkaConsumer is my main SimpleCosumer class. > > > > But, when I restart the kafka process by incrementing the offset by one > > > > then again my code starts running fine for next few days. > > > > > > > > Can you please help me how I can solve this and where I am going wrong > > ? > > > > > > > > Thanks & Regards, > > > > Pankaj Ojha > > > > > > > > > >