[ https://issues.apache.org/jira/browse/SPARK-37487 ]
Huw deleted comment on SPARK-37487:
-----------------------------
was (Author: JIRAUSER288917):
I think I've seen crashes because of this in production.
I can't reproduce it locally, but I believe that imperative aggregates are having
their `serializeAggregateBufferInPlace` function called twice. The second call
attempts an unsafe cast on a buffer that has already been serialized to a byte array.
{quote}Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest is in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.expressions.aggregate.ApproxQuantiles.serialize(ApproxQuantiles.scala:19)
at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:206)
at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33){quote}
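To make the suspected failure mode concrete, here is a minimal sketch that does not use Spark at all. `Digest`, `DoubleSerializeSketch` and `serializeInPlace` are hypothetical stand-ins for the digest object and the in-place serialization step; they only illustrate how a second in-place serialization could hit a byte array where it expects the typed buffer object, assuming the double call really is what happens.

```scala
// Hypothetical stand-in for a typed aggregation buffer object (not a Spark class).
final case class Digest(values: Vector[Double])

object DoubleSerializeSketch {
  // Mimics an in-place serialize step: the slot is assumed to hold a Digest,
  // which is replaced by its serialized bytes.
  def serializeInPlace(buffer: Array[Any], slot: Int): Unit = {
    // Unchecked assumption about the slot contents; fails if it already holds bytes.
    val digest = buffer(slot).asInstanceOf[Digest]
    buffer(slot) = digest.values.mkString(",").getBytes("UTF-8")
  }

  def main(args: Array[String]): Unit = {
    val buffer = Array[Any](Digest(Vector(1.0, 2.0, 3.0)))

    // First call: Digest is replaced by Array[Byte].
    serializeInPlace(buffer, 0)

    // Second call: the slot now holds Array[Byte], so the cast throws.
    val secondCall = scala.util.Try(serializeInPlace(buffer, 0))
    println(secondCall.isFailure)
    println(secondCall.failed.get.getClass.getName)
  }
}
```

The second call fails with a `ClassCastException` of the same shape as the one in the trace above (a byte array being cast to the digest type), which is why a double invocation looks like a plausible explanation.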
> CollectMetrics is executed twice if it is followed by a sort
> ------------------------------------------------------------
>
> Key: SPARK-37487
> URL: https://issues.apache.org/jira/browse/SPARK-37487
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Tanel Kiis
> Priority: Major
> Labels: correctness
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
> test("SPARK-37487: get observable metrics with sort by callback") {
>   val df = spark.range(100)
>     .observe(
>       name = "my_event",
>       min($"id").as("min_val"),
>       max($"id").as("max_val"),
>       // Test unresolved alias
>       sum($"id"),
>       count(when($"id" % 2 === 0, 1)).as("num_even"))
>     .observe(
>       name = "other_event",
>       avg($"id").cast("int").as("avg_val"))
>     .sort($"id".desc)
>   validateObservedMetrics(df)
> }
> {code}
> The sum and count aggregates report twice their expected values:
> {code}
> [info] - SPARK-37487: get observable metrics with sort by callback *** FAILED *** (169 milliseconds)
> [info] [0,99,9900,100] did not equal [0,99,4950,50] (DataFrameCallbackSuite.scala:342)
> [info] org.scalatest.exceptions.TestFailedException:
> [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
> [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
> [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
> [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
> [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
> [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
> [info] at org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
> [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:20)
> {code}
> I could not figure out how this happens. Hopefully the UT can help with
> debugging.
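The reported numbers are consistent with the metrics being accumulated over two full passes of the same data. The sketch below is plain Scala, not Spark: `Metrics`, `observe` and `merge` are hypothetical names that model an accumulator merging one result per execution. It only reproduces the arithmetic of the symptom and says nothing about why the plan would run twice.

```scala
// Hypothetical model of the four "my_event" metrics (not a Spark class).
final case class Metrics(min: Long, max: Long, sum: Long, numEven: Long) {
  // Merging two partial results: min/max are idempotent, sum/count are additive.
  def merge(other: Metrics): Metrics = Metrics(
    math.min(min, other.min),
    math.max(max, other.max),
    sum + other.sum,
    numEven + other.numEven)
}

object DoubleExecutionSketch {
  // Computes the metrics for one pass over the ids.
  def observe(ids: Seq[Long]): Metrics =
    Metrics(ids.min, ids.max, ids.sum, ids.count(_ % 2 == 0).toLong)

  def main(args: Array[String]): Unit = {
    val ids = (0L until 100L).toVector // same values as spark.range(100)

    val once = observe(ids)               // one execution
    val twice = once.merge(observe(ids))  // same data accumulated a second time

    println(once)  // Metrics(0,99,4950,50)
    println(twice) // Metrics(0,99,9900,100)
  }
}
```

Only the additive aggregates double, while `min` and `max` stay the same, which matches the `[0,99,9900,100]` versus `[0,99,4950,50]` mismatch in the test output.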