andygrove commented on code in PR #1483:
URL: https://github.com/apache/datafusion-comet/pull/1483#discussion_r1999378679
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -188,69 +185,62 @@ class CometSparkSessionExtensions
           scanExec
         }

-      // data source V1
-      case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-            _: Seq[_],
-            requiredSchema,
-            _,
-            _,
-            _,
-            _,
-            _,
-            _)
-          if CometScanExec.isFileFormatSupported(fileFormat)
-            && CometNativeScanExec.isSchemaSupported(requiredSchema)
-            && CometNativeScanExec.isSchemaSupported(partitionSchema)
-            // TODO we only enable full native scan if COMET_EXEC_ENABLED is enabled
-            // but this is not really what we want .. we currently insert `CometScanExec`
-            // here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
-            // but that only happens if `COMET_EXEC_ENABLED` is enabled
-            && COMET_EXEC_ENABLED.get()
-            && COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
-        logInfo("Comet extension enabled for v1 full native Scan")
-        CometScanExec(scanExec, session)
+      }
+    }
+  }

-      // data source V1
-      case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-            _: Seq[_],
-            requiredSchema,
-            _,
-            _,
-            _,
-            _,
-            _,
-            _)
-          if CometScanExec.isFileFormatSupported(fileFormat)
-            && CometScanExec.isSchemaSupported(requiredSchema)
-            && CometScanExec.isSchemaSupported(partitionSchema) =>
-        logInfo("Comet extension enabled for v1 Scan")
-        CometScanExec(scanExec, session)
+  private def isDynamicPruningFilter(e: Expression): Boolean =
+    e.exists(_.isInstanceOf[PlanExpression[_]])

-      // data source v1 not supported case
-      case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-            _: Seq[_],
-            requiredSchema,
-            _,
-            _,
-            _,
-            _,
-            _,
-            _) =>
-        val info1 = createMessage(
-          !CometScanExec.isFileFormatSupported(fileFormat),
-          s"File format $fileFormat is not supported")
-        val info2 = createMessage(
-          !CometScanExec.isSchemaSupported(requiredSchema),
-          s"Schema $requiredSchema is not supported")
-        val info3 = createMessage(
-          !CometScanExec.isSchemaSupported(partitionSchema),
-          s"Partition schema $partitionSchema is not supported")
-        withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString(","))
+  private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
+
+    if (COMET_DPP_FALLBACK_ENABLED.get() &&
+      scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
+      withInfo(scanExec, "DPP not supported")
+      return scanExec
+    }
+
+    scanExec.relation match {
+      case r: HadoopFsRelation =>
+        val fallbackReasons = new ListBuffer[String]()
+        if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
+          fallbackReasons += s"Unsupported file format ${r.fileFormat}"
+        }
+
+        val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
+          fallbackReasons +=
+            s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
+        }
+
+        val (schemaSupported, partitionSchemaSupported) = scanImpl match {
+          case CometConf.SCAN_NATIVE_DATAFUSION | SCAN_NATIVE_ICEBERG_COMPAT =>
+            (
+              CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema),

Review Comment:
   I agree. This is confusing. After making these changes, I can better understand why the code was written the way it was before. I was initially confused that we had separate type checks for `native_datafusion` vs `native_iceberg_compat`. Does it make sense that they should use the same type checking?
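For reference, the dispatch being questioned can be reduced to a minimal standalone Scala sketch. Everything in it is a simplified stand-in: the string constants mirror the scan-impl names from `CometConf`, and the two predicates are hypothetical placeholders for `CometNativeScanExec.isSchemaSupported` and `CometScanExec.isSchemaSupported`, operating on plain field-name lists rather than Spark `StructType`s. The only point it illustrates is that `native_datafusion` and `native_iceberg_compat` share a single type check while `native_comet` keeps its own.

```scala
// Standalone sketch of the schema-check dispatch under discussion (not the
// real Comet code). The predicates below are hypothetical stand-ins.
object SchemaCheckSketch {
  val SCAN_NATIVE_COMET = "native_comet"
  val SCAN_NATIVE_DATAFUSION = "native_datafusion"
  val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"

  // Stand-in for CometNativeScanExec.isSchemaSupported (native readers).
  def nativeScanSchemaSupported(fields: Seq[String]): Boolean =
    fields.forall(_.nonEmpty)

  // Stand-in for CometScanExec.isSchemaSupported (JVM-side scan).
  def jvmScanSchemaSupported(fields: Seq[String]): Boolean =
    fields.forall(_.nonEmpty)

  // Mirrors the match in the diff: native_datafusion and native_iceberg_compat
  // take the same code path, which is exactly what the review comment asks about.
  def schemaChecks(
      scanImpl: String,
      requiredSchema: Seq[String],
      partitionSchema: Seq[String]): (Boolean, Boolean) =
    scanImpl match {
      case SCAN_NATIVE_DATAFUSION | SCAN_NATIVE_ICEBERG_COMPAT =>
        (
          nativeScanSchemaSupported(requiredSchema),
          nativeScanSchemaSupported(partitionSchema))
      case SCAN_NATIVE_COMET =>
        (
          jvmScanSchemaSupported(requiredSchema),
          jvmScanSchemaSupported(partitionSchema))
      case _ =>
        // Unknown impl: be conservative and report unsupported.
        (false, false)
    }

  def main(args: Array[String]): Unit = {
    // Both native impls produce the same result for a given schema.
    println(schemaChecks(SCAN_NATIVE_DATAFUSION, Seq("id", "name"), Seq("dt")))
    println(schemaChecks(SCAN_NATIVE_ICEBERG_COMPAT, Seq("id", "name"), Seq("dt")))
  }
}
```

If the two native impls do support the same set of types, collapsing them into one predicate as in the diff keeps the rule readable; if `native_iceberg_compat` later diverges, the shared match arm is the single place to split them again.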