This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 24e4c1249 feat: Improve fallback reporting for `native_datafusion` scan (#2879)
24e4c1249 is described below
commit 24e4c124964211805a28ceea2f428adae1b4b4ad
Author: Andy Grove <[email protected]>
AuthorDate: Thu Dec 11 18:29:17 2025 -0700
feat: Improve fallback reporting for `native_datafusion` scan (#2879)
---
.../org/apache/comet/rules/CometScanRule.scala | 95 ++++++++--------------
.../comet/serde/operator/CometNativeScan.scala | 61 +++++++++++++-
2 files changed, 91 insertions(+), 65 deletions(-)
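
Summary of the change: CometScanRule previously accumulated fallback reasons for `native_datafusion` scans in a local ListBuffer and returned early on the first failing check. The eligibility checks now live in CometNativeScan.isSupported, which tags the plan node itself via withInfo, so every applicable reason is recorded before the rule decides. Condensed from the hunks below (a sketch only, not compilable on its own; surrounding code omitted):

    // inside transformV1Scan, sketched:
    if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
      return scanExec                      // isSupported already tagged scanExec
    }
    checkSchema(scanExec, scanImpl, r)     // may tag more reasons
    if (hasExplainInfo(scanExec)) scanExec // fall back; reasons attached to node
    else CometScanExec(scanExec, session, scanImpl)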
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 86ba1ea15..8d15223d0 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,12 +42,13 @@ import org.apache.spark.sql.types._
import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
import org.apache.comet.DataTypeSupport.isComplexType
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
+import org.apache.comet.serde.operator.CometNativeScan
import org.apache.comet.shims.CometTypeShim
/**
@@ -132,9 +133,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
}
}
- private def isDynamicPruningFilter(e: Expression): Boolean =
- e.exists(_.isInstanceOf[PlanExpression[_]])
-
private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
if (COMET_DPP_FALLBACK_ENABLED.get() &&
@@ -144,10 +142,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
scanExec.relation match {
case r: HadoopFsRelation =>
- val fallbackReasons = new ListBuffer[String]()
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
- fallbackReasons += s"Unsupported file format ${r.fileFormat}"
- return withInfos(scanExec, fallbackReasons.toSet)
+ return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
}
var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
@@ -160,42 +156,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
}
- // Native DataFusion doesn't support subqueries/dynamic pruning
- if (scanImpl == SCAN_NATIVE_DATAFUSION &&
- scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
- fallbackReasons += "Native DataFusion scan does not support
subqueries/dynamic pruning"
- return withInfos(scanExec, fallbackReasons.toSet)
- }
-
- if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
- fallbackReasons +=
- s"Full native scan disabled because ${COMET_EXEC_ENABLED.key}
disabled"
- return withInfos(scanExec, fallbackReasons.toSet)
- }
-
- if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && (SQLConf.get.ignoreCorruptFiles ||
- scanExec.relation.options
- .get("ignorecorruptfiles") // Spark sets this to lowercase.
- .contains("true"))) {
- fallbackReasons +=
- "Full native scan disabled because ignoreCorruptFiles enabled"
- return withInfos(scanExec, fallbackReasons.toSet)
- }
-
- if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && (SQLConf.get.ignoreMissingFiles ||
- scanExec.relation.options
- .get("ignoremissingfiles") // Spark sets this to lowercase.
- .contains("true"))) {
- fallbackReasons +=
- "Full native scan disabled because ignoreMissingFiles enabled"
- return withInfos(scanExec, fallbackReasons.toSet)
- }
-
- if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) {
- // https://github.com/apache/datafusion-comet/issues/1719
- fallbackReasons +=
- "Full native scan disabled because bucketed scan is not supported"
- return withInfos(scanExec, fallbackReasons.toSet)
+ if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
+ return scanExec
}
val possibleDefaultValues =
getExistenceDefaultValues(scanExec.requiredSchema)
@@ -206,36 +168,27 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
// Spark already converted these to Java-native types, so we can't check SQL types.
// ArrayBasedMapData, GenericInternalRow, GenericArrayData correspond to maps, structs,
// and arrays respectively.
- fallbackReasons +=
- "Full native scan disabled because nested types for default values
are not supported"
- return withInfos(scanExec, fallbackReasons.toSet)
+ withInfo(
+ scanExec,
+ "Full native scan disabled because nested types for default values
are not supported")
}
if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
if (!isEncryptionConfigSupported(hadoopConf)) {
- return withInfos(scanExec, fallbackReasons.toSet)
+ withInfo(scanExec, s"$scanImpl does not support encryption")
}
}
- val typeChecker = CometScanTypeChecker(scanImpl)
- val schemaSupported =
- typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
- val partitionSchemaSupported =
- typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
-
- if (!schemaSupported) {
- fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema}
for $scanImpl"
- }
- if (!partitionSchemaSupported) {
- fallbackReasons += s"Unsupported partitioning schema
${r.partitionSchema} for $scanImpl"
- }
+ // check that schema is supported
+ checkSchema(scanExec, scanImpl, r)
- if (schemaSupported && partitionSchemaSupported) {
+ if (hasExplainInfo(scanExec)) {
+ // could not accelerate, and plan is already tagged with fallback reasons
+ scanExec
+ } else {
// this is confusing, but we always insert a CometScanExec here, which may be replaced
// with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
CometScanExec(scanExec, session, scanImpl)
- } else {
- withInfos(scanExec, fallbackReasons.toSet)
}
case _ =>
@@ -647,6 +600,24 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
}
}
+ private def isDynamicPruningFilter(e: Expression): Boolean =
+ e.exists(_.isInstanceOf[PlanExpression[_]])
+
+ def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
+ val fallbackReasons = new ListBuffer[String]()
+ val typeChecker = CometScanTypeChecker(scanImpl)
+ val schemaSupported =
+ typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+ if (!schemaSupported) {
+ withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for
$scanImpl")
+ }
+ val partitionSchemaSupported =
+ typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
+ if (!partitionSchemaSupported) {
+ fallbackReasons += s"Unsupported partitioning schema
${r.partitionSchema} for $scanImpl"
+ }
+ withInfos(scanExec, fallbackReasons.toSet)
+ }
}
case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim {
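
Both files rely on the same tag-then-test idiom: each failed check calls withInfo to attach a reason to the node, and the final verdict is a single hasExplainInfo test, so several reasons can surface in one pass instead of only the first. A self-contained sketch of the idiom in plain Scala (NodeSketch and its members are hypothetical stand-ins for illustration, not Comet APIs):

    // Hypothetical stand-in for a plan node carrying explain/fallback tags.
    final class NodeSketch {
      private var reasons = Set.empty[String]
      def withInfo(reason: String): Unit = reasons += reason // tag; no early return
      def hasExplainInfo: Boolean = reasons.nonEmpty
      def explainInfo: Set[String] = reasons
    }

    // Supported iff no check tagged the node; every check always runs.
    def isSupportedSketch(node: NodeSketch, execEnabled: Boolean, bucketed: Boolean): Boolean = {
      if (!execEnabled) node.withInfo("exec disabled")
      if (bucketed) node.withInfo("bucketed scan not supported")
      !node.hasExplainInfo
    }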
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index aacca40c4..b679d9bf4 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -23,27 +23,82 @@ import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PlanExpression}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.comet.{CometConf, ConfigEntry}
-import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.CometConf.COMET_EXEC_ENABLED
+import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, withInfo}
import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.parquet.CometParquetUtils
-import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
+import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel}
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
+/**
+ * Validation and serde logic for `native_datafusion` scans.
+ */
object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
+ /** Determine whether the scan is supported and tag the Spark plan with any fallback reasons */
+ def isSupported(scanExec: FileSourceScanExec): Boolean = {
+
+ if (hasExplainInfo(scanExec)) {
+ // this node has already been tagged with fallback reasons
+ return false
+ }
+
+ if (!COMET_EXEC_ENABLED.get()) {
+ withInfo(scanExec, s"Full native scan disabled because
${COMET_EXEC_ENABLED.key} disabled")
+ }
+
+ // Native DataFusion doesn't support subqueries/dynamic pruning
+ if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
+ withInfo(scanExec, "Native DataFusion scan does not support
subqueries/dynamic pruning")
+ }
+
+ if (SQLConf.get.ignoreCorruptFiles ||
+ scanExec.relation.options
+ .get("ignorecorruptfiles") // Spark sets this to lowercase.
+ .contains("true")) {
+ withInfo(scanExec, "Full native scan disabled because ignoreCorruptFiles
enabled")
+ }
+
+ if (SQLConf.get.ignoreMissingFiles ||
+ scanExec.relation.options
+ .get("ignoremissingfiles") // Spark sets this to lowercase.
+ .contains("true")) {
+
+ withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles
enabled")
+ }
+
+ if (scanExec.bucketedScan) {
+ // https://github.com/apache/datafusion-comet/issues/1719
+ withInfo(scanExec, "Full native scan disabled because bucketed scan is
not supported")
+ }
+
+ // the scan is supported if no fallback reasons were added to the node
+ !hasExplainInfo(scanExec)
+ }
+
+ private def isDynamicPruningFilter(e: Expression): Boolean =
+ e.exists(_.isInstanceOf[PlanExpression[_]])
+
override def enabledConfig: Option[ConfigEntry[Boolean]] = None
+ override def getSupportLevel(operator: CometScanExec): SupportLevel = {
+ // all checks happen in CometScanRule before ScanExec is converted to CometScanExec, so
+ // we always report compatible here because this serde object is for the converted CometScanExec
+ Compatible()
+ }
+
override def convert(
scan: CometScanExec,
builder: Operator.Builder,
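
A small idiom worth noting from isSupported above: relation.options.get(key).contains("true") tests an Option[String] without unwrapping, relying on Spark lowercasing these option keys. Self-contained illustration (plain Scala; the sample map values are made up):

    // Option.contains avoids null checks and isDefined/get boilerplate.
    val options = Map("ignorecorruptfiles" -> "true")
    val ignoreCorrupt = options.get("ignorecorruptfiles").contains("true") // true
    val ignoreMissing = options.get("ignoremissingfiles").contains("true") // false: key absent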
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]