[ https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Emanuele Sabellico updated SPARK-27218:
---------------------------------------
    Description:
Hi, I'm trying to stream a Kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 using code like this:
{noformat}
spark.readStream
  .format("kafka")
  .option("subscribe", topics)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro_with_schema_registry($"value", avroOptions) as "body")
{noformat}
I find that Spark doesn't start from the earliest offset but from the latest. More precisely, it initially gets the earliest offsets but then seeks to the end, skipping the messages in between.

In the logs I find this:
{noformat}
2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: {"test1":{"0":1740}}
2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] Resetting offset for partition test1-0 to offset 15922.
{noformat}
Looking into the code, I find that _KafkaMicroBatchReader.setOffsetRange_ calls _KafkaOffsetReader.fetchLatestOffsets_, which contains a call to _consumer.seekToEnd(partitions)_.

According to the documentation, I expected the stream to start from the earliest offset in this case. Is there something I'm misunderstanding or doing wrong?

Thanks in advance!

> spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-27218
>                 URL: https://issues.apache.org/jira/browse/SPARK-27218
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
>            Reporter: Emanuele Sabellico
>            Priority: Minor
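
One possible reading of the two log lines in the description is that the seek to end only establishes the upper bound of the first micro-batch, while the lower bound still comes from the initial (earliest) offsets. The sketch below is a simplified model of that interpretation in plain Scala, not Spark's code: the object name, the `Offsets` alias, and both helper functions are hypothetical, and it ignores rate limiting and checkpointed offsets. It does not settle whether messages were actually skipped in the reporter's setup.

```scala
// Simplified model of a micro-batch offset range; NOT Spark's implementation.
// Assumption: start comes from the initial offsets, end from the latest offsets.
object OffsetRangeSketch {
  // Offset per partition id for a single topic (hypothetical alias).
  type Offsets = Map[Int, Long]

  /** Per-partition range of one batch as (from, until), with `until` exclusive. */
  def batchRange(start: Offsets, latest: Offsets): Map[Int, (Long, Long)] =
    start.map { case (partition, from) =>
      // A partition missing from `latest` yields an empty range.
      val until = latest.getOrElse(partition, from)
      (partition, (from, until))
    }

  /** Number of records the batch would cover across all partitions. */
  def recordCount(range: Map[Int, (Long, Long)]): Long =
    range.values.map { case (from, until) => until - from }.sum

  def main(args: Array[String]): Unit = {
    val initial = Map(0 -> 1740L)  // "Initial offsets" for test1-0 in the log
    val latest  = Map(0 -> 15922L) // offset reported after the seek to end
    val range   = batchRange(initial, latest)
    println(s"first batch range: $range")
    println(s"records covered: ${recordCount(range)}")
  }
}
```

Under this model the first batch would span offsets 1740 up to, but not including, 15922 on partition 0. Checking the `offset` column of the rows the query actually emits would show whether that matches what happens in practice.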