This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 0028f1e1 fix: Fallback to Spark if scan has meta columns (#997)
0028f1e1 is described below
commit 0028f1e16d0c769129a08843fdf6e6c1484d86f4
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Oct 7 08:59:22 2024 -0700
fix: Fallback to Spark if scan has meta columns (#997)
---
.../org/apache/comet/CometSparkSessionExtensions.scala | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index bf09d641..0185a15e 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -25,11 +25,12 @@ import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
-import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder}
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
import org.apache.spark.sql.comet.util.Utils
@@ -101,7 +102,19 @@ class CometSparkSessionExtensions
def isDynamicPruningFilter(e: Expression): Boolean =
e.exists(_.isInstanceOf[PlanExpression[_]])
+ def hasMetadataCol(plan: SparkPlan): Boolean = {
+ plan.expressions.exists(_.exists {
+ case a: Attribute =>
+ a.isMetadataCol
+ case _ => false
+ })
+ }
+
plan.transform {
+ case scan if hasMetadataCol(scan) =>
+ withInfo(scan, "Metadata column is not supported")
+ scan
+
case scanExec: FileSourceScanExec
if COMET_DPP_FALLBACK_ENABLED.get() &&
scanExec.partitionFilters.exists(isDynamicPruningFilter) =>
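
For context, below is a minimal, illustrative sketch (not part of the commit) of the kind of query this fallback targets: a scan that projects Spark's hidden _metadata file column. The object name, application name, Parquet path, and selected fields are assumptions chosen for the example.

import org.apache.spark.sql.SparkSession

object MetadataColumnFallbackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("metadata-column-fallback-sketch") // illustrative name
      .getOrCreate()

    // "/tmp/example.parquet" is a placeholder path; any file-based source works.
    // Selecting the hidden _metadata struct makes the scan output include
    // metadata attributes (isMetadataCol == true), so with this change the rule
    // tags the scan via withInfo(scan, "Metadata column is not supported") and
    // leaves it to Spark instead of converting it to a Comet native scan.
    val df = spark.read
      .parquet("/tmp/example.parquet")
      .select("_metadata.file_path", "_metadata.file_size")

    // The physical plan should keep Spark's FileSourceScanExec rather than a Comet scan.
    df.explain()

    spark.stop()
  }
}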
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]