don caldwell created KAFKA-3972:
-----------------------------------

             Summary: kafka java consumer poll returns 0 records after 
seekToBeginning
                 Key: KAFKA-3972
                 URL: https://issues.apache.org/jira/browse/KAFKA-3972
             Project: Kafka
          Issue Type: Task
          Components: consumer
    Affects Versions: 0.10.0.0
         Environment: docker image elasticsearch:latest, kafka scala version 
2.11, kafka version 0.10.0.0
            Reporter: don caldwell


kafkacat successfully returns rows for the topic, but the following Java source 
reliably fails to produce any rows. I suspect that I am missing something simple 
in my setup, but I have been unable to find it. I am using the current Docker 
release and the docker network commands to connect the processes in my cluster. 
The properties are:
bootstrap.servers: kafka01:9092,kafka02:9092,kafka03:9092
group.id: dhcp1
topic: dhcp
enable.auto.commit: false
auto.commit.interval.ms: 1000
session.timeout.ms: 30000
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

The Kafka consumer follows. One thing I find curious: although I seem to call 
seekToBeginning() successfully, when I print offsets on failure I get large 
offsets for all partitions, whereas I had expected them to be 0 or at least 
some small number.
Here is the code:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.Integer;
import java.lang.System;
import java.lang.Thread;
import java.lang.InterruptedException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;


/**
 * Minimal command-line Kafka consumer.
 *
 * <p>Loads a properties file, subscribes to the single topic named by the
 * {@code topic} property, polls for batches of String records and prints them
 * to stdout. Diagnostics are written to stderr.
 *
 * <p>Besides the Kafka settings listed in {@link #FORWARDED_CONFIG_KEYS}, the
 * properties file must provide:
 * <ul>
 *   <li>{@code topic} - the topic to subscribe to</li>
 *   <li>{@code polln} - the integer timeout handed to {@code KafkaConsumer.poll}</li>
 * </ul>
 */
public class KConsumer {
    /**
     * Keys copied from the properties file into the consumer configuration.
     * Any other key in the file is NOT forwarded to the KafkaConsumer.
     * auto.offset.reset is included so the file can control where a consumer
     * group without committed offsets starts reading.
     */
    private static final String[] FORWARDED_CONFIG_KEYS = {
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            ConsumerConfig.GROUP_ID_CONFIG,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
            ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG};

    /** doProcess() stops once more than this many polls (in total) came back empty. */
    private static final int MAX_EMPTY_POLLS = 10;

    /** Pause between poll retries after a SchemaException, in milliseconds. */
    private static final long RETRY_SLEEP_MS = 2000L;

    private Properties prop;
    private String topic;
    private int polln;
    private KafkaConsumer<String, String> consumer;

    /**
     * Loads the configuration and creates a subscribed consumer.
     *
     * @param pf path of the properties file
     * @throws FileNotFoundException if the file cannot be opened
     * @throws IOException if the file cannot be read
     * @throws NumberFormatException if the "polln" property is missing or not an integer
     */
    public KConsumer(String pf) throws FileNotFoundException,
                    IOException {
        this.setProperties(pf);
        this.newClient();
    }

    /**
     * Reads the properties file and caches the "topic" and "polln" values.
     *
     * @param p path of the properties file
     * @throws FileNotFoundException if the file cannot be opened
     * @throws IOException if the file cannot be read
     * @throws NumberFormatException if the "polln" property is missing or not an integer
     */
    public void setProperties(String p) throws FileNotFoundException,
                    IOException {
        Properties loaded = new Properties();
        // try-with-resources: the stream is closed even if load() fails.
        try (FileInputStream in = new FileInputStream(p)) {
            loaded.load(in);
        }
        String pollValue = loaded.getProperty("polln");
        if (pollValue == null) {
            // Same exception type as parsing a null string, but with a usable message.
            throw new NumberFormatException("missing required property: polln");
        }
        int parsedPoll = Integer.parseInt(pollValue.trim());

        // Assign only after everything parsed, so a failure leaves the old state intact.
        this.prop = loaded;
        this.topic = loaded.getProperty("topic");
        this.polln = parsedPoll;
    }

    /** Overrides the topic read from the properties file. */
    public void setTopic(String t) {
        this.topic = t;
    }

    /** @return the topic currently configured, possibly null */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Builds a KafkaConsumer from the forwarded configuration keys and
     * subscribes it to the configured topic.
     */
    public void newClient() {
        System.err.println("creating consumer");
        Properties kp = new Properties();
        for (String key : FORWARDED_CONFIG_KEYS) {
            String value = this.prop.getProperty(key);
            if (value != null) {
                kp.put(key, value);
            }
        }
        this.consumer = new KafkaConsumer<>(kp);
        System.err.println("subscribing to " + this.topic);
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    /** Closes the underlying consumer and drops the reference to it. */
    public void close() {
        this.consumer.close();
        this.consumer = null;
    }

    /**
     * Requests that every currently assigned partition be rewound to its
     * first offset. Exits the JVM with status 1 if no topic is configured.
     *
     * <p>If the consumer has no assignment yet, one poll(0) is issued first
     * and the assignment is read again.
     * NOTE(review): this assumes that with subscribe() partitions are only
     * assigned during poll(), and that poll(0) is enough to obtain them -
     * confirm against the client version in use. Any records returned by
     * that poll are discarded, which is harmless because the position is
     * rewound right afterwards.
     */
    public void seekToBeginning() {
        if (this.topic == null) {
            System.err.println("KConsumer: topic not set");
            System.exit(1);
        }
        java.util.Set<TopicPartition> tps = this.consumer.assignment();
        if (tps.isEmpty()) {
            this.consumer.poll(0);
            tps = this.consumer.assignment();
        }
        if (tps.isEmpty()) {
            // Report instead of silently passing an empty set on.
            System.err.println(
                "seekToBeginning: no partitions assigned, nothing to seek");
            return;
        }
        System.err.println("setting partition offset to beginning");
        this.consumer.seekToBeginning(tps);
    }

    /**
     * Polls until a batch is returned.
     *
     * <p>A SchemaException is logged and the poll is retried after a pause.
     * Any other KafkaException is logged and rethrown. Any other exception
     * closes the consumer and exits the JVM with status 1. An interrupt
     * during the retry pause exits the JVM with status 0.
     *
     * @return the records returned by the poll, possibly empty
     * @throws KafkaException on a Kafka failure other than SchemaException
     */
    public ConsumerRecords<String,String> nextBatch()
       throws KafkaException {
        while (true) {
            try {
                System.err.printf("polling...");
                ConsumerRecords<String,String> records =
                    this.consumer.poll(this.polln);

                System.err.println("returned");
                System.err.printf("record count %d\n", records.count());

                return records;
            } catch (SchemaException e) {
                // Must precede the KafkaException handler: this one retries.
                System.err.println("nextBatch: " + e);
            } catch (KafkaException e) {
                System.err.println("nextBatch: " + e);
                throw e;
            } catch (Exception e) {
                System.err.println("nextBatch: " + e);
                this.consumer.close();
                System.exit(1);
            }
            try {
                System.err.println("sleeping");
                Thread.sleep(RETRY_SLEEP_MS);
            } catch (InterruptedException e) {
                System.err.println(e);
                System.exit(0);
            }
        }
    }

    /**
     * Prints offset, key and value of every record of the configured topic
     * contained in the batch to stdout.
     *
     * @param records batch returned by {@link #nextBatch()}
     */
    public void printBatch(ConsumerRecords<String,String> records) {
        System.err.println("printing...");
        for (ConsumerRecord<String, String> record
                : records.records(this.topic)) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                              record.offset(), record.key(),
                              record.value());
        }
    }

    /**
     * Main loop: prints every non-empty batch and counts the empty ones.
     * Once more than {@link #MAX_EMPTY_POLLS} polls have come back empty
     * (counted in total, not consecutively) it prints the record total and
     * the topic diagnostics and stops. The consumer is always closed on exit.
     */
    public void doProcess() {
        int emptyPolls = 0;
        long total = 0;
        try {
            while (true) {
                ConsumerRecords<String,String> batch = this.nextBatch();
                int count = batch.count();
                if (count > 0) {
                    total += count;
                    this.printBatch(batch);
                } else {
                    emptyPolls++;
                }

                if (emptyPolls > MAX_EMPTY_POLLS) {
                    System.err.printf("total %d\n", total);
                    this.printMisc();
                    break;
                }
            }
        } finally {
            this.consumer.close();
        }
    }

    /**
     * Prints the consumer's current position for one partition of the
     * configured topic to stderr.
     *
     * @param pid partition number
     */
    public void printPosition(int pid) {
        try {
            TopicPartition tp = new TopicPartition(this.topic, pid);
            long pos = this.consumer.position(tp);
            System.err.printf("    offset: %d\n", pos);
        } catch (IllegalArgumentException | IllegalStateException e) {
            // NOTE(review): presumably raised when the partition is not
            // assigned to this consumer; which of the two types is thrown
            // appears to depend on the client version - confirm.
            System.err.printf("printPosition: %d %s\n", pid, e);
        }

    }

    /**
     * Prints leader, current position, replicas and in-sync replicas for
     * every partition of the configured topic to stderr. Prints nothing
     * beyond the banner if the topic is not in the listing. A
     * TimeoutException from the metadata lookup is logged, not propagated.
     */
    public void printMisc() {
        System.err.println("in printMisc");
        try {
            Map<String,List<PartitionInfo>> topicMap =
                this.consumer.listTopics();
            List<PartitionInfo> partitions = topicMap.get(this.topic);
            if (partitions == null) {
                return;
            }
            System.err.printf("topic: %s\n", this.topic);
            for (PartitionInfo pinf : partitions) {
                System.err.printf("partition %d\n", pinf.partition());
                // Guard against a partition that reports no leader.
                Node leader = pinf.leader();
                System.err.printf("    leader %s\n",
                                leader == null ? "(none)" : leader.host());
                this.printPosition(pinf.partition());
                System.err.printf("    replicas:\n");
                for (Node r : pinf.replicas()) {
                    System.err.printf("    %s %s\n", r.id(), r.host());
                }
                System.err.printf("    inSyncReplicas:\n");
                for (Node r : pinf.inSyncReplicas()) {
                    System.err.printf("    %s %s\n", r.id(), r.host());
                }
            }
        } catch (TimeoutException e) {
            System.err.printf("printMisc: %s\n", e);
        }
    }

    /**
     * Entry point. Expects exactly one argument, the properties file path;
     * otherwise prints usage and exits with status 1.
     */
    public static void main(String[] args) throws FileNotFoundException,
                    IOException, InterruptedException {
        if (args.length != 1) {
            System.err.println("Usage KConsumer propfile");
            System.exit(1);
        }
        Thread.sleep(2000); // docker network connect
        KConsumer kc = new KConsumer(args[0]);
        kc.doProcess();
    }
}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to