This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 24e4c1249 feat: Improve fallback reporting for `native_datafusion` scan (#2879)
24e4c1249 is described below

commit 24e4c124964211805a28ceea2f428adae1b4b4ad
Author: Andy Grove <[email protected]>
AuthorDate: Thu Dec 11 18:29:17 2025 -0700

    feat: Improve fallback reporting for `native_datafusion` scan (#2879)
---
 .../org/apache/comet/rules/CometScanRule.scala     | 95 ++++++++--------------
 .../comet/serde/operator/CometNativeScan.scala     | 61 +++++++++++++-
 2 files changed, 91 insertions(+), 65 deletions(-)
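
A note on the pattern this commit adopts, before the diff itself: instead of
collecting fallback reasons into a local ListBuffer and returning early, each
validation step now tags the plan node directly via withInfo, and the caller
asks hasExplainInfo afterwards to decide whether conversion is safe. The
following is a minimal, self-contained sketch of that tag-then-check shape;
Plan and the helper bodies are simplified stand-ins, not Comet's real
TreeNode-tag implementation.

    import scala.collection.mutable.ListBuffer

    // Stand-in for a SparkPlan node; the real withInfo/hasExplainInfo keep
    // fallback reasons in TreeNode tags rather than a buffer on the node.
    final case class Plan(name: String) {
      val fallbackReasons: ListBuffer[String] = ListBuffer.empty[String]
    }

    object FallbackTaggingSketch {
      def withInfo(plan: Plan, reason: String): Plan = {
        plan.fallbackReasons += reason // tag the node with one reason
        plan
      }

      def hasExplainInfo(plan: Plan): Boolean = plan.fallbackReasons.nonEmpty

      // mirrors the shape of CometNativeScan.isSupported: run every check,
      // tagging as it goes, and report supported only if nothing was tagged
      def isSupported(plan: Plan, execEnabled: Boolean, bucketed: Boolean): Boolean = {
        if (!execEnabled) withInfo(plan, "exec disabled")
        if (bucketed) withInfo(plan, "bucketed scan not supported")
        !hasExplainInfo(plan)
      }

      def main(args: Array[String]): Unit = {
        val plan = Plan("scan")
        val supported = isSupported(plan, execEnabled = true, bucketed = true)
        println(s"supported=$supported, reasons=${plan.fallbackReasons.mkString("; ")}")
      }
    }

One practical effect, visible in the diff below: a plan can now carry several
fallback reasons at once instead of only the first one hit before an early
return.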

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 86ba1ea15..8d15223d0 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,12 +42,13 @@ import org.apache.spark.sql.types._
 
 import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
 import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
+import org.apache.comet.serde.operator.CometNativeScan
 import org.apache.comet.shims.CometTypeShim
 
 /**
@@ -132,9 +133,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     }
   }
 
-  private def isDynamicPruningFilter(e: Expression): Boolean =
-    e.exists(_.isInstanceOf[PlanExpression[_]])
-
   private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
 
     if (COMET_DPP_FALLBACK_ENABLED.get() &&
@@ -144,10 +142,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
 
     scanExec.relation match {
       case r: HadoopFsRelation =>
-        val fallbackReasons = new ListBuffer[String]()
         if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
-          fallbackReasons += s"Unsupported file format ${r.fileFormat}"
-          return withInfos(scanExec, fallbackReasons.toSet)
+          return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
         }
 
         var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
@@ -160,42 +156,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
           scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
         }
 
-        // Native DataFusion doesn't support subqueries/dynamic pruning
-        if (scanImpl == SCAN_NATIVE_DATAFUSION &&
-          scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
-          fallbackReasons += "Native DataFusion scan does not support 
subqueries/dynamic pruning"
-          return withInfos(scanExec, fallbackReasons.toSet)
-        }
-
-        if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
-          fallbackReasons +=
-            s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} 
disabled"
-          return withInfos(scanExec, fallbackReasons.toSet)
-        }
-
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && (SQLConf.get.ignoreCorruptFiles ||
-            scanExec.relation.options
-              .get("ignorecorruptfiles") // Spark sets this to lowercase.
-              .contains("true"))) {
-          fallbackReasons +=
-            "Full native scan disabled because ignoreCorruptFiles enabled"
-          return withInfos(scanExec, fallbackReasons.toSet)
-        }
-
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && (SQLConf.get.ignoreMissingFiles ||
-            scanExec.relation.options
-              .get("ignoremissingfiles") // Spark sets this to lowercase.
-              .contains("true"))) {
-          fallbackReasons +=
-            "Full native scan disabled because ignoreMissingFiles enabled"
-          return withInfos(scanExec, fallbackReasons.toSet)
-        }
-
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) {
-          // https://github.com/apache/datafusion-comet/issues/1719
-          fallbackReasons +=
-            "Full native scan disabled because bucketed scan is not supported"
-          return withInfos(scanExec, fallbackReasons.toSet)
+        if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
+          return scanExec
         }
 
         val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
@@ -206,36 +168,27 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
           // Spark already converted these to Java-native types, so we can't check SQL types.
           // ArrayBasedMapData, GenericInternalRow, GenericArrayData correspond to maps, structs,
           // and arrays respectively.
-          fallbackReasons +=
-            "Full native scan disabled because nested types for default values 
are not supported"
-          return withInfos(scanExec, fallbackReasons.toSet)
+          withInfo(
+            scanExec,
+            "Full native scan disabled because nested types for default values 
are not supported")
         }
 
         if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
           if (!isEncryptionConfigSupported(hadoopConf)) {
-            return withInfos(scanExec, fallbackReasons.toSet)
+            withInfo(scanExec, s"$scanImpl does not support encryption")
           }
         }
 
-        val typeChecker = CometScanTypeChecker(scanImpl)
-        val schemaSupported =
-          typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
-        val partitionSchemaSupported =
-          typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
-
-        if (!schemaSupported) {
-          fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema} 
for $scanImpl"
-        }
-        if (!partitionSchemaSupported) {
-          fallbackReasons += s"Unsupported partitioning schema 
${r.partitionSchema} for $scanImpl"
-        }
+        // check that schema is supported
+        checkSchema(scanExec, scanImpl, r)
 
-        if (schemaSupported && partitionSchemaSupported) {
+        if (hasExplainInfo(scanExec)) {
+          // could not accelerate, and plan is already tagged with fallback reasons
+          scanExec
+        } else {
           // this is confusing, but we always insert a CometScanExec here, which may be replaced
           // with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
           CometScanExec(scanExec, session, scanImpl)
-        } else {
-          withInfos(scanExec, fallbackReasons.toSet)
         }
 
       case _ =>
@@ -647,6 +600,24 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     }
   }
 
+  private def isDynamicPruningFilter(e: Expression): Boolean =
+    e.exists(_.isInstanceOf[PlanExpression[_]])
+
+  def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
+    val fallbackReasons = new ListBuffer[String]()
+    val typeChecker = CometScanTypeChecker(scanImpl)
+    val schemaSupported =
+      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+    if (!schemaSupported) {
+      withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for 
$scanImpl")
+    }
+    val partitionSchemaSupported =
+      typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
+    if (!partitionSchemaSupported) {
+      fallbackReasons += s"Unsupported partitioning schema 
${r.partitionSchema} for $scanImpl"
+    }
+    withInfos(scanExec, fallbackReasons.toSet)
+  }
 }
 
 case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim {
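
The checkSchema helper added above keeps the collect-then-apply shape for
schema validation: reasons accumulate in a ListBuffer while both the data
schema and the partition schema are checked, and everything collected is
attached to the plan in one withInfos call. A rough sketch of that shape
under toy assumptions (isSchemaSupported below is a simplified stand-in, not
the real CometScanTypeChecker):

    import scala.collection.mutable.ListBuffer

    object SchemaCheckSketch {
      // toy type check: reject map columns and record a reason for each one
      def isSchemaSupported(fields: Seq[(String, String)], reasons: ListBuffer[String]): Boolean = {
        val bad = fields.filter { case (_, tpe) => tpe.startsWith("map<") }
        bad.foreach { case (name, tpe) => reasons += s"unsupported type $tpe for column $name" }
        bad.isEmpty
      }

      def main(args: Array[String]): Unit = {
        val reasons = ListBuffer.empty[String]
        val dataOk = isSchemaSupported(Seq("a" -> "int", "b" -> "map<string,int>"), reasons)
        val partOk = isSchemaSupported(Seq("dt" -> "date"), reasons)
        if (!dataOk) reasons += "unsupported schema for this scan implementation"
        if (!partOk) reasons += "unsupported partitioning schema for this scan implementation"
        // in checkSchema, the collected set ends up in withInfos(scanExec, reasons.toSet)
        println(reasons.mkString("\n"))
      }
    }
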
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index aacca40c4..b679d9bf4 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -23,27 +23,82 @@ import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PlanExpression}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
 import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StructField, StructType}
 
 import org.apache.comet.{CometConf, ConfigEntry}
-import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.CometConf.COMET_EXEC_ENABLED
+import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, withInfo}
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.CometParquetUtils
-import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
+import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel}
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
 
+/**
+ * Validation and serde logic for `native_datafusion` scans.
+ */
 object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
 
+  /** Determine whether the scan is supported and tag the Spark plan with any fallback reasons */
+  def isSupported(scanExec: FileSourceScanExec): Boolean = {
+
+    if (hasExplainInfo(scanExec)) {
+      // this node has already been tagged with fallback reasons
+      return false
+    }
+
+    if (!COMET_EXEC_ENABLED.get()) {
+      withInfo(scanExec, s"Full native scan disabled because 
${COMET_EXEC_ENABLED.key} disabled")
+    }
+
+    // Native DataFusion doesn't support subqueries/dynamic pruning
+    if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
+      withInfo(scanExec, "Native DataFusion scan does not support 
subqueries/dynamic pruning")
+    }
+
+    if (SQLConf.get.ignoreCorruptFiles ||
+      scanExec.relation.options
+        .get("ignorecorruptfiles") // Spark sets this to lowercase.
+        .contains("true")) {
+      withInfo(scanExec, "Full native scan disabled because ignoreCorruptFiles 
enabled")
+    }
+
+    if (SQLConf.get.ignoreMissingFiles ||
+      scanExec.relation.options
+        .get("ignoremissingfiles") // Spark sets this to lowercase.
+        .contains("true")) {
+
+      withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles 
enabled")
+    }
+
+    if (scanExec.bucketedScan) {
+      // https://github.com/apache/datafusion-comet/issues/1719
+      withInfo(scanExec, "Full native scan disabled because bucketed scan is 
not supported")
+    }
+
+    // the scan is supported if no fallback reasons were added to the node
+    !hasExplainInfo(scanExec)
+  }
+
+  private def isDynamicPruningFilter(e: Expression): Boolean =
+    e.exists(_.isInstanceOf[PlanExpression[_]])
+
   override def enabledConfig: Option[ConfigEntry[Boolean]] = None
 
+  override def getSupportLevel(operator: CometScanExec): SupportLevel = {
+    // all checks happen in CometScanRule before ScanExec is converted to CometScanExec, so
+    // we always report compatible here because this serde object is for the converted CometScanExec
+    Compatible()
+  }
+
   override def convert(
       scan: CometScanExec,
       builder: Operator.Builder,
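
One design note on the getSupportLevel override above: it reports
Compatible() unconditionally because, after this commit, every
native_datafusion check runs in CometScanRule before a CometScanExec exists,
so by the time this serde object sees the operator there is nothing left to
reject. A hedged sketch of that gate; the sealed trait below mimics, but is
not, Comet's actual SupportLevel hierarchy.

    sealed trait SupportLevel
    case class Compatible(notes: Option[String] = None) extends SupportLevel
    case class Incompatible(notes: Option[String] = None) extends SupportLevel

    object SupportLevelSketch {
      // makes the precondition explicit: answer Compatible only because the
      // operator was already validated upstream by CometScanRule
      def getSupportLevel(alreadyValidated: Boolean): SupportLevel =
        if (alreadyValidated) Compatible()
        else Incompatible(Some("operator was not validated by CometScanRule"))

      def main(args: Array[String]): Unit =
        println(getSupportLevel(alreadyValidated = true)) // prints Compatible(None)
    }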


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
