[ https://issues.apache.org/jira/browse/BAHIR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lynn updated BAHIR-175:
-----------------------
    Description: 
Spark version: 2.2.0; spark-streaming-sql-mqtt version: 2.2.0

 
1. Reading checkpoint offsets fails

On recovery from a checkpoint, offsets are read back from the offset log as SerializedOffset (raw JSON) rather than as the source's own LongOffset, so the cast fails:

org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.execution.streaming.LongOffset

 

Solution:

In the MQTTStreamSource.scala source file, line 149, getBatch method:

val startIndex = start.getOrElse(LongOffset(0)) match {
  case offset: SerializedOffset => offset.json.toInt
  case offset: LongOffset => offset.offset.toInt
}
val endIndex = end match {
  case offset: SerializedOffset => offset.json.toInt
  case offset: LongOffset => offset.offset.toInt
}
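An alternative that avoids the hand-rolled pattern match: Spark 2.2's LongOffset companion object ships a convert helper that already handles both LongOffset and SerializedOffset. A minimal sketch, where offsetToIndex is a hypothetical helper (not in the Bahir source) and start/end are the parameters of Source.getBatch:

import org.apache.spark.sql.execution.streaming.{LongOffset, Offset}

// Hypothetical helper: normalize any Offset the engine hands back
// (including one recovered from a checkpoint as SerializedOffset)
// to a plain numeric index.
def offsetToIndex(offset: Offset): Long =
  LongOffset.convert(offset)
    .getOrElse(sys.error(s"Unexpected offset type: $offset"))
    .offset

val startIndex = start.map(offsetToIndex).getOrElse(0L)
val endIndex = offsetToIndex(end)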

2. In the same MQTTStreamSource.scala source file, getBatch method:

val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
// Move consumed messages to persistent store.
(startIndex + 1 to endIndex).foreach { id =>
  val element = messages.getOrElse(id, store.retrieve(id))
  data += element
  store.store(id, element)
  messages.remove(id, element)
}

The following line:

val element = messages.getOrElse(id, store.retrieve(id))

throws:

java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.runtime.Nothing$
 at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
 at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
 at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
 at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633)
 at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160)
 at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
 at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
 at scala.collection.immutable.Range.foreach(Range.scala:160)
 at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
 at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
 at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
 at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
 at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
 at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
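The cast failure appears to come from Scala type inference rather than from the stored data itself: store.retrieve is generic, and when it is used as the getOrElse default without an explicit type argument, the compiler can infer Nothing for its type parameter, so the Tuple2 read back from the local store fails the internal asInstanceOf cast at runtime (hence "cannot be cast to scala.runtime.Nothing$"). A minimal sketch of a possible fix, assuming a signature like retrieve[T](id: Int): T and not necessarily the committed Bahir change, is to pin the type parameter explicitly:

// Sketch: pinning the element type keeps the compiler from inferring
// Nothing for the value read back from the persistent store.
(startIndex + 1 to endIndex).foreach { id =>
  val element: (String, Timestamp) =
    messages.getOrElse(id, store.retrieve[(String, Timestamp)](id))
  data += element
  store.store(id, element)
  messages.remove(id, element)
}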

 

 

 



> Recovering from Failures with Checkpointing Exception(Mqtt)
> -----------------------------------------------------------
>
>                 Key: BAHIR-175
>                 URL: https://issues.apache.org/jira/browse/BAHIR-175
>             Project: Bahir
>          Issue Type: Bug
>          Components: Spark Structured Streaming Connectors
>            Reporter: lynn
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
