[
https://issues.apache.org/jira/browse/KAFKA-3972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ewen Cheslack-Postava resolved KAFKA-3972.
------------------------------------------
Resolution: Invalid
Assignee: Ewen Cheslack-Postava
> kafka java consumer poll returns 0 records after seekToBeginning
> ----------------------------------------------------------------
>
> Key: KAFKA-3972
> URL: https://issues.apache.org/jira/browse/KAFKA-3972
> Project: Kafka
> Issue Type: Task
> Components: consumer
> Affects Versions: 0.10.0.0
> Environment: docker image elasticsearch:latest, kafka scala version
> 2.11, kafka version 0.10.0.0
> Reporter: don caldwell
> Assignee: Ewen Cheslack-Postava
> Labels: kafka, polling
>
> kafkacat successfully returns rows for the topic, but the following Java
> source reliably fails to produce rows. I suspect that I am missing some
> simple thing in my setup, but I have been unable to find a way out. I am
> using the current Docker release and Docker network commands to connect the
> processes in my cluster. The properties are:
> bootstrap.servers: kafka01:9092,kafka02:9092,kafka03:9092
> group.id: dhcp1
> topic: dhcp
> enable.auto.commit: false
> auto.commit.interval.ms: 1000
> session.timeout.ms: 30000
> key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
> value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
> The Kafka consumer follows. One thing that I find curious is that, although I
> seem to successfully make the call to seekToBeginning(), when I print the
> offsets on failure I get large offsets for all partitions, although I had
> expected them to be 0 or at least some small number.
> Here is the code:
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.TimeoutException;
> import org.apache.kafka.common.protocol.types.SchemaException;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.Node;
> import org.apache.kafka.common.PartitionInfo;
> import org.apache.kafka.common.TopicPartition;
> import java.io.FileInputStream;
> import java.io.FileNotFoundException;
> import java.io.IOException;
> import java.lang.Integer;
> import java.lang.System;
> import java.lang.Thread;
> import java.lang.InterruptedException;
> import java.util.Arrays;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> /**
>  * Small command-line Kafka consumer used to dump the records of one topic.
>  *
>  * <p>Settings are read from a properties file. Besides the Kafka consumer
>  * settings listed in {@code CONSUMER_CONFIG_KEYS}, the file supplies
>  * {@code topic} (the topic to subscribe to) and {@code polln} (the timeout,
>  * in milliseconds, passed to every {@code poll} call).
>  *
>  * <p>The consumer is not rewound automatically. Call
>  * {@link #seekToBeginning()} before {@link #doProcess()} to replay the topic
>  * from its first offset.
>  */
> public class KConsumer {
>     /** Kafka settings copied from the properties file into the client configuration. */
>     private static final String[] CONSUMER_CONFIG_KEYS = {
>         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>         ConsumerConfig.GROUP_ID_CONFIG,
>         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>         ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
>         ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
>         ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>         ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
>     };
>     /** doProcess() stops once more than this many polls have returned no records. */
>     private static final int MAX_EMPTY_POLLS = 10;
>     /** Pause, in milliseconds, before polling again after a SchemaException. */
>     private static final long RETRY_SLEEP_MS = 2000;
>
>     private Properties prop;
>     private String topic;
>     private Integer polln;
>     private KafkaConsumer<String, String> consumer;
>
>     /**
>      * Loads the properties file and creates a consumer subscribed to its topic.
>      *
>      * @param pf path of the properties file
>      * @throws FileNotFoundException if the file cannot be opened
>      * @throws IOException if the file cannot be read
>      */
>     public KConsumer(String pf) throws FileNotFoundException,
>             IOException {
>         this.setProperties(pf);
>         this.newClient();
>     }
>
>     /**
>      * Reads the properties file and caches the {@code topic} and
>      * {@code polln} settings. No field is modified when loading or parsing
>      * fails.
>      *
>      * @param p path of the properties file
>      * @throws FileNotFoundException if the file cannot be opened
>      * @throws IOException if the file cannot be read
>      * @throws NumberFormatException if {@code polln} is missing or not an integer
>      */
>     public void setProperties(String p) throws FileNotFoundException,
>             IOException {
>         Properties loaded = new Properties();
>         // try-with-resources: the stream was previously never closed.
>         try (FileInputStream in = new FileInputStream(p)) {
>             loaded.load(in);
>         }
>         String pollValue = loaded.getProperty("polln");
>         if (pollValue == null) {
>             throw new NumberFormatException("missing required property: polln");
>         }
>         // Properties.load keeps trailing whitespace in values; trim before parsing.
>         Integer pollTimeout = Integer.valueOf(pollValue.trim());
>         this.prop = loaded;
>         this.topic = loaded.getProperty("topic");
>         this.polln = pollTimeout;
>     }
>
>     /** Overrides the topic read from the properties file. */
>     public void setTopic(String t) {
>         this.topic = t;
>     }
>
>     /** Returns the topic this consumer reads from. */
>     public String getTopic() {
>         return this.topic;
>     }
>
>     /**
>      * Creates the KafkaConsumer from the recognised settings in the
>      * properties file and subscribes it to the configured topic.
>      */
>     public void newClient() {
>         System.err.println("creating consumer");
>         Properties kp = new Properties();
>         for (String key : CONSUMER_CONFIG_KEYS) {
>             String value = this.prop.getProperty(key);
>             if (value != null) {
>                 kp.put(key, value);
>             }
>         }
>         this.consumer = new KafkaConsumer<>(kp);
>         System.err.println("subscribing to " + this.topic);
>         this.consumer.subscribe(Arrays.asList(this.topic));
>     }
>
>     /** Closes the consumer and drops the reference; does nothing if already closed here. */
>     public void close() {
>         if (this.consumer != null) {
>             this.consumer.close();
>             this.consumer = null;
>         }
>     }
>
>     /**
>      * Rewinds every partition currently assigned to this consumer to its
>      * first offset. Exits the process with status 1 if no topic is set.
>      */
>     public void seekToBeginning() {
>         if (this.topic == null) {
>             System.err.println("KConsumer: topic not set");
>             System.exit(1);
>         }
>         System.err.println("setting partition offset to beginning");
>         java.util.Set<TopicPartition> tps = this.consumer.assignment();
>         if (tps.isEmpty()) {
>             // With subscribe() the assignment stays empty until a poll has
>             // joined the consumer group, so seeking right away would rewind
>             // nothing. Anything this poll returns is re-read after the seek.
>             // NOTE(review): confirm poll(0) completes the group join on the
>             // client version in use.
>             this.consumer.poll(0);
>             tps = this.consumer.assignment();
>         }
>         if (tps.isEmpty()) {
>             System.err.println("KConsumer: no partitions assigned, nothing to rewind");
>             return;
>         }
>         this.consumer.seekToBeginning(tps);
>     }
>
>     /**
>      * Polls once and returns the batch. A SchemaException is logged and the
>      * poll is retried after a pause; any other KafkaException is logged and
>      * rethrown; any other exception closes the consumer and exits the
>      * process with status 1.
>      *
>      * @return the records returned by the poll (possibly empty)
>      * @throws KafkaException if the poll fails with a Kafka error other
>      *         than SchemaException
>      */
>     public ConsumerRecords<String,String> nextBatch()
>             throws KafkaException {
>         while (true) {
>             try {
>                 System.err.print("polling...");
>                 ConsumerRecords<String,String> records =
>                     this.consumer.poll(this.polln);
>                 System.err.println("returned");
>                 System.err.printf("record count %d\n", records.count());
>                 return records;
>             } catch (SchemaException e) {
>                 // Fall through to the sleep below and poll again.
>                 System.err.println("nextBatch: " + e);
>             } catch (KafkaException e) {
>                 System.err.println("nextBatch: " + e);
>                 throw e;
>             } catch (Exception e) {
>                 System.err.println("nextBatch: " + e);
>                 this.consumer.close();
>                 System.exit(1);
>             }
>             try {
>                 System.err.println("sleeping");
>                 Thread.sleep(RETRY_SLEEP_MS);
>             } catch (InterruptedException e) {
>                 System.err.println(e);
>                 System.exit(0);
>             }
>         }
>     }
>
>     /**
>      * Prints offset, key and value of every record of the configured topic
>      * in the batch to standard output.
>      *
>      * @param records batch returned by {@link #nextBatch()}
>      */
>     public void printBatch(ConsumerRecords<String,String> records) {
>         System.err.println("printing...");
>         for (ConsumerRecord<String, String> record : records.records(this.topic)) {
>             System.out.printf("offset = %d, key = %s, value = %s%n",
>                               record.offset(), record.key(),
>                               record.value());
>         }
>     }
>
>     /**
>      * Polls and prints batches until more than MAX_EMPTY_POLLS polls in
>      * total (not necessarily consecutive) have returned no records, then
>      * prints the record total and partition diagnostics. The consumer is
>      * always closed on the way out.
>      */
>     public void doProcess() {
>         int emptyPolls = 0;
>         long total = 0;
>         try {
>             while (true) {
>                 ConsumerRecords<String,String> batch = this.nextBatch();
>                 long count = batch.count();
>                 if (count > 0) {
>                     total += count;
>                     this.printBatch(batch);
>                 } else {
>                     emptyPolls++;
>                 }
>                 if (emptyPolls > MAX_EMPTY_POLLS) {
>                     System.err.printf("total %d\n", total);
>                     this.printMisc();
>                     break;
>                 }
>             }
>         } finally {
>             this.consumer.close();
>         }
>     }
>
>     /**
>      * Prints the consumer's current position in one partition of the
>      * configured topic, or the error if the position cannot be queried.
>      *
>      * @param pid partition number
>      */
>     public void printPosition(int pid) {
>         try {
>             TopicPartition tp = new TopicPartition(this.topic, pid);
>             long pos = this.consumer.position(tp);
>             System.err.printf(" offset: %d\n", pos);
>         } catch (IllegalArgumentException e) {
>             System.err.printf("printPosition: %d %s\n", pid, e);
>         }
>     }
>
>     /**
>      * Prints leader, current position, replicas and in-sync replicas for
>      * every partition of the configured topic. Prints nothing beyond the
>      * banner when the topic is unset or unknown to the cluster.
>      */
>     public void printMisc() {
>         System.err.println("in printMisc");
>         try {
>             Map<String,List<PartitionInfo>> topicMap = this.consumer.listTopics();
>             List<PartitionInfo> partitions =
>                 (this.topic == null) ? null : topicMap.get(this.topic);
>             if (partitions == null) {
>                 return;
>             }
>             System.err.printf("topic: %s\n", this.topic);
>             for (PartitionInfo pinf : partitions) {
>                 System.err.printf("partition %d\n", pinf.partition());
>                 // A partition without an elected leader reports a null leader.
>                 Node leader = pinf.leader();
>                 System.err.printf(" leader %s\n",
>                                   leader == null ? "(none)" : leader.host());
>                 this.printPosition(pinf.partition());
>                 System.err.printf(" replicas:\n");
>                 for (Node r : pinf.replicas()) {
>                     System.err.printf(" %s %s\n", r.id(), r.host());
>                 }
>                 System.err.printf(" inSyncReplicas:\n");
>                 for (Node r : pinf.inSyncReplicas()) {
>                     System.err.printf(" %s %s\n", r.id(), r.host());
>                 }
>             }
>         } catch (TimeoutException e) {
>             System.err.printf("printMisc: %s\n", e);
>         }
>     }
>
>     /**
>      * Entry point: {@code KConsumer propfile}.
>      *
>      * @param args exactly one argument, the path of the properties file
>      */
>     public static void main(String[] args) throws FileNotFoundException,
>             IOException, InterruptedException {
>         if (args.length == 1) {
>             Thread.sleep(2000); // docker network connect
>             KConsumer kc = new KConsumer(args[0]);
>             kc.doProcess();
>         } else {
>             System.err.println("Usage KConsumer propfile");
>             System.exit(1);
>         }
>     }
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)