don caldwell created KAFKA-3972: ----------------------------------- Summary: kafka java consumer poll returns 0 records after seekToBeginning Key: KAFKA-3972 URL: https://issues.apache.org/jira/browse/KAFKA-3972 Project: Kafka Issue Type: Task Components: consumer Affects Versions: 0.10.0.0 Environment: docker image elasticsearch:latest, kafka scala version 2.11, kafka version 0.10.0.0 Reporter: don caldwell
kafkacat successfully returns rows for the topic, but the following java source reliably fails to produce rows. I have the suspicion that I am missing some simple thing in my setup, but I have been unable to find a way out. I am using the current docker and using docker network commands to connect the processes in my cluster. The properties are: bootstrap.servers: kafka01:9092,kafka02:9092,kafka03:9092 group.id: dhcp1 topic: dhcp enable.auto.commit: false auto.commit.interval.ms: 1000 session.timeout.ms 30000 key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.deserializer: org.apache.kafka.common.serialization.StringDeserializer the kafka consumer follows. One thing that I find curious is that, although I seem to successfully make the call to seekToBeginning(), when I print offsets on failure, I get large offsets for all partitions although I had expected them to be 0 or at least some small number. Here is the code: import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.Integer; import java.lang.System; import java.lang.Thread; import java.lang.InterruptedException; import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; public class KConsumer { private Properties prop; private String topic; private Integer polln; private KafkaConsumer<String, String> consumer; private String[] pna = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.GROUP_ID_CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}; public KConsumer(String pf) throws FileNotFoundException, IOException { this.setProperties(pf); this.newClient(); } public void setProperties(String p) throws FileNotFoundException, IOException { this.prop = new Properties(); this.prop.load(new FileInputStream(p)); this.topic = this.prop.getProperty("topic"); this.polln = new Integer(this.prop.getProperty("polln")); } public void setTopic(String t) { this.topic = t; } public String getTopic() { return this.topic; } public void newClient() { System.err.println("creating consumer"); Properties kp = new Properties(); for(String p : pna) { String v = this.prop.getProperty(p); if(v != null) { kp.put(p, v); } } //this.consumer = new KafkaConsumer<>(this.prop); this.consumer = new KafkaConsumer<>(kp); //this.consumer.subscribe(Collections.singletonList(this.topic)); System.err.println("subscribing to " + this.topic); this.consumer.subscribe(Arrays.asList(this.topic)); //this.seekToBeginning(); } public void close() { this.consumer.close(); this.consumer = null; } public void seekToBeginning() { if(this.topic == null) { System.err.println("KConsumer: topic not set"); System.exit(1); } System.err.println("setting partition offset to beginning"); java.util.Set<TopicPartition> tps = this.consumer.assignment(); this.consumer.seekToBeginning(tps); } public ConsumerRecords<String,String> nextBatch() throws KafkaException { while(true) { try { System.err.printf("polling..."); ConsumerRecords<String,String> records = this.consumer.poll(this.polln); System.err.println("returned"); System.err.printf("record count %d\n", records.count()); return records; } catch(SchemaException e) { System.err.println("nextBatch: " + e); } catch(KafkaException e) { System.err.println("nextBatch: " + e); throw e; } catch(Exception e) { System.err.println("nextBatch: " + e); this.consumer.close(); System.exit(1); } try { System.err.println("sleeping"); Thread.sleep(2000); } catch(InterruptedException e) { System.err.println(e); System.exit(0); } } } public void printBatch(ConsumerRecords<String,String> records) { System.err.println("printing..."); Iterable<ConsumerRecord<String,String>> ri = records.records(this.topic); for (ConsumerRecord<String, String> record : ri) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } public void doProcess() { Integer n = 0; Integer f = 0; long total = 0; try { while(true) { ConsumerRecords<String,String> r = this.nextBatch(); long count = r.count(); if(r.count() > 0) { total += count; this.printBatch(r); n = n + 1; } else { f = f + 1; } if(f > 10) { System.err.printf("total %d\n", total); this.printMisc(); break; } } } finally { this.consumer.close(); } } public void printPosition(int pid) { try { TopicPartition tp = new TopicPartition(this.topic, pid); long pos = this.consumer.position(tp); System.err.printf(" offset: %d\n", pos); } catch(IllegalArgumentException e) { System.err.printf("printPosition: %d %s\n", pid, e); } } public void printMisc() { Map<String,List<PartitionInfo>> topicMap; List<PartitionInfo> partitionList; System.err.println("in printMisc"); try { topicMap = this.consumer.listTopics(); for(String key: topicMap.keySet()) { if(key.compareTo(this.topic) != 0) continue; System.err.printf("topic: %s\n", key); List<PartitionInfo> pl = topicMap.get(key); for(PartitionInfo pinf: pl) { System.err.printf("partition %d\n", pinf.partition()); System.err.printf(" leader %s\n", pinf.leader().host()); this.printPosition(pinf.partition()); System.err.printf(" replicas:\n"); for(Node r: pinf.replicas()) { System.err.printf(" %s %s\n", r.id(), r.host()); } System.err.printf(" inSyncReplicas:\n"); for(Node r: pinf.inSyncReplicas()) { System.err.printf(" %s %s\n", r.id(), r.host()); } } } } catch (TimeoutException e) { System.err.printf("printMisc: %s\n", e); //System.exit(1); } } public static void main(String[] args) throws FileNotFoundException, IOException, InterruptedException { if(args.length == 1) { Thread.sleep(2000); // docker network connect KConsumer kc = new KConsumer(args[0]); //kc.printMisc(); kc.doProcess(); } else { System.err.println("Usage KConsumer propfile"); System.exit(1); } } } -- This message was sent by Atlassian JIRA (v6.3.4#6332)