This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 5a9d0664d chore: Remove dead code paths for deprecated native_comet scan (#3396)
5a9d0664d is described below
commit 5a9d0664dc8c1f8c34dd94675c4200b9be3bc265
Author: Andy Grove <[email protected]>
AuthorDate: Mon Feb 9 09:44:58 2026 -0700
chore: Remove dead code paths for deprecated native_comet scan (#3396)
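With this change, CometScanRule dispatches on only two native scan implementations: native_datafusion and native_iceberg_compat. The following is a minimal, self-contained Scala sketch of the resulting dispatch shape; every identifier in it is an illustrative stand-in, not the actual CometScanRule API:

    // Illustrative sketch only: stand-in names, not the real CometScanRule
    // internals. With SCAN_NATIVE_COMET removed, the match needs exactly two
    // cases, and exhaustiveness is guaranteed by the sealed trait.
    object ScanDispatchSketch {
      sealed trait ScanImpl
      case object NativeDataFusion extends ScanImpl
      case object NativeIcebergCompat extends ScanImpl

      // Stand-ins for nativeDataFusionScan / nativeIcebergCompatScan, which
      // return None when the scan cannot be handled natively.
      private def tryNativeDataFusion(): Option[String] = Some("native_datafusion scan")
      private def tryNativeIcebergCompat(): Option[String] = Some("native_iceberg_compat scan")

      def selectPlan(impl: ScanImpl, sparkFallback: String): String = impl match {
        case NativeDataFusion    => tryNativeDataFusion().getOrElse(sparkFallback)
        case NativeIcebergCompat => tryNativeIcebergCompat().getOrElse(sparkFallback)
      }

      def main(args: Array[String]): Unit =
        println(selectPlan(NativeDataFusion, sparkFallback = "Spark scan"))
    }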
---
.../org/apache/comet/rules/CometScanRule.scala | 93 +---------------------
.../rules/EliminateRedundantTransitions.scala | 6 +-
2 files changed, 4 insertions(+), 95 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index ebb521730..acdf2afc0 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -48,7 +47,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, wi
import org.apache.comet.DataTypeSupport.isComplexType
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
import org.apache.comet.objectstore.NativeConfig
-import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
+import org.apache.comet.parquet.{Native, SupportsComet}
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
import org.apache.comet.serde.operator.CometNativeScan
import org.apache.comet.shims.{CometTypeShim, ShimFileFormat, ShimSubqueryBroadcast}
@@ -61,8 +60,6 @@ case class CometScanRule(session: SparkSession)
with CometTypeShim
with ShimSubqueryBroadcast {
- import CometScanRule._
-
private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
override def apply(plan: SparkPlan): SparkPlan = {
@@ -176,8 +173,6 @@ case class CometScanRule(session: SparkSession)
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_ICEBERG_COMPAT =>
nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
- case SCAN_NATIVE_COMET =>
- nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
}
case _ =>
@@ -231,47 +226,9 @@ case class CometScanRule(session: SparkSession)
Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
}
- private def nativeCometScan(
- session: SparkSession,
- scanExec: FileSourceScanExec,
- r: HadoopFsRelation,
- hadoopConf: Configuration): Option[SparkPlan] = {
- if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
- return None
- }
- Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
- }
-
private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
scanExec.scan match {
- case scan: ParquetScan if COMET_NATIVE_SCAN_IMPL.get() == SCAN_NATIVE_COMET =>
- val fallbackReasons = new ListBuffer[String]()
- val schemaSupported =
- CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons)
- if (!schemaSupported) {
- fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
- }
-
- val partitionSchemaSupported =
- CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
- if (!partitionSchemaSupported) {
- fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is
not supported"
- }
-
- if (scan.pushedAggregate.nonEmpty) {
- fallbackReasons += "Comet does not support pushed aggregate"
- }
-
- if (schemaSupported && partitionSchemaSupported && scan.pushedAggregate.isEmpty) {
- val cometScan = CometParquetScan(session, scanExec.scan.asInstanceOf[ParquetScan])
- CometBatchScanExec(
- scanExec.copy(scan = cometScan),
- runtimeFilters = scanExec.runtimeFilters)
- } else {
- withInfos(scanExec, fallbackReasons.toSet)
- }
-
case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get() =>
val fallbackReasons = new ListBuffer[String]()
val schemaSupported =
@@ -691,48 +648,6 @@ case class CometScanRule(session: SparkSession)
}
}
- private def selectScan(
- scanExec: FileSourceScanExec,
- partitionSchema: StructType,
- hadoopConf: Configuration): String = {
-
- val fallbackReasons = new ListBuffer[String]()
-
- // native_iceberg_compat only supports local filesystem and S3
- if (scanExec.relation.inputFiles
- .forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) {
-
- val filePath = scanExec.relation.inputFiles.headOption
- if (filePath.exists(_.startsWith("s3a://"))) {
- validateObjectStoreConfig(filePath.get, hadoopConf, fallbackReasons)
- }
- } else {
- fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local
filesystem and S3"
- }
-
- val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
- val schemaSupported =
- typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
- val partitionSchemaSupported =
- typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
-
- val cometExecEnabled = COMET_EXEC_ENABLED.get()
- if (!cometExecEnabled) {
- fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires
${COMET_EXEC_ENABLED.key}=true"
- }
-
- if (cometExecEnabled && schemaSupported && partitionSchemaSupported && fallbackReasons.isEmpty) {
- logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
- SCAN_NATIVE_ICEBERG_COMPAT
- } else {
- logInfo(
- s"Auto scan mode falling back to $SCAN_NATIVE_COMET due to " +
- s"${fallbackReasons.mkString(", ")}")
- SCAN_NATIVE_COMET
- }
- }
-
private def isDynamicPruningFilter(e: Expression): Boolean =
e.exists(_.isInstanceOf[PlanExpression[_]])
@@ -775,16 +690,12 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
name: String,
fallbackReasons: ListBuffer[String]): Boolean = {
dt match {
- case ShortType
- if scanImpl != CometConf.SCAN_NATIVE_COMET &&
- CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
+ case ShortType if CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8
correctly for $dt. " +
s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false
to allow " +
"native execution if your data does not contain unsigned small
integers. " +
CometConf.COMPAT_GUIDE
false
- case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
- false
case dt if isStringCollationType(dt) =>
// we don't need specific support for collation in scans, but this
// is a convenient place to force the whole query to fall back to Spark for now
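The CometScanTypeChecker change above drops the scan-implementation guards: ShortType support now depends only on the unsigned-small-int check flag, and the unconditional complex-type (Struct/Array/Map) fallback for native_comet disappears. A rough, self-contained sketch of that predicate shape, using hypothetical stand-ins for Spark's DataType and for the Comet config flag:

    // Hypothetical stand-ins for Spark's DataType and the
    // COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK flag; not Comet's real API.
    object TypeCheckSketch {
      sealed trait DataType
      case object ShortType extends DataType
      case object IntegerType extends DataType

      def isTypeSupported(dt: DataType, unsignedSmallIntCheck: Boolean): Boolean =
        dt match {
          // Previously also guarded by scanImpl != SCAN_NATIVE_COMET; now the
          // config flag alone decides whether ShortType falls back to Spark.
          case ShortType if unsignedSmallIntCheck => false
          // The old blanket Struct/Array/Map fallback for native_comet is
          // gone; other types proceed to the remaining checks.
          case _ => true
        }

      def main(args: Array[String]): Unit = {
        println(isTypeSupported(ShortType, unsignedSmallIntCheck = true))  // false: fall back to Spark
        println(isTypeSupported(ShortType, unsignedSmallIntCheck = false)) // true: native scan allowed
      }
    }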
diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index d1c3b0767..ec3336352 100644
--- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -155,7 +155,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
* with such scans because the buffers may be modified after C2R reads them.
*
* This includes:
- * - CometScanExec with native_comet scan implementation (V1 path) - uses BatchReader
* - CometScanExec with native_iceberg_compat and partition columns - uses
* ConstantColumnReader
* - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader
@@ -167,9 +166,8 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
case _ =>
op.exists {
case scan: CometScanExec =>
- scan.scanImpl == CometConf.SCAN_NATIVE_COMET ||
- (scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
- scan.relation.partitionSchema.nonEmpty)
+ scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
+ scan.relation.partitionSchema.nonEmpty
case scan: CometBatchScanExec =>
scan.scan.isInstanceOf[CometParquetScan]
case _ => false
}
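After this change, EliminateRedundantTransitions treats a scan as buffer-reusing only if it is a native_iceberg_compat CometScanExec with partition columns (ConstantColumnReader) or a CometBatchScanExec backed by CometParquetScan (BatchReader). A simplified, runnable model of that predicate, with hypothetical stand-in types rather than the real SparkPlan nodes:

    // Simplified model with a hypothetical case class; the real code matches
    // on CometScanExec / CometBatchScanExec plan nodes.
    object MutableBufferScanSketch {
      final case class ScanInfo(
          scanImpl: String,
          hasPartitionColumns: Boolean,
          isCometParquetV2: Boolean)

      def usesMutableBuffers(scan: ScanInfo): Boolean =
        // The scanImpl == "native_comet" clause is gone; only these two scan
        // shapes still reuse mutable buffers across batches.
        (scan.scanImpl == "native_iceberg_compat" && scan.hasPartitionColumns) ||
          scan.isCometParquetV2

      def main(args: Array[String]): Unit = {
        val iceberg = ScanInfo("native_iceberg_compat", hasPartitionColumns = true, isCometParquetV2 = false)
        val datafusion = ScanInfo("native_datafusion", hasPartitionColumns = false, isCometParquetV2 = false)
        println(usesMutableBuffers(iceberg))    // true
        println(usesMutableBuffers(datafusion)) // false
      }
    }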
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]