[
https://issues.apache.org/jira/browse/SPARK-31040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Richard Gilmore updated SPARK-31040:
------------------------------------
Description:
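Each batch should either log the offsets for every partition, including
partitions that received no data, or scan back across earlier commit logs to
recover the offsets of partitions missing from the latest one.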
[https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala]
offset log 23615
{code:java}
{"myTopic.myTopic.orders":{"2":27531503,"5":27562423,"4":27528794,"1":27514991,"3":27528899,"0":27504949}}%
{code}
offset log 23616
{code:java}
{"myTopic.myTopic.orders":{"1":27515130,"0":27505140}}%
{code}
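To make the gap between the two entries explicit, here is a minimal Python sketch that compares them. It is illustrative only and is not Spark code; the helper name `missing_partitions` is hypothetical, and the JSON is copied from the two offset log entries above with the trailing `%` dropped.

```python
import json

# Offset log entries copied from the report (trailing "%" dropped).
entry_23615 = json.loads(
    '{"myTopic.myTopic.orders":{"2":27531503,"5":27562423,"4":27528794,'
    '"1":27514991,"3":27528899,"0":27504949}}'
)
entry_23616 = json.loads(
    '{"myTopic.myTopic.orders":{"1":27515130,"0":27505140}}'
)


def missing_partitions(previous, current):
    """Per topic, list partitions present in `previous` but absent from `current`."""
    missing = {}
    for topic, offsets in previous.items():
        # Partition ids are JSON string keys; sort them numerically.
        dropped = sorted(set(offsets) - set(current.get(topic, {})), key=int)
        if dropped:
            missing[topic] = dropped
    return missing


print(missing_partitions(entry_23615, entry_23616))
# {'myTopic.myTopic.orders': ['2', '3', '4', '5']}
```

The four partitions reported here are the same four that the log below lists as "Partitions added".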
{code:java}
20/03/04 13:49:05 INFO MicroBatchExecution: Resuming at batch 26317 with
committed offsets {KafkaV2[Subscribe[myTopic.myTopic.orders]]:
{"myTopic.myTopic.orders":{"1":27515130,"0":27505140}}} and available offsets
{KafkaV2[Subscribe[myTopic.myTopic.orders]]:
{"myTopic.myTopic.orders":{"2":27531625,"5":27562568,"4":27528990,"1":27515131,"3":27529075,"0":27505141}}}
commit log: {"myTopic.myTopic.orders":{"1":27515130,"0":27505140}}%
20/03/04 13:50:24 INFO KafkaMicroBatchReader: Partitions added:
Map(myTopic.myTopic.orders-3 -> 26533520, myTopic.myTopic.orders-2 -> 26533730,
myTopic.myTopic.orders-4 -> 26533608, myTopic.myTopic.orders-5 -> 26533486)
20/03/04 13:50:24 WARN KafkaMicroBatchReader: Added partition
myTopic.myTopic.orders-3 starts from 26533520 instead of 0. Some data may have
been missed.
20/03/04 13:50:24 WARN KafkaMicroBatchReader: Added partition
myTopic.myTopic.orders-2 starts from 26533730 instead of 0. Some data may have
been missed.
20/03/04 13:50:24 WARN KafkaMicroBatchReader: Added partition
myTopic.myTopic.orders-4 starts from 26533608 instead of 0. Some data may have
been missed.
20/03/04 13:50:24 WARN KafkaMicroBatchReader: Added partition
myTopic.myTopic.orders-5 starts from 26533486 instead of 0. Some data may have
been missed.
{code}
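The two remedies named in the description can be sketched in Python as follows. This is a hedged illustration under simplifying assumptions, not the logic in MicroBatchExecution.scala: offset log entries are modeled as plain dictionaries, and the function names `carry_forward` and `scan_back` are hypothetical.

```python
# Offset log entries from the report, modeled as plain dictionaries.
TOPIC = "myTopic.myTopic.orders"
entry_23615 = {TOPIC: {"2": 27531503, "5": 27562423, "4": 27528794,
                       "1": 27514991, "3": 27528899, "0": 27504949}}
entry_23616 = {TOPIC: {"1": 27515130, "0": 27505140}}


def carry_forward(previous, current):
    """Remedy 1: log every partition by filling gaps from the previous entry."""
    merged = {topic: dict(offsets) for topic, offsets in previous.items()}
    for topic, offsets in current.items():
        # Offsets from the current batch override the carried-forward ones.
        merged.setdefault(topic, {}).update(offsets)
    return merged


def scan_back(entries, topic, partition):
    """Remedy 2: walk entries newest-first until the partition appears.

    `entries` is ordered oldest to newest. Returns None if no entry
    mentions the partition, i.e. it is genuinely new.
    """
    for entry in reversed(entries):
        offset = entry.get(topic, {}).get(partition)
        if offset is not None:
            return offset
    return None


print(carry_forward(entry_23615, entry_23616)[TOPIC]["3"])   # 27528899
print(scan_back([entry_23615, entry_23616], TOPIC, "3"))     # 27528899
print(scan_back([entry_23615, entry_23616], TOPIC, "1"))     # 27515130
```

Under either approach, partition 3 would keep its last logged offset from entry 23615 instead of being treated as a newly added partition.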
> Offsets are only logged for partitions which had data; this causes the next
> batch to read the partitions that were not included from the beginning when
> using Kafka
> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-31040
> URL: https://issues.apache.org/jira/browse/SPARK-31040
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0, 2.4.4, 2.4.5
> Reporter: Richard Gilmore
> Priority: Major