eejbyfeldt commented on code in PR #832: URL: https://github.com/apache/datafusion-comet/pull/832#discussion_r1719828749
########## spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala: ########## @@ -1131,12 +1133,39 @@ object CometSparkSessionExtensions extends Logging { // operators can have a chance to be converted to columnar. Leaf operators that output // columnar batches, such as Spark's vectorized readers, will also be converted to native // comet batches. - // TODO: consider converting other intermediate operators to columnar. - op.isInstanceOf[LeafExecNode] && CometSparkToColumnarExec.isSchemaSupported(op.schema) && - COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { + if (CometSparkToColumnarExec.isSchemaSupported(op.schema)) { + op match { + // Convert Spark DS v1 scan to Arrow format + case scan: FileSourceScanExec => + scan.relation.fileFormat match { + case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) + case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) + case _ => isSparkToArrowEnabled(conf, op) + } + // Convert Spark DS v2 scan to Arrow format + case scan: BatchScanExec => + scan.scan match { + case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) + case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) + case _ => isSparkToArrowEnabled(conf, op) + } Review Comment: Is it intended that the new options take precedence over the old operator list in `COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST`? If someone already enabled all data source scans using that, this change will disable it for parquet and json. Maybe it would be less surprising if the logic was ```scala private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = { // Only consider converting leaf nodes to columnar currently, so that all the following // operators can have a chance to be converted to columnar. Leaf operators that output // columnar batches, such as Spark's vectorized readers, will also be converted to native // comet batches. 
CometSparkToColumnarExec.isSchemaSupported(op.schema) && ( isSparkToArrowEnabledOp(conf, op) || isSparkToArrowEnabledDataSource(conf, op)) } private def isSparkToArrowEnabledDataSource(conf: SQLConf, op: SparkPlan): Boolean = { op match { // Convert Spark DS v1 scan to Arrow format case scan: FileSourceScanExec => scan.relation.fileFormat match { case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) case _ => false } // Convert Spark DS v2 scan to Arrow format case scan: BatchScanExec => scan.scan match { case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) case _ => false } } } private def isSparkToArrowEnabledOp(conf: SQLConf, op: SparkPlan) = { op.isInstanceOf[LeafExecNode] && COMET_SPARK_TO_ARROW_ENABLED.get(conf) && { val simpleClassName = Utils.getSimpleName(op.getClass) val nodeName = simpleClassName.replaceAll("Exec$", "") COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) } } ``` So that we do not change the behavior of existing configs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org For additional commands, e-mail: github-help@datafusion.apache.org