[ https://issues.apache.org/jira/browse/SPARK-32044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun resolved SPARK-32044.
-----------------------------------
Fix Version/s: 2.4.7
Resolution: Fixed
Issue resolved by pull request 28887
[https://github.com/apache/spark/pull/28887]
> [SS] 2.4 Kafka continuous processing print mislead initial offsets log
> -----------------------------------------------------------------------
>
> Key: SPARK-32044
> URL: https://issues.apache.org/jira/browse/SPARK-32044
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.6
> Reporter: Zhongwei Zhu
> Assignee: Zhongwei Zhu
> Priority: Trivial
> Fix For: 2.4.7
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> When using Structured Streaming in continuous processing mode, after the Spark
> job is restarted it correctly picks up the offsets of the last epoch from the
> checkpoint location. However, it always prints the following log:
> 20/06/12 00:58:09 INFO [stream execution thread for [id = 34e5b909-f9fe-422a-89c0-081251a68693, runId = 0246e19d-aaa1-4a5c-9091-bab1a0578a0a]] kafka010.KafkaContinuousReader: Initial offsets: {"kafka_topic":{"8":51618236,"11":51610655,"2":51622889,"5":51637171,"14":51637346,"13":51627784,"4":51606960,"7":51632475,"1":51636129,"10":51632212,"9":51634107,"3":51611013,"12":51626567,"15":51640774,"6":51637823,"0":51629106}}
> This log is misleading because Spark does not use these offsets as the initial
> offsets; it resumes from the offsets stored in the checkpoint. It also triggers
> an unnecessary Kafka offset fetch. The cause is the following code in
> KafkaContinuousReader:
> {code:scala}
> offset = start.orElse {
>   val offsets = initialOffsets match {
>     case EarliestOffsetRangeLimit =>
>       KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
>     case LatestOffsetRangeLimit =>
>       KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
>     case SpecificOffsetRangeLimit(p) =>
>       offsetReader.fetchSpecificOffsets(p, reportDataLoss)
>   }
>   logInfo(s"Initial offsets: $offsets")
>   offsets
> }
> {code}
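> The code inside the orElse block is always executed, even when start has a
> value. Here start is a java.util.Optional, whose orElse method takes its
> argument by value (unlike Scala's Option.orElse, which takes it by name), so
> the block runs, fetching and logging offsets, before orElse checks whether
> start is present.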
>
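The evaluation-order problem described above can be reproduced outside Spark. The following is a minimal Scala sketch, not Spark code: it assumes, as in the Spark 2.4 ContinuousReader API, that the start offset arrives as a java.util.Optional, and it uses a hypothetical fetchInitialOffsets() as a stand-in for the offsetReader calls. The names eager, lazyFix and scalaOption are also hypothetical; lazyFix shows one possible way to skip the fetch and is not necessarily what pull request 28887 does.

```scala
import java.util.Optional

// Illustrative sketch only; none of these names exist in Spark.
object OrElseEvaluation {
  // Counts how often the hypothetical fetch below is invoked.
  var fetchCount = 0

  // Hypothetical stand-in for offsetReader.fetchEarliestOffsets() and friends.
  def fetchInitialOffsets(): String = {
    fetchCount += 1
    "fetched-offsets"
  }

  // Same shape as the reported code: Optional.orElse(T) receives an already
  // evaluated value, so the fetch runs even when start is present.
  def eager(start: Optional[String]): String =
    start.orElse(fetchInitialOffsets())

  // Only fetches when no start offset was supplied.
  def lazyFix(start: Optional[String]): String =
    if (start.isPresent) start.get() else fetchInitialOffsets()

  // Scala's Option.getOrElse takes a by-name argument, so it is lazy too.
  def scalaOption(start: Option[String]): String =
    start.getOrElse(fetchInitialOffsets())

  def main(args: Array[String]): Unit = {
    // Simulates a restart where the checkpoint supplies the start offset.
    val restored = Optional.of("checkpoint-offsets")

    fetchCount = 0
    val a = eager(restored)
    println(s"eager: $a, fetches = $fetchCount") // eager: checkpoint-offsets, fetches = 1

    fetchCount = 0
    val b = lazyFix(restored)
    println(s"lazyFix: $b, fetches = $fetchCount") // lazyFix: checkpoint-offsets, fetches = 0

    fetchCount = 0
    val c = scalaOption(Some("checkpoint-offsets"))
    println(s"option: $c, fetches = $fetchCount") // option: checkpoint-offsets, fetches = 0
  }
}
```

In all three cases the checkpointed value is returned, but only the eager variant performs the fetch, which matches the report: the offsets are fetched and logged even though they are then discarded.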
--
This message was sent by Atlassian Jira
(v8.3.4#803005)