Copilot commented on code in PR #12292:
URL: https://github.com/apache/gluten/pull/12292#discussion_r3409754814
##########
backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -99,6 +110,48 @@ object GlutenDeltaJobStatsTracker extends Logging {
new GlutenDeltaJobStatsFallbackTracker(tracker)
}
+ /**
+ * Returns whether the Delta per-file statistics aggregation can be offloaded to a Velox
+ * whole-stage transformer. This mirrors the plan that [[GlutenDeltaTaskStatsTracker]] builds on
+ * the executors: if the aggregation/projection is not supported by Velox it stays a vanilla
+ * [[ProjectExec]] (i.e. does not collapse into a [[WholeStageTransformer]]), in which case the
+ * native stats tracker must not be used. Evaluated once on the driver so the executors never
+ * allocate native resources for a plan that cannot run.
+ */
+ private def canOffloadStats(dataCols: Seq[Attribute], statsColExpr: Expression): Boolean = {
+ try {
+ val aggregates = statsColExpr.collect {
+ case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
+ ae
+ }
+ val statsAttrs = aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
+ val statsResultAttrs = aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
+ val aggOp = SortAggregateExec(
+ None,
+ isStreaming = false,
+ None,
+ Seq.empty,
+ aggregates,
+ statsAttrs,
+ 0,
+ statsResultAttrs,
+ StatisticsInputNode(dataCols))
+ val projOp = ProjectExec(statsResultAttrs, aggOp)
+ val offloads = Seq(OffloadOthers()).map(_.toStrcitRule())
Review Comment:
Potential typo in method name: `toStrcitRule` looks like it should be
`toStrictRule`. If `toStrcitRule` doesn’t exist (or was not intended), this
will not compile. Please verify the intended API name and update accordingly.
##########
backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -99,6 +110,48 @@ object GlutenDeltaJobStatsTracker extends Logging {
new GlutenDeltaJobStatsFallbackTracker(tracker)
}
+ /**
+ * Returns whether the Delta per-file statistics aggregation can be offloaded to a Velox
+ * whole-stage transformer. This mirrors the plan that [[GlutenDeltaTaskStatsTracker]] builds on
+ * the executors: if the aggregation/projection is not supported by Velox it stays a vanilla
+ * [[ProjectExec]] (i.e. does not collapse into a [[WholeStageTransformer]]), in which case the
+ * native stats tracker must not be used. Evaluated once on the driver so the executors never
+ * allocate native resources for a plan that cannot run.
+ */
+ private def canOffloadStats(dataCols: Seq[Attribute], statsColExpr: Expression): Boolean = {
+ try {
+ val aggregates = statsColExpr.collect {
+ case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
+ ae
+ }
Review Comment:
The offloadability check currently drops any non-DeclarativeAggregate
`AggregateExpression`s. That can make `canOffloadStats` return true even when
the real stats plan contains unsupported aggregates (because they’re excluded
from the synthesized plan). This should be conservative: collect all
`AggregateExpression`s and return `false` if any aggregate function isn’t
supported (including non-declarative ones), and consider returning `false` when
no aggregates are found to avoid false positives.
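A minimal, self-contained sketch of the conservative check suggested above. It deliberately avoids Spark and Gluten classes: `AggFn`, `Declarative`, `Imperative`, and the `supported` predicate are hypothetical stand-ins for `AggregateFunction`, `DeclarativeAggregate`, and whatever validation Gluten actually performs when offloading. The point is the control flow: inspect every aggregate instead of filtering first, refuse anything non-declarative, and treat an empty list or a validation exception as "cannot offload".

```scala
import scala.util.control.NonFatal

// Hypothetical stand-ins for Spark's AggregateFunction hierarchy; these are
// NOT Spark or Gluten types, just enough structure to show the control flow.
sealed trait AggFn { def name: String }
final case class Declarative(name: String) extends AggFn
final case class Imperative(name: String) extends AggFn

object ConservativeStatsCheck {

  /**
   * Conservative offloadability check (sketch).
   *
   * Every aggregate collected from the stats expression is inspected, so an
   * unsupported aggregate can never be silently dropped from the synthesized
   * plan. An empty aggregate list and any non-fatal exception raised during
   * validation both count as "cannot offload".
   *
   * @param aggregates all aggregates collected from the stats expression
   * @param supported  hypothetical predicate standing in for Gluten's own
   *                   validation of a declarative aggregate
   */
  def canOffload(aggregates: Seq[AggFn], supported: Declarative => Boolean): Boolean =
    try {
      aggregates.nonEmpty && aggregates.forall {
        case d: Declarative => supported(d)
        case _: Imperative  => false // non-declarative: refuse rather than skip
      }
    } catch {
      case NonFatal(_) => false
    }

  def main(args: Array[String]): Unit = {
    // min/max/count are treated as supported purely for illustration.
    val isSupported: Declarative => Boolean =
      d => Set("min", "max", "count").contains(d.name)

    println(canOffload(Seq(Declarative("min"), Declarative("max")), isSupported))
    println(canOffload(Seq(Declarative("min"), Imperative("percentile")), isSupported))
    println(canOffload(Seq.empty, isSupported))
  }
}
```

Treating "no aggregates found" as non-offloadable trades a possibly missed optimization for never taking the native path on a plan the check did not actually validate.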
##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -99,6 +110,48 @@ object GlutenDeltaJobStatsTracker extends Logging {
new GlutenDeltaJobStatsFallbackTracker(tracker)
}
+ /**
+ * Returns whether the Delta per-file statistics aggregation can be offloaded to a Velox
+ * whole-stage transformer. This mirrors the plan that [[GlutenDeltaTaskStatsTracker]] builds on
+ * the executors: if the aggregation/projection is not supported by Velox it stays a vanilla
+ * [[ProjectExec]] (i.e. does not collapse into a [[WholeStageTransformer]]), in which case the
+ * native stats tracker must not be used. Evaluated once on the driver so the executors never
+ * allocate native resources for a plan that cannot run.
+ */
+ private def canOffloadStats(dataCols: Seq[Attribute], statsColExpr: Expression): Boolean = {
+ try {
+ val aggregates = statsColExpr.collect {
+ case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
+ ae
+ }
Review Comment:
Same issue as delta40: filtering to only `DeclarativeAggregate` can produce
a synthesized plan that’s missing aggregates present in the real stats
expression, leading to false positives from `canOffloadStats`. Prefer
collecting all `AggregateExpression`s and returning `false` if any aggregate is
not supported/offloadable; also consider returning `false` if no aggregates are
found.
##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -99,6 +110,48 @@ object GlutenDeltaJobStatsTracker extends Logging {
new GlutenDeltaJobStatsFallbackTracker(tracker)
}
+ /**
+ * Returns whether the Delta per-file statistics aggregation can be offloaded to a Velox
+ * whole-stage transformer. This mirrors the plan that [[GlutenDeltaTaskStatsTracker]] builds on
+ * the executors: if the aggregation/projection is not supported by Velox it stays a vanilla
+ * [[ProjectExec]] (i.e. does not collapse into a [[WholeStageTransformer]]), in which case the
+ * native stats tracker must not be used. Evaluated once on the driver so the executors never
+ * allocate native resources for a plan that cannot run.
+ */
+ private def canOffloadStats(dataCols: Seq[Attribute], statsColExpr: Expression): Boolean = {
+ try {
+ val aggregates = statsColExpr.collect {
+ case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
+ ae
+ }
+ val statsAttrs = aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
+ val statsResultAttrs = aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
+ val aggOp = SortAggregateExec(
+ None,
+ isStreaming = false,
+ None,
+ Seq.empty,
+ aggregates,
+ statsAttrs,
+ 0,
+ statsResultAttrs,
+ StatisticsInputNode(dataCols))
+ val projOp = ProjectExec(statsResultAttrs, aggOp)
+ val offloads = Seq(OffloadOthers()).map(_.toStrcitRule())
Review Comment:
Same as delta40: `toStrcitRule` looks like a misspelling of `toStrictRule`.
If the misspelled method doesn’t exist, this will break compilation; please
confirm the correct method name and use it consistently.
##########
backends-velox/src-delta/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.test.SharedSparkSession
+
+import java.io.File
+
+/**
+ * Regression test for the Gluten Delta per-file statistics tracker.
+ *
+ * Writing a Delta table whose collected min/max statistics cannot be offloaded to Velox -- for
+ * example over a TIMESTAMP_NTZ column -- used to crash the write task with a ClassCastException
+ * (ProjectExec cannot be cast to WholeStageTransformer), because the native stats tracker assumed
+ * the statistics aggregation always collapses into a WholeStageTransformer. The tracker must now
+ * fall back to row-based statistics collection instead of crashing.
+ */
+class GlutenDeltaStatsSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
+
+ import testImplicits._
+
+ test("TIMESTAMP_NTZ stats fall back instead of crashing the write") {
+ withTempDir {
+ dir =>
+ val path = new File(dir, "ntz-stats").getCanonicalPath
+ // The maxValue statistic for a TIMESTAMP_NTZ near Long.MaxValue triggers the per-file
+ // statistics aggregation that cannot be offloaded to Velox.
+ val nearMaxMicros = Long.MaxValue - 999L
+ val data = Seq(nearMaxMicros)
+ .toDF("micros")
+ .selectExpr("micros AS id", "CAST(TIMESTAMP_MICROS(micros) AS TIMESTAMP_NTZ) AS ts")
Review Comment:
This test relies on `TIMESTAMP_MICROS(Long.MaxValue - 999)` which may be out
of Spark’s valid timestamp range (depending on Spark version/config),
potentially failing for reasons unrelated to the intended regression (fallback
vs crash). If the core condition is simply “min/max over TIMESTAMP_NTZ is not
offloadable”, consider using a clearly valid value (e.g., a fixed in-range
timestamp literal) so the test remains stable and focused on the fallback
behavior.
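To make the range concern concrete, the standalone sketch below (plain JDK `java.time`, no Spark) converts the test's input of `Long.MaxValue - 999` microseconds since the epoch into a UTC date-time. It lands in year 294247, far outside ordinary four-digit years, which is why an in-range literal would keep the test focused on fallback versus crash. The names `NtzRangeCheck` and `microsToUtc` are illustrative only and are not part of the PR.

```scala
import java.time.{Instant, OffsetDateTime, ZoneOffset}

object NtzRangeCheck {

  /** Converts microseconds since 1970-01-01T00:00:00Z into a UTC date-time. */
  def microsToUtc(micros: Long): OffsetDateTime = {
    // floorDiv/floorMod keep the nanosecond part non-negative for negative inputs too.
    val seconds = Math.floorDiv(micros, 1000000L)
    val nanos = Math.floorMod(micros, 1000000L) * 1000L
    Instant.ofEpochSecond(seconds, nanos).atOffset(ZoneOffset.UTC)
  }

  def main(args: Array[String]): Unit = {
    val nearMaxMicros = Long.MaxValue - 999L // same value the test uses
    val ts = microsToUtc(nearMaxMicros)
    println(s"$nearMaxMicros micros -> $ts (year ${ts.getYear})")
  }
}
```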
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.