[
https://issues.apache.org/jira/browse/SPARK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213336#comment-17213336
]
L. C. Hsieh commented on SPARK-29625:
-------------------------------------
How did you specify the starting offset?
> Spark Structure Streaming Kafka Wrong Reset Offset twice
> --------------------------------------------------------
>
> Key: SPARK-29625
> URL: https://issues.apache.org/jira/browse/SPARK-29625
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.1
> Reporter: Sandish Kumar HN
> Priority: Major
>
> Spark Structure Streaming Kafka Reset Offset twice, once with right offsets
> and second time with very old offsets
> {code}
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40
> INFO Fetcher: [Consumer clientId=consumer-1,
> groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0]
> Resetting offset for partition topic-151 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40
> INFO Fetcher: [Consumer clientId=consumer-1,
> groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0]
> Resetting offset for partition topic-118 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40
> INFO Fetcher: [Consumer clientId=consumer-1,
> groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0]
> Resetting offset for partition topic-85 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40
> INFO Fetcher: [Consumer clientId=consumer-1,
> groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0]
> Resetting offset for partition topic-52 to offset 122677634.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40
> INFO Fetcher: [Consumer clientId=consumer-1,
> groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0]
> Resetting offset for partition topic-19 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40
> INFO Fetcher: [Consumer clientId=consumer-1,
> groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0]
> Resetting offset for partition topic-52 to offset 120504922.*
> [2019-10-28 19:27:40,153] \{bash_operator.py:128} INFO - 19/10/28 19:27:40
> INFO ContextCleaner: Cleaned accumulator 810
> {code}
> which is causing a Data loss issue.
> {code}
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 19/10/28 19:27:40
> ERROR StreamExecution: Query [id = d62ca9e4-6650-454f-8691-a3d576d1e4ba,
> runId = 3946389f-222b-495c-9ab2-832c0422cbbb] terminated with error
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO -
> java.lang.IllegalStateException: Partition topic-52's offset was changed from
> 122677598 to 120504922, some data may have been missed.
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - Some data may have
> been lost because they are not available in Kafka any more; either the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - data was aged out
> by Kafka or the topic may have been deleted before all the data in the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - topic was
> processed. If you don't want your streaming query to fail on such cases, set
> the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - source option
> "failOnDataLoss" to "false".
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO -
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:329)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:283)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:281)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:281)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:614)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:610)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]