This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a9d0664d chore: Remove dead code paths for deprecated native_comet scan (#3396)
5a9d0664d is described below

commit 5a9d0664dc8c1f8c34dd94675c4200b9be3bc265
Author: Andy Grove <[email protected]>
AuthorDate: Mon Feb 9 09:44:58 2026 -0700

    chore: Remove dead code paths for deprecated native_comet scan (#3396)
---
 .../org/apache/comet/rules/CometScanRule.scala     | 93 +---------------------
 .../rules/EliminateRedundantTransitions.scala      |  6 +-
 2 files changed, 4 insertions(+), 95 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index ebb521730..acdf2afc0 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -38,7 +38,6 @@ import 
org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -48,7 +47,7 @@ import 
org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, wi
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, 
IcebergReflection}
 import org.apache.comet.objectstore.NativeConfig
-import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
+import org.apache.comet.parquet.{Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, 
isEncryptionConfigSupported}
 import org.apache.comet.serde.operator.CometNativeScan
 import org.apache.comet.shims.{CometTypeShim, ShimFileFormat, 
ShimSubqueryBroadcast}
@@ -61,8 +60,6 @@ case class CometScanRule(session: SparkSession)
     with CometTypeShim
     with ShimSubqueryBroadcast {
 
-  import CometScanRule._
-
   private lazy val showTransformations = 
CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
 
   override def apply(plan: SparkPlan): SparkPlan = {
@@ -176,8 +173,6 @@ case class CometScanRule(session: SparkSession)
             nativeDataFusionScan(session, scanExec, r, 
hadoopConf).getOrElse(scanExec)
           case SCAN_NATIVE_ICEBERG_COMPAT =>
             nativeIcebergCompatScan(session, scanExec, r, 
hadoopConf).getOrElse(scanExec)
-          case SCAN_NATIVE_COMET =>
-            nativeCometScan(session, scanExec, r, 
hadoopConf).getOrElse(scanExec)
         }
 
       case _ =>
@@ -231,47 +226,9 @@ case class CometScanRule(session: SparkSession)
     Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
   }
 
-  private def nativeCometScan(
-      session: SparkSession,
-      scanExec: FileSourceScanExec,
-      r: HadoopFsRelation,
-      hadoopConf: Configuration): Option[SparkPlan] = {
-    if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
-      return None
-    }
-    Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
-  }
-
   private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
 
     scanExec.scan match {
-      case scan: ParquetScan if COMET_NATIVE_SCAN_IMPL.get() == 
SCAN_NATIVE_COMET =>
-        val fallbackReasons = new ListBuffer[String]()
-        val schemaSupported =
-          CometBatchScanExec.isSchemaSupported(scan.readDataSchema, 
fallbackReasons)
-        if (!schemaSupported) {
-          fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
-        }
-
-        val partitionSchemaSupported =
-          CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, 
fallbackReasons)
-        if (!partitionSchemaSupported) {
-          fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is 
not supported"
-        }
-
-        if (scan.pushedAggregate.nonEmpty) {
-          fallbackReasons += "Comet does not support pushed aggregate"
-        }
-
-        if (schemaSupported && partitionSchemaSupported && 
scan.pushedAggregate.isEmpty) {
-          val cometScan = CometParquetScan(session, 
scanExec.scan.asInstanceOf[ParquetScan])
-          CometBatchScanExec(
-            scanExec.copy(scan = cometScan),
-            runtimeFilters = scanExec.runtimeFilters)
-        } else {
-          withInfos(scanExec, fallbackReasons.toSet)
-        }
-
       case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get() =>
         val fallbackReasons = new ListBuffer[String]()
         val schemaSupported =
@@ -691,48 +648,6 @@ case class CometScanRule(session: SparkSession)
     }
   }
 
-  private def selectScan(
-      scanExec: FileSourceScanExec,
-      partitionSchema: StructType,
-      hadoopConf: Configuration): String = {
-
-    val fallbackReasons = new ListBuffer[String]()
-
-    // native_iceberg_compat only supports local filesystem and S3
-    if (scanExec.relation.inputFiles
-        .forall(path => path.startsWith("file://") || 
path.startsWith("s3a://"))) {
-
-      val filePath = scanExec.relation.inputFiles.headOption
-      if (filePath.exists(_.startsWith("s3a://"))) {
-        validateObjectStoreConfig(filePath.get, hadoopConf, fallbackReasons)
-      }
-    } else {
-      fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local 
filesystem and S3"
-    }
-
-    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
-    val schemaSupported =
-      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
-    val partitionSchemaSupported =
-      typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
-
-    val cometExecEnabled = COMET_EXEC_ENABLED.get()
-    if (!cometExecEnabled) {
-      fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires 
${COMET_EXEC_ENABLED.key}=true"
-    }
-
-    if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
-      fallbackReasons.isEmpty) {
-      logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
-      SCAN_NATIVE_ICEBERG_COMPAT
-    } else {
-      logInfo(
-        s"Auto scan mode falling back to $SCAN_NATIVE_COMET due to " +
-          s"${fallbackReasons.mkString(", ")}")
-      SCAN_NATIVE_COMET
-    }
-  }
-
   private def isDynamicPruningFilter(e: Expression): Boolean =
     e.exists(_.isInstanceOf[PlanExpression[_]])
 
@@ -775,16 +690,12 @@ case class CometScanTypeChecker(scanImpl: String) extends 
DataTypeSupport with C
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
-      case ShortType
-          if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
+      case ShortType if CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() 
=>
         fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 
correctly for $dt. " +
           s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false 
to allow " +
           "native execution if your data does not contain unsigned small 
integers. " +
           CometConf.COMPAT_GUIDE
         false
-      case _: StructType | _: ArrayType | _: MapType if scanImpl == 
CometConf.SCAN_NATIVE_COMET =>
-        false
       case dt if isStringCollationType(dt) =>
         // we don't need specific support for collation in scans, but this
         // is a convenient place to force the whole query to fall back to 
Spark for now
diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index d1c3b0767..ec3336352 100644
--- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -155,7 +155,6 @@ case class EliminateRedundantTransitions(session: 
SparkSession) extends Rule[Spa
    * with such scans because the buffers may be modified after C2R reads them.
    *
    * This includes:
-   *   - CometScanExec with native_comet scan implementation (V1 path) - uses 
BatchReader
    *   - CometScanExec with native_iceberg_compat and partition columns - uses
    *     ConstantColumnReader
    *   - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses 
BatchReader
@@ -167,9 +166,8 @@ case class EliminateRedundantTransitions(session: 
SparkSession) extends Rule[Spa
       case _ =>
         op.exists {
           case scan: CometScanExec =>
-            scan.scanImpl == CometConf.SCAN_NATIVE_COMET ||
-            (scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
-              scan.relation.partitionSchema.nonEmpty)
+            scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
+            scan.relation.partitionSchema.nonEmpty
           case scan: CometBatchScanExec => 
scan.scan.isInstanceOf[CometParquetScan]
           case _ => false
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to