[ https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17338465#comment-17338465 ]

Tanel Kiis commented on SPARK-35296:
------------------------------------

I finally managed to change the UT in such a way that the assertion error
happens; the following coalesce was the missing link.
{code}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index b3d29df1b2..16ebddd75c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest
     }
     spark.listenerManager.register(listener)
     try {
-      val df = spark.range(100)
+      val df = spark.range(0, 100, 1, 5)
         .observe(
           name = "my_event",
           min($"id").as("min_val"),
@@ -256,6 +256,7 @@ class DataFrameCallbackSuite extends QueryTest
         .observe(
           name = "other_event",
           avg($"id").cast("int").as("avg_val"))
+        .coalesce(2)

       def checkMetrics(metrics: Map[String, Row]): Unit = {
         assert(metrics.size === 2)
{code}
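
For anyone who wants to poke at it outside the suite, here is a minimal standalone sketch of the same repro (assuming a local session named {{spark}} and abbreviating the suite's metric expressions; not the exact suite code):

{code}
// Hypothetical repro sketch, distilled from the UT diff above.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, min}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("SPARK-35296-repro")
  .getOrCreate()
import spark.implicits._

val df = spark.range(0, 100, 1, 5)      // 5 input partitions
  .observe("my_event", min($"id").as("min_val"))   // the suite registers more metrics here
  .observe("other_event", avg($"id").cast("int").as("avg_val"))
  .coalesce(2)                          // the missing link: fewer partitions than the input

df.collect()                            // should trip the AggregatingAccumulator.setState assertion
{code}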

> Dataset.observe fails with an assertion
> ---------------------------------------
>
>                 Key: SPARK-35296
>                 URL: https://issues.apache.org/jira/browse/SPARK-35296
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Tanel Kiis
>            Priority: Major
>         Attachments: 2021-05-03_18-34.png
>
>
> I hit this assertion error when using Dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>       at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>       at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.10.jar:?]
>       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.10.jar:?]
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?]
>       at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.scheduler.Task.run(Task.scala:147) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) [spark-core_2.12-3.1.1.jar:3.1.1]
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [spark-core_2.12-3.1.1.jar:3.1.1]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
>       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
> A workaround that I used was to add .coalesce(1) before calling this method.
> It happens in a quite complex query, and I have not been able to reproduce 
> it with a simpler query.
> Attached is a screenshot of the debugger at the moment of the exception:
>  !2021-05-03_18-34.png! 
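
A hedged sketch of the workaround the report describes (reusing {{spark}} and the imports from the repro sketch above; the reporter's actual query is not available). Coalescing to a single partition before observe presumably sidesteps the multi-partition accumulator state handoff that trips the assertion:

{code}
// Hypothetical illustration of the reported workaround, not the reporter's query:
// coalesce to a single partition *before* calling observe.
val ok = spark.range(0, 100, 1, 5)
  .coalesce(1)                          // single partition ahead of observe
  .observe("my_event", min($"id").as("min_val"))

ok.collect()                            // completes without tripping the assertion
{code}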


