Hi Tai,

Should definitely be possible. Would you mind opening a JIRA issue with the description you posted?
Thanks,
Max

On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon <tzuli...@gmail.com> wrote:
> Hi Kevin,
>
> Just a re-clarification: for Kafka 0.9 it would be “earliest”, and “smallest”
> for the older Kafka 0.8.
>
> I’m wondering whether or not it is reasonable to add a Flink-specific way
> to set the consumer’s starting position to “earliest” and “latest”, without
> respecting the external Kafka offset store. Perhaps we can keep the
> current behaviour (checking committed offsets in Kafka as the starting point)
> as a user option, and add new options to read from “earliest” and “latest”
> regardless of the groupId and externally committed offsets. I think this
> better matches how users usually interpret the functionality of setting
> starting positions, while also keeping the “auto.offset.reset” behaviour
> that frequent Kafka users are used to. It would also more clearly
> define that, in the context of Flink, the external Kafka offset store is
> used only to expose the consumer's progress to the outside world, and not
> to manipulate how topics are read.
>
> Just an idea I have in mind; not sure if it would be a reasonable addition.
> It’d be great to hear what others think of this.
>
> Regards,
> Gordon
>
>
> On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:
>
> Thank you Gordon and Max,
>
> Thank you Gordon, that explains the behaviour a bit better to me. I am
> now adding the timestamp to the group ID and that is a good workaround
> for now. The "smallest" option is unfortunately not available in this
> version of the FlinkKafkaConsumer class.
>
> Cheers,
> Kevin
>
>
> On 28.07.2016 10:39, Maximilian Michels wrote:
>> Hi Kevin,
>>
>> You need to use properties.setProperty("auto.offset.reset",
>> "smallest") for Kafka 9 to start from the smallest offset. Note that
>> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
>> "earliest") to achieve the same behavior.
>>
>> Kafka keeps track of the offsets per group id.
>> If you have already read from a topic with a certain group id and want to
>> restart from the smallest offset available, you need to generate a unique
>> group id.
>>
>> Cheers,
>> Max
>>
>> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
>>> Hi,
>>>
>>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 class.
>>> I am using Flink 1.0.3.
>>>
>>> These are my properties:
>>>
>>> val properties = new Properties()
>>> properties.setProperty("bootstrap.servers", config.urlKafka)
>>> properties.setProperty("group.id", COLLECTOR_NAME)
>>> properties.setProperty("auto.offset.reset", "earliest")
>>>
>>> According to the new consumer API of Kafka, this should result in the
>>> following:
>>>
>>> auto.offset.reset: * smallest : automatically reset the offset to the
>>> smallest offset (source:
>>> https://kafka.apache.org/documentation.html#newconsumerapi)
>>>
>>> However, it starts from the latest item in my topic. Is this a bug or am I
>>> doing something wrong?
>>>
>>> Regards,
>>> Kevin
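A minimal Scala sketch of the workaround discussed in this thread. It assumes, as Max and Gordon describe, that the Flink 1.0.x Kafka consumer first resumes from offsets committed for its group id and only falls back to "auto.offset.reset" when that group has no committed offsets. It chooses the reset value per consumer API ("smallest" for the Kafka 0.8 consumer, "earliest" for the 0.9 consumer, per Gordon's clarification) and appends a timestamp to the group id, as Kevin does. The object `KafkaStartFromEarliest` and its methods are hypothetical helpers, not part of Flink or Kafka.

```scala
import java.util.Properties

// Hypothetical helper (not part of Flink or Kafka): builds consumer
// properties intended to start reading from the oldest retained offset.
object KafkaStartFromEarliest {

  // "auto.offset.reset" values differ by consumer API: the old Kafka 0.8
  // consumer uses "smallest"/"largest", the new 0.9 consumer uses
  // "earliest"/"latest".
  def resetValueFor(kafkaVersion: String): String = kafkaVersion match {
    case "0.8" => "smallest"
    case "0.9" => "earliest"
    case other =>
      throw new IllegalArgumentException(s"unsupported Kafka version: $other")
  }

  // Appending a timestamp yields a group id with no committed offsets, so
  // the consumer falls back to "auto.offset.reset" instead of resuming.
  // Note: the Kafka 0.8 consumer additionally needs "zookeeper.connect",
  // which this sketch does not set.
  def consumerProperties(
      bootstrapServers: String,
      baseGroupId: String,
      kafkaVersion: String,
      nowMillis: Long = System.currentTimeMillis()): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", s"$baseGroupId-$nowMillis")
    props.setProperty("auto.offset.reset", resetValueFor(kafkaVersion))
    props
  }
}
```

The resulting `Properties` could then be handed to the consumer, for example `new FlinkKafkaConsumer09[String](topic, new SimpleStringSchema(), props)`. The trade-off of a fresh group id per run is that a restarted job no longer resumes from the offsets its previous run committed, which is exactly what Gordon's proposed Flink-specific "earliest"/"latest" option would avoid.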