Hi Kevin,

Was the same “group.id” used before?
What may be happening is that when the consumer starts up normally (that
is, not when restoring from a failure), any offset already committed for
that group.id on the Kafka brokers is used as the starting point. The
“auto.offset.reset” setting is only respected when no committed offsets
can be found.
Currently, if Flink’s checkpointing isn’t enabled, FlinkKafkaConsumer09
periodically commits offsets back to the Kafka brokers. So it could be
that the consumer is resuming from offsets committed by an earlier run
rather than from the earliest offset.

Perhaps you can try using a new groupId and see if the behaviour still
exists?
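
A minimal sketch of that test, assuming the same properties as in your
mail. The "collector-" prefix, the random suffix, and the placeholder
broker address are made up for illustration; the only point is that a
group.id which has never committed offsets leaves “auto.offset.reset”
as the deciding setting:

```scala
import java.util.{Properties, UUID}

object FreshGroupIdSketch {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    // Placeholder broker address (assumption); use config.urlKafka in the real job.
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // Hypothetical naming scheme: a group.id that has never been used before,
    // so no committed offsets exist for it on the brokers.
    properties.setProperty("group.id", "collector-" + UUID.randomUUID().toString)
    // With no committed offsets for the group, this setting is consulted.
    properties.setProperty("auto.offset.reset", "earliest")
    // Print the generated id so the run can be identified later.
    println(properties.getProperty("group.id"))
  }
}
```

These properties would then be passed to the FlinkKafkaConsumer09
constructor exactly as before. A random group.id is only meant for this
one-off check, not for a long-running job.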

Regards,
Gordon

On July 28, 2016 at 4:15:12 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:

Hi,

I am currently facing strange behaviour of the FlinkKafkaConsumer09
class. I am using Flink 1.0.3.

These are my properties:

val properties = new Properties()
properties.setProperty("bootstrap.servers", config.urlKafka)
properties.setProperty("group.id", COLLECTOR_NAME)
properties.setProperty("auto.offset.reset", "earliest")

According to the new consumer API of Kafka, this should result in the
following:

"auto.offset.reset: smallest: automatically reset the offset to the
smallest offset" (source:
https://kafka.apache.org/documentation.html#newconsumerapi)

However, it starts from the latest item in my topic. Is this a bug or am
I doing something wrong?

Regards,
Kevin
