parthchandra commented on code in PR #1483:
URL: https://github.com/apache/datafusion-comet/pull/1483#discussion_r1999846503


##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -188,69 +185,62 @@ class CometSparkSessionExtensions
                 scanExec
             }
 
-          // data source V1
-          case scanExec @ FileSourceScanExec(
-                HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-                _: Seq[_],
-                requiredSchema,
-                _,
-                _,
-                _,
-                _,
-                _,
-                _)
-              if CometScanExec.isFileFormatSupported(fileFormat)
-                && CometNativeScanExec.isSchemaSupported(requiredSchema)
-                && CometNativeScanExec.isSchemaSupported(partitionSchema)
-                // TODO we only enable full native scan if COMET_EXEC_ENABLED is enabled
-                // but this is not really what we want .. we currently insert `CometScanExec`
-                // here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
-                // but that only happens if `COMET_EXEC_ENABLED` is enabled
-                && COMET_EXEC_ENABLED.get()
-                && COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
-            logInfo("Comet extension enabled for v1 full native Scan")
-            CometScanExec(scanExec, session)
+        }
+      }
+    }
 
-          // data source V1
-          case scanExec @ FileSourceScanExec(
-                HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-                _: Seq[_],
-                requiredSchema,
-                _,
-                _,
-                _,
-                _,
-                _,
-                _)
-              if CometScanExec.isFileFormatSupported(fileFormat)
-                && CometScanExec.isSchemaSupported(requiredSchema)
-                && CometScanExec.isSchemaSupported(partitionSchema) =>
-            logInfo("Comet extension enabled for v1 Scan")
-            CometScanExec(scanExec, session)
+    private def isDynamicPruningFilter(e: Expression): Boolean =
+      e.exists(_.isInstanceOf[PlanExpression[_]])
 
-          // data source v1 not supported case
-          case scanExec @ FileSourceScanExec(
-                HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-                _: Seq[_],
-                requiredSchema,
-                _,
-                _,
-                _,
-                _,
-                _,
-                _) =>
-            val info1 = createMessage(
-              !CometScanExec.isFileFormatSupported(fileFormat),
-              s"File format $fileFormat is not supported")
-            val info2 = createMessage(
-              !CometScanExec.isSchemaSupported(requiredSchema),
-              s"Schema $requiredSchema is not supported")
-            val info3 = createMessage(
-              !CometScanExec.isSchemaSupported(partitionSchema),
-              s"Partition schema $partitionSchema is not supported")
-            withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString(","))
+    private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
+
+      if (COMET_DPP_FALLBACK_ENABLED.get() &&
+        scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
+        withInfo(scanExec, "DPP not supported")
+        return scanExec
+      }
+
+      scanExec.relation match {
+        case r: HadoopFsRelation =>
+          val fallbackReasons = new ListBuffer[String]()
+          if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
+            fallbackReasons += s"Unsupported file format ${r.fileFormat}"
+          }
+
+          val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+          if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
+            fallbackReasons +=
+              s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
+          }
+
+          val (schemaSupported, partitionSchemaSupported) = scanImpl match {
+            case CometConf.SCAN_NATIVE_DATAFUSION | SCAN_NATIVE_ICEBERG_COMPAT =>
+              (
+                CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema),

Review Comment:
   Yeah, I agree the two code paths should have the same type checking, but 
then there is no way to reconcile the fact that they are two different 
operators. I feel it is simpler to have them distinct. 
   Thanks for making the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to