[ 
https://issues.apache.org/jira/browse/BAHIR-256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonas Schmöle updated BAHIR-256:
--------------------------------
    Description: 
In Spark Structured Streaming, with Spark 2.4.0, I can only use the *payload* 
and *id* columns of MQTT; using the *topic* or *timestamp* columns throws 
_ClassCastExceptions_ after the 0th batch. The error can be reproduced by using 
the 
[MQTTStreamWordCount.scala|https://github.com/apache/bahir/blob/master/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala]
 example as follows:

In line 56, replace 
{code:java}
.load(brokerUrl).selectExpr("CAST(payload AS STRING)").as[String]{code}
with
{code:java}
.load(brokerUrl).selectExpr("CAST(topic AS STRING)").as[String]{code}
or 
{code:java}
 .load(brokerUrl).select("topic").as[String]{code}
 

The full exception is as follows for *topic*:
{code:java}
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
 at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
 at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
 at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
[...]
{code}
A similar exception appears when the *timestamp* column is used: 
{code:java}
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
 at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
 at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
 at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
[...]{code}
h3. Notes
 * The issue disappears when I use Spark 2.3.4
 * Please also see 
[https://stackoverflow.com/questions/64825099/apache-spark-sql-get-json-object-java-lang-string-cannot-be-cast-to-org-apache-s],
 which I believe is strongly related.

  was:
In Spark Structured Streaming, with Spark 2.4.0, I can only use the *payload* 
and *id* columns of MQTT; using the *topic* or *timestamp* columns throws 
_ClassCastExceptions_ after the 0th batch. The error can be reproduced by using 
the 
[MQTTStreamWordCount.scala|https://github.com/apache/bahir/blob/master/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala]
 example as follows:

In line 56, replace 
{code:java}
.load(brokerUrl).selectExpr("CAST(payload AS STRING)").as[String]{code}
with
{code:java}
.load(brokerUrl).selectExpr("CAST(topic AS STRING)").as[String]{code}
or 
{code:java}
 .load(brokerUrl).select("topic").as[String]{code}
 

The full exception is as follows for *topic*:
{code:java}
java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.spark.unsafe.types.UTF8String at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
 at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
 at 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown
 Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) 
at org.apache.spark.scheduler.Task.run(Task.scala:123) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)
[...]
{code}
A similar exception appears when the *timestamp* column is used: 
{code:java}
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
java.lang.Longjava.lang.ClassCastException: java.sql.Timestamp cannot be cast 
to java.lang.Long at 
scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107) at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
 at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
[...]{code}
h3. Notes
 * The issue disappears when I use Spark 2.3.4
 * Please also see 
[https://stackoverflow.com/questions/64825099/apache-spark-sql-get-json-object-java-lang-string-cannot-be-cast-to-org-apache-s,]
 which I believe is strongly related.


> Using MQTT topic or timestamp raises ClassCastException
> -------------------------------------------------------
>
>                 Key: BAHIR-256
>                 URL: https://issues.apache.org/jira/browse/BAHIR-256
>             Project: Bahir
>          Issue Type: Bug
>          Components: Spark Structured Streaming Connectors
>    Affects Versions: Spark-2.4.0
>            Reporter: Jonas Schmöle
>            Priority: Major
>
> In Spark Structured Streaming, with Spark 2.4.0, I can only use the *payload* 
> and *id* columns of MQTT; using the *topic* or *timestamp* columns throws 
> _ClassCastExceptions_ after the 0th batch. The error can be reproduced by 
> using the 
> [MQTTStreamWordCount.scala|https://github.com/apache/bahir/blob/master/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala]
>  example as follows:
> In line 56, replace 
> {code:java}
> .load(brokerUrl).selectExpr("CAST(payload AS STRING)").as[String]{code}
> with
> {code:java}
> .load(brokerUrl).selectExpr("CAST(topic AS STRING)").as[String]{code}
> or 
> {code:java}
>  .load(brokerUrl).select("topic").as[String]{code}
>  
> The full exception is as follows for *topic*:
> {code:java}
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> org.apache.spark.unsafe.types.UTF8String at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
>  at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
>  at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
>  at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source) at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
>  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
> scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at 
> scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown
>  Source) at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>  Source) at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
>  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>  at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at 
> org.apache.spark.scheduler.Task.run(Task.scala:123) at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> [...]
> {code}
> A similar exception appears when the *timestamp* column is used: 
> {code:java}
> java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
> java.lang.Long at 
> scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107) at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
>  at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
> [...]{code}
> h3. Notes
>  * The issue disappears when I use Spark 2.3.4
>  * Please also see 
> [https://stackoverflow.com/questions/64825099/apache-spark-sql-get-json-object-java-lang-string-cannot-be-cast-to-org-apache-s],
>  which I believe is strongly related.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
