[ https://issues.apache.org/jira/browse/BAHIR-256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jonas Schmöle updated BAHIR-256:
--------------------------------
    Description:

In Spark Structured Streaming with Spark 2.4.0, only the *payload* and *id* columns of the MQTT source are usable; selecting the *topic* or *timestamp* column throws a _ClassCastException_ after the 0th batch. The error can be reproduced with the [MQTTStreamWordCount.scala|https://github.com/apache/bahir/blob/master/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala] example as follows:

In line 56, replace
{code:java}
.load(brokerUrl).selectExpr("CAST(payload AS STRING)").as[String]
{code}
with
{code:java}
.load(brokerUrl).selectExpr("CAST(topic AS STRING)").as[String]
{code}
or
{code:java}
.load(brokerUrl).select("topic").as[String]
{code}

The full exception for *topic* is:
{code:java}
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	[...]
{code}

A similar exception appears when the *timestamp* column is used:
{code:java}
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
	[...]
{code}

h3. Notes
* The issue disappears when I use Spark 2.3.4.
* Please also see [https://stackoverflow.com/questions/64825099/apache-spark-sql-get-json-object-java-lang-string-cannot-be-cast-to-org-apache-s], which I believe is strongly related.
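For context, the failing cast in the first stack-trace frame can be illustrated outside Spark. The sketch below is plain Scala, not Bahir or Spark code: `UTF8StringLike` and `Row` are made-up stand-ins for Spark's internal `UTF8String` and `GenericInternalRow`. It shows the general mechanism suggested by the trace, namely a row holding an external `java.lang.String` being read by code that assumes the internal string representation:

```scala
object CastSketch {
  // Stand-in for org.apache.spark.unsafe.types.UTF8String (illustrative only).
  final class UTF8StringLike(val bytes: Array[Byte])

  // Stand-in for a generic internal row: stores untyped values and casts on
  // read, mirroring BaseGenericInternalRow.getUTF8String in the trace above.
  final case class Row(values: Array[Any]) {
    def getUTF8String(i: Int): UTF8StringLike =
      values(i).asInstanceOf[UTF8StringLike] // unchecked cast, as in codegen
  }

  // If a source stores a plain java.lang.String where the reader expects the
  // internal type, the read-side cast throws ClassCastException.
  def readTopic(row: Row): String =
    try { row.getUTF8String(0); "ok" }
    catch { case _: ClassCastException => "ClassCastException" }

  def main(args: Array[String]): Unit =
    println(readTopic(Row(Array[Any]("some/mqtt/topic"))))
}
```

The *timestamp* case follows the same pattern, with `java.sql.Timestamp` in place of the `Long` (microseconds) that Spark's internal row format expects.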
> Using MQTT topic or timestamp raises ClassCastException
> -------------------------------------------------------
>
>                 Key: BAHIR-256
>                 URL: https://issues.apache.org/jira/browse/BAHIR-256
>             Project: Bahir
>          Issue Type: Bug
>          Components: Spark Structured Streaming Connectors
>    Affects Versions: Spark-2.4.0
>            Reporter: Jonas Schmöle
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)