kazuyukitanimura commented on code in PR #1474:
URL: https://github.com/apache/datafusion-comet/pull/1474#discussion_r1982416352
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -116,143 +113,124 @@ class CometSparkSessionExtensions
           withInfo(scan, "Metadata column is not supported")
           scan

-        case scanExec: FileSourceScanExec
-            if COMET_DPP_FALLBACK_ENABLED.get() &&
-              scanExec.partitionFilters.exists(isDynamicPruningFilter) =>
-          withInfo(scanExec, "DPP not supported")
-          scanExec
+        // data source v1
+        case scanExec: FileSourceScanExec =>
+          transformScan(scanExec)

         // data source V2
-        case scanExec: BatchScanExec
-            if scanExec.scan.isInstanceOf[ParquetScan] &&
-              CometBatchScanExec.isSchemaSupported(
-                scanExec.scan.asInstanceOf[ParquetScan].readDataSchema) &&
-              CometBatchScanExec.isSchemaSupported(
-                scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema) &&
-              // Comet does not support pushedAggregate
-              scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isEmpty =>
-          val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
-          logInfo("Comet extension enabled for Scan")
-          CometBatchScanExec(
-            scanExec.copy(scan = cometScan),
-            runtimeFilters = scanExec.runtimeFilters)
-
-        // If it is a `ParquetScan` but unsupported by Comet, attach the exact
-        // reason to the plan.
-        case scanExec: BatchScanExec if scanExec.scan.isInstanceOf[ParquetScan] =>
-          val requiredSchema = scanExec.scan.asInstanceOf[ParquetScan].readDataSchema
-          val info1 = createMessage(
-            !CometBatchScanExec.isSchemaSupported(requiredSchema),
-            s"Schema $requiredSchema is not supported")
-          val readPartitionSchema = scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema
-          val info2 = createMessage(
-            !CometBatchScanExec.isSchemaSupported(readPartitionSchema),
-            s"Partition schema $readPartitionSchema is not supported")
-          // Comet does not support pushedAggregate
-          val info3 = createMessage(
-            scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isDefined,
-            "Comet does not support pushed aggregate")
-          withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet)
-          scanExec
-
-        // Other datasource V2 scan
         case scanExec: BatchScanExec =>
-          scanExec.scan match {
-            // Iceberg scan, supported cases
-            case s: SupportsComet
-                if s.isCometEnabled &&
-                  CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema()) =>
-              logInfo(s"Comet extension enabled for ${scanExec.scan.getClass.getSimpleName}")
-              // When reading from Iceberg, we automatically enable type promotion
-              SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
-              CometBatchScanExec(
-                scanExec.clone().asInstanceOf[BatchScanExec],
-                runtimeFilters = scanExec.runtimeFilters)
-
-            // Iceberg scan but disabled or unsupported by Comet
-            case s: SupportsComet =>
-              val info1 = createMessage(
-                !s.isCometEnabled,
-                "Comet extension is not enabled for " +
-                  s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side")
-              val info2 = createMessage(
-                !CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema()),
-                "Comet extension is not enabled for " +
-                  s"${scanExec.scan.getClass.getSimpleName}: Schema not supported")
-              withInfos(scanExec, Seq(info1, info2).flatten.toSet)
-
-            // If it is data source V2 other than Parquet or Iceberg,
-            // attach the unsupported reason to the plan.
-            case _ =>
-              withInfo(scanExec, "Comet Scan only supports Parquet")
-              scanExec
-          }
+          transformBatchScan(scanExec)

-        // data source V1
-        case scanExec @ FileSourceScanExec(
-              HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-              _: Seq[_],
-              requiredSchema,
-              _,
-              _,
-              _,
-              _,
-              _,
-              _)
-            if CometScanExec.isFileFormatSupported(fileFormat)
-              && CometNativeScanExec.isSchemaSupported(requiredSchema)
-              && CometNativeScanExec.isSchemaSupported(partitionSchema)
-              // TODO we only enable full native scan if COMET_EXEC_ENABLED is enabled
-              // but this is not really what we want .. we currently insert `CometScanExec`
-              // here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
-              // but that only happens if `COMET_EXEC_ENABLED` is enabled
-              && COMET_EXEC_ENABLED.get()
-              && COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
-          logInfo("Comet extension enabled for v1 full native Scan")
-          CometScanExec(scanExec, session)
-
-        // data source V1
-        case scanExec @ FileSourceScanExec(
-              HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-              _: Seq[_],
-              requiredSchema,
-              _,
-              _,
-              _,
-              _,
-              _,
-              _)
-            if CometScanExec.isFileFormatSupported(fileFormat)
-              && CometScanExec.isSchemaSupported(requiredSchema)
-              && CometScanExec.isSchemaSupported(partitionSchema) =>
-          logInfo("Comet extension enabled for v1 Scan")
-          CometScanExec(scanExec, session)
-
-        // data source v1 not supported case
-        case scanExec @ FileSourceScanExec(
-              HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-              _: Seq[_],
-              requiredSchema,
-              _,
-              _,
-              _,
-              _,
-              _,
-              _) =>
-          val info1 = createMessage(
-            !CometScanExec.isFileFormatSupported(fileFormat),
-            s"File format $fileFormat is not supported")
-          val info2 = createMessage(
-            !CometScanExec.isSchemaSupported(requiredSchema),
-            s"Schema $requiredSchema is not supported")
-          val info3 = createMessage(
-            !CometScanExec.isSchemaSupported(partitionSchema),
-            s"Partition schema $partitionSchema is not supported")
-          withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString(","))
-          scanExec
       }
     }
   }
+
+  private def transformScan(scanExec: FileSourceScanExec): SparkPlan = {
+
+    def isDynamicPruningFilter(e: Expression): Boolean =
+      e.exists(_.isInstanceOf[PlanExpression[_]])
+
+    if (COMET_DPP_FALLBACK_ENABLED.get() &&
+      scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
+      withInfo(scanExec, "DPP not supported")
+      return scanExec
+    }
+
+    scanExec.relation match {
+      case HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _) =>
+        if (!CometScanExec.isFileFormatSupported(fileFormat)) {
+          withInfo(scanExec, s"fileFormat $fileFormat not supported")
+          return scanExec
+        }
+
+        COMET_NATIVE_SCAN_IMPL.get() match {
+          case SCAN_NATIVE_DATAFUSION =>
+            // TODO we only enable full native scan if COMET_EXEC_ENABLED is enabled
+            // but this is not really what we want .. we currently insert `CometScanExec`
+            // here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
+            // but that only happens if `COMET_EXEC_ENABLED` is enabled
+            if (!COMET_EXEC_ENABLED.get()) {
+              withInfo(
+                scanExec,
+                s"Native scan not enabled when ${COMET_EXEC_ENABLED.key} is not enabled")
+              return scanExec
+            }
+            if (!CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema)) {
+              withInfo(scanExec, s"requiredSchema ${scanExec.requiredSchema} not supported")
+              return scanExec
+            }
+            if (!CometNativeScanExec.isSchemaSupported(partitionSchema)) {
+              withInfo(scanExec, s"partitionSchema $partitionSchema not supported")
+              return scanExec
+            }
+            CometScanExec(scanExec, session)
+
+          case SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
+            if (!CometScanExec.isSchemaSupported(scanExec.requiredSchema)) {
+              withInfo(scanExec, s"requiredSchema ${scanExec.requiredSchema} not supported")
+              return scanExec
+            }
+            if (!CometScanExec.isSchemaSupported(partitionSchema)) {
+              withInfo(scanExec, s"partitionSchema $partitionSchema not supported")
+              return scanExec
+            }

Review Comment:
   We used to include multiple reasons previously, but this change only returns one of them.
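   To make the suggestion concrete, here is a minimal sketch of how the consolidated branch could collect every failed check before falling back, reusing the `createMessage` and `withInfos` helpers from the removed code (assuming they keep their current signatures, with `createMessage` returning an `Option[String]` and `withInfos` taking a `Set[String]`):

   ```scala
   // Sketch only: accumulate every failure reason instead of returning on the first.
   case SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
     val reasons = Seq(
       createMessage(
         !CometScanExec.isSchemaSupported(scanExec.requiredSchema),
         s"requiredSchema ${scanExec.requiredSchema} not supported"),
       createMessage(
         !CometScanExec.isSchemaSupported(partitionSchema),
         s"partitionSchema $partitionSchema not supported")).flatten
     if (reasons.nonEmpty) {
       // Attach all collected reasons to the plan, as the old withInfos call did.
       withInfos(scanExec, reasons.toSet)
       return scanExec
     }
     CometScanExec(scanExec, session)
   ```

   The `SCAN_NATIVE_DATAFUSION` branch could follow the same pattern for its three checks.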