Hi Kevin,

Was the same “group.id” used before? What may be happening is that when the consumer starts up (as opposed to being restored after a failure), any offset already committed to Kafka’s brokers for that groupId is used as the starting point. The “auto.offset.reset” setting is only respected when no committed offsets can be found.

Currently, if Flink’s checkpointing isn’t enabled, FlinkKafkaConsumer09 periodically commits offsets back to the Kafka brokers. So it could be that the consumer is picking up those previously committed offsets as its starting point.
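To make the rule above concrete, here is a rough sketch of the decision as described in this reply. It is an illustration only, not Flink's or Kafka's actual code: the object name, the function name `startOffset`, and its parameters are all made up for this example.

```scala
// Illustrative sketch only: NOT Flink's or Kafka's real implementation.
// All names here (StartOffsetSketch, startOffset, its parameters) are hypothetical.
object StartOffsetSketch {

  // committed:       offset previously committed for this group.id, if any
  // autoOffsetReset: value of "auto.offset.reset"
  // earliest/latest: first and last available offsets of the partition
  def startOffset(committed: Option[Long],
                  autoOffsetReset: String,
                  earliest: Long,
                  latest: Long): Long =
    committed.getOrElse {
      // Only reached when no committed offset exists for the group.
      autoOffsetReset match {
        case "earliest" => earliest
        case "latest"   => latest
        case other =>
          throw new IllegalArgumentException(s"unsupported auto.offset.reset: $other")
      }
    }

  def main(args: Array[String]): Unit = {
    // A committed offset wins, even though "earliest" is configured.
    println(startOffset(Some(42L), "earliest", 0L, 100L)) // prints 42
    // Without a committed offset, "earliest" takes effect.
    println(startOffset(None, "earliest", 0L, 100L))      // prints 0
  }
}
```

The first call mirrors the situation suspected here: the group already has a committed offset, so “earliest” never comes into play.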
Perhaps you can try using a new groupId and see if the behaviour still exists?

Regards,
Gordon

On July 28, 2016 at 4:15:12 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:

Hi,

I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I am using Flink 1.0.3.

These are my properties:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", config.urlKafka)
    properties.setProperty("group.id", COLLECTOR_NAME)
    properties.setProperty("auto.offset.reset", "earliest")

According to the new consumer API of Kafka, this should result in the following:

    auto.offset.reset:
    * smallest : automatically reset the offset to the smallest offset

(source: https://kafka.apache.org/documentation.html#newconsumerapi)

However, it starts from the latest item in my topic. Is this a bug or am I doing something wrong?

Regards,
Kevin
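One possible way to try the new-groupId suggestion from the reply is to append a random suffix to the group id in the properties from the original message. This is only a sketch: `kafkaUrl` and `collectorName` are hypothetical placeholders standing in for `config.urlKafka` and `COLLECTOR_NAME`, whose real values are not shown in the thread.

```scala
import java.util.{Properties, UUID}

object FreshGroupIdSketch {
  // Hypothetical placeholders; the thread does not show the real values
  // of config.urlKafka and COLLECTOR_NAME.
  val kafkaUrl = "localhost:9092"
  val collectorName = "collector"

  // Builds consumer properties whose group.id has never been used before,
  // so the brokers hold no committed offsets for it.
  def freshProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaUrl)
    properties.setProperty("group.id", s"$collectorName-${UUID.randomUUID()}")
    properties.setProperty("auto.offset.reset", "earliest")
    properties
  }
}
```

If the consumer then starts from the beginning of the topic, that would point to previously committed offsets as the cause rather than a bug.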