[ 
https://issues.apache.org/jira/browse/SPARK-45094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Francis Roy updated SPARK-45094:
-------------------------------------
    Environment: Tested on Spark 3.2.1 and Spark 3.4.0, using Scala 2.12.17 and 
OpenJDK 11.0.20.1.

> isEmpty on union of RDDs sharing the same trait crashes when the first RDD 
> is empty
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-45094
>                 URL: https://issues.apache.org/jira/browse/SPARK-45094
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.1, 3.4.0
>         Environment: Tested on Spark 3.2.1 and Spark 3.4.0, using Scala 
> 2.12.17 and OpenJDK 11.0.20.1.
>            Reporter: Jean-Francis Roy
>            Priority: Minor
>
> Given two RDDs of different types, but sharing a common trait, one can obtain 
> a union of the two RDDs by doing the following:
> {code:java}
> > import org.apache.spark.rdd.RDD
> > trait Foo { val a: Int }
> > case class Bar(a: Int) extends Foo
> > case class Baz(a: Int, b: Int) extends Foo
> > val bars = spark.sparkContext.parallelize(List(Bar(1), 
> > Bar(2))).asInstanceOf[RDD[Foo]]
> > val bazs = spark.sparkContext.parallelize(List(Baz(1, 42), Baz(2, 
> > 42))).asInstanceOf[RDD[Foo]]
> > val union = bars.union(bazs){code}
>  
> When doing so, `count()` and `isEmpty()` behave as expected:
> {code:java}
> > union.count()
> 4
> > union.isEmpty()
> false{code}
> However, if the first RDD is empty, `count()` will behave as expected, but 
> `isEmpty()` will throw a `java.lang.ArrayStoreException`:
> {code:java}
> > val bars = 
> > spark.sparkContext.parallelize(List.empty[Bar]).asInstanceOf[RDD[Foo]]
> > val union = bars.union(bazs)
> > union.count()
> 2
> > union.isEmpty()
> BOOM{code}
> Full stack trace:
> {code:java}
> ERROR Executor: Exception in task 4.0 in stage 8.0 (TID 134)
> java.lang.ArrayStoreException: 
> $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Baz
>     at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:74)
>     at scala.Array$.slowcopy(Array.scala:157)
>     at scala.Array$.copy(Array.scala:183)
>     at 
> scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)
>     at 
> scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)
>     at scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:49)
>     at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:334)
>     at 
> scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:333)
>     at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)
>     at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:342)
>     at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>     at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
>     at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>     at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>     at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1462)
>     at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>     at org.apache.spark.scheduler.Task.run(Task.scala:139)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> 23/09/06 14:55:18 ERROR Executor: Exception in task 14.0 in stage 8.0 (TID 
> 144)
> java.lang.ArrayStoreException: 
> $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Baz
>     at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:74)
>     at scala.Array$.slowcopy(Array.scala:157)
>     at scala.Array$.copy(Array.scala:183)
>     at 
> scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)
>     at 
> scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)
>     at scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:49)
>     at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:334)
>     at 
> scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:333)
>     at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)
>     at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:342)
>     at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>     at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
>     at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>     at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>     at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1462)
>     at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>     at org.apache.spark.scheduler.Task.run(Task.scala:139)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> 23/09/06 14:55:18 WARN TaskSetManager: Lost task 4.0 in stage 8.0 (TID 134) 
> (192.168.86.57 executor driver): java.lang.ArrayStoreException: 
> $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Baz
>     at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:74)
>     at scala.Array$.slowcopy(Array.scala:157)
>     at scala.Array$.copy(Array.scala:183)
>     at 
> scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)
>     at 
> scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)
>     at scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:49)
>     at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:334)
>     at 
> scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:333)
>     at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)
>     at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:342)
>     at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>     at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
>     at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>     at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>     at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1462)
>     at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>     at org.apache.spark.scheduler.Task.run(Task.scala:139)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> 23/09/06 14:55:18 ERROR TaskSetManager: Task 4 in stage 8.0 failed 1 times; 
> aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in 
> stage 8.0 failed 1 times, most recent failure: Lost task 4.0 in stage 8.0 
> (TID 134) (192.168.86.57 executor driver): java.lang.ArrayStoreException: Baz
>     at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:74)
>     at scala.Array$.slowcopy(Array.scala:157)
>     at scala.Array$.copy(Array.scala:183)
>     at 
> scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)
>     at 
> scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)
>     at scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:49)
>     at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:334)
>     at 
> scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:333)
>     at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)
>     at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:342)
>     at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>     at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
>     at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>     at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>     at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1462)
>     at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>     at org.apache.spark.scheduler.Task.run(Task.scala:139)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
>   at scala.Option.foreach(Option.scala:407)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
>   at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1462)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
>   at org.apache.spark.rdd.RDD.take(RDD.scala:1435)
>   at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1572)
>   at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
>   at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1572)
>   ... 47 elided
> Caused by: java.lang.ArrayStoreException: Baz
>   at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:74)
>   at scala.Array$.slowcopy(Array.scala:157)
>   at scala.Array$.copy(Array.scala:183)
>   at 
> scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)
>   at 
> scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)
>   at scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:49)
>   at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:334)
>   at scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:333)
>   at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)
>   at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:342)
>   at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>   at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
>   at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>   at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>   at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1462)
>   at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>   at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>   at org.apache.spark.scheduler.Task.run(Task.scala:139)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to