[ https://issues.apache.org/jira/browse/BAHIR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lynn updated BAHIR-175: ----------------------- Description: Spark Version:2.2.0 spark-streaming-sql-mqtt version: 2.2.0 # Reading checkpoints offsets error org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.execution.streaming.LongOffset solution: The MqttStreamSource.scala source file: Line 149, getBatch Method: val startIndex = start.getOrElse(LongOffset(0)) match { case offset: SerializedOffset => offset.json.toInt case offset: LongOffset => offset.offset.toInt } val endIndex = end match { case offset: SerializedOffset => offset.json.toInt case offset: LongOffset => offset.offset.toInt } 2. The MqttStreamSource.scala source file getBatch Method: val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty // Move consumed messages to persistent store. (startIndex + 1 to endIndex).foreach { id => val element = messages.getOrElse(id, store.retrieve(id)) data += element store.store(id, element) messages.remove(id, element) } The following line: val element = messages.getOrElse(id, store.retrieve(id)) throws error: java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.runtime.Nothing$ at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160) at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159) at scala.collection.immutable.Range.foreach(Range.scala:160) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206) was: 1. Reading checkpoints offsets error org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.execution.streaming.LongOffset solution: The MqttStreamSource.scala source file: Line 149, getBatch Method: val startIndex = start.getOrElse(LongOffset(0)) match { case offset: SerializedOffset => offset.json.toInt case offset: LongOffset => offset.offset.toInt } val endIndex = end match { case offset: SerializedOffset => offset.json.toInt case offset: LongOffset => offset.offset.toInt } 2. The MqttStreamSource.scala source file getBatch Method: val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty // Move consumed messages to persistent store. (startIndex + 1 to endIndex).foreach { id => val element = messages.getOrElse(id, store.retrieve(id)) data += element store.store(id, element) messages.remove(id, element) } The following line: val element = messages.getOrElse(id, store.retrieve(id)) throws error: java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.runtime.Nothing$ at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160) at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159) at scala.collection.immutable.Range.foreach(Range.scala:160) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206) > Recovering from Failures with Checkpointing Exception(Mqtt) > ----------------------------------------------------------- > > Key: BAHIR-175 > URL: https://issues.apache.org/jira/browse/BAHIR-175 > Project: Bahir > Issue Type: Bug > Components: Spark Structured Streaming Connectors > Reporter: lynn > Priority: Major > > Spark Version:2.2.0 spark-streaming-sql-mqtt version: 2.2.0 > > # Reading checkpoints offsets error > org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to > org.apache.spark.sql.execution.streaming.LongOffset > > solution: > The MqttStreamSource.scala source file: > Line 149, getBatch Method: > val startIndex = start.getOrElse(LongOffset(0)) match > { case offset: SerializedOffset => offset.json.toInt case offset: LongOffset > => offset.offset.toInt } > val endIndex = end match > { case offset: SerializedOffset => offset.json.toInt case offset: LongOffset > => offset.offset.toInt } > 2. The MqttStreamSource.scala source file > getBatch Method: > val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty > // Move consumed messages to persistent store. > (startIndex + 1 to endIndex).foreach > { id => val element = messages.getOrElse(id, store.retrieve(id)) data += > element store.store(id, element) messages.remove(id, element) } > The following line: > val element = messages.getOrElse(id, store.retrieve(id)) throws error: > java.lang.ClassCastException: scala.Tuple2 cannot be cast to > scala.runtime.Nothing$ > at > org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160) > at > org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160) > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633) > at > org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160) > at > org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159) > at > org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159) > at scala.collection.immutable.Range.foreach(Range.scala:160) > at > org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206) > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)