[ https://issues.apache.org/jira/browse/BAHIR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luciano Resende resolved BAHIR-175.
-----------------------------------
    Resolution: Fixed
      Assignee: Lukasz Antoniak
 Fix Version/s: Spark-2.4.0

> Recovering from Failures with Checkpointing Exception(Mqtt)
> -----------------------------------------------------------
>
>                 Key: BAHIR-175
>                 URL: https://issues.apache.org/jira/browse/BAHIR-175
>             Project: Bahir
>          Issue Type: Bug
>          Components: Spark Structured Streaming Connectors
>            Reporter: lynn
>            Assignee: Lukasz Antoniak
>            Priority: Major
>             Fix For: Spark-2.4.0
>
>
> Spark version: 2.2.0; spark-streaming-sql-mqtt version: 2.2.0
>
> 1. Error when reading offsets back from the checkpoint:
> org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to
> org.apache.spark.sql.execution.streaming.LongOffset
>
> After a restart, the offsets restored from the checkpoint arrive as SerializedOffset
> rather than LongOffset, so getBatch must accept both.
>
> Solution:
> In the MQTTStreamSource.scala source file, line 149, method getBatch:
> val startIndex = start.getOrElse(LongOffset(0)) match {
>   case offset: SerializedOffset => offset.json.toInt
>   case offset: LongOffset => offset.offset.toInt
> }
> val endIndex = end match {
>   case offset: SerializedOffset => offset.json.toInt
>   case offset: LongOffset => offset.offset.toInt
> }
>
> 2. In the MQTTStreamSource.scala source file, method getBatch:
> val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
> // Move consumed messages to persistent store.
> (startIndex + 1 to endIndex).foreach { id =>
>   val element = messages.getOrElse(id, store.retrieve(id))
>   data += element
>   store.store(id, element)
>   messages.remove(id, element)
> }
>
> The following line:
>   val element = messages.getOrElse(id, store.retrieve(id))
> throws this error:
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.runtime.Nothing$
> 	at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
> 	at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
> 	at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> 	at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633)
> 	at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160)
> 	at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
> 	at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
> 	at scala.collection.immutable.Range.foreach(Range.scala:160)
> 	at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
> 	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
> 	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
>
> Because no type argument is given to store.retrieve, the compiler infers Nothing,
> so the tuple read back from the store cannot be cast to it.
>
> Solution: pass the element type explicitly:
> val element: (String, Timestamp) = messages.getOrElse(id,
>   store.retrieve[(String, Timestamp)](id))

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)