This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/comet-parquet-exec by this 
push:
     new 3d70b2e07 fix: Simplify native scan config (#1225)
3d70b2e07 is described below

commit 3d70b2e07ec9cd946f72c9ac0445799bece5a39b
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Jan 6 12:27:02 2025 -0800

    fix: Simplify native scan config (#1225)
---
 .../main/scala/org/apache/comet/CometConf.scala    | 29 +++++++++-------------
 .../apache/comet/CometSparkSessionExtensions.scala |  4 +--
 .../scala/org/apache/comet/DataTypeSupport.scala   |  7 ++++--
 .../comet/parquet/CometParquetFileFormat.scala     |  5 ++--
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  2 +-
 .../org/apache/comet/CometExpressionSuite.scala    |  4 +--
 .../org/apache/comet/exec/CometExecSuite.scala     |  4 +--
 .../scala/org/apache/spark/sql/CometTestBase.scala |  3 +--
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |  8 +++---
 9 files changed, 30 insertions(+), 36 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 3a56f64a1..978cf521a 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -77,25 +77,20 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_FULL_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.native.scan.enabled")
-    .internal()
+  val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = 
conf("spark.comet.scan.impl")
     .doc(
-      "Whether to enable the fully native scan. When this is turned on, Spark 
will use Comet to " +
-        "read supported data sources (currently only Parquet is supported 
natively)." +
-        " By default, this config is true.")
-    .booleanConf
-    .createWithDefault(true)
-
-  val COMET_NATIVE_RECORDBATCH_READER_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.native.arrow.scan.enabled")
+      "The implementation of Comet Native Scan to use. Available modes are 
'native', " +
+        "'native_full', and 'native_recordbatch'. " +
+        "'native' is for the original Comet native scan which uses a JVM-based 
parquet file " +
+        "reader and native column decoding. Supports simple types only. " +
+        "'native_full' is a fully native implementation of scan based on 
DataFusion. " +
+        "'native_recordbatch' is a native implementation that exposes APIs to 
read parquet " +
+        "columns natively.")
     .internal()
-    .doc(
-      "Whether to enable the fully native datafusion based column reader. When 
this is turned on," +
-        " Spark will use Comet to read Parquet files natively via the 
Datafusion based Parquet" +
-        " reader. By default, this config is false.")
-    .booleanConf
-    .createWithDefault(false)
+    .stringConf
+    .transform(_.toLowerCase(Locale.ROOT))
+    .checkValues(Set("native", "native_full", "native_recordbatch"))
+    .createWithDefault("native_full")
 
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 522da0f58..92ce2fea3 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -201,7 +201,7 @@ class CometSparkSessionExtensions
                 _)
               if CometNativeScanExec.isSchemaSupported(requiredSchema)
                 && CometNativeScanExec.isSchemaSupported(partitionSchema)
-                && COMET_FULL_NATIVE_SCAN_ENABLED.get =>
+                && COMET_NATIVE_SCAN_IMPL.get.equals("native_full") =>
             logInfo("Comet extension enabled for v1 full native Scan")
             CometScanExec(scanExec, session)
 
@@ -371,7 +371,7 @@ class CometSparkSessionExtensions
 
       plan.transformUp {
         // Fully native scan for V1
-        case scan: CometScanExec if COMET_FULL_NATIVE_SCAN_ENABLED.get =>
+        case scan: CometScanExec if 
COMET_NATIVE_SCAN_IMPL.get.equals("native_full") =>
           val nativeOp = QueryPlanSerde.operator2Proto(scan).get
           CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
 
diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala 
b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index e4235495f..711fee30a 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -40,8 +40,11 @@ trait DataTypeSupport {
       true
     case t: DataType if t.typeName == "timestamp_ntz" => true
     case _: StructType
-        if CometConf.COMET_FULL_NATIVE_SCAN_ENABLED
-          .get() || CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.get() =>
+        if CometConf.COMET_NATIVE_SCAN_IMPL
+          .get()
+          .equals("native_full") || CometConf.COMET_NATIVE_SCAN_IMPL
+          .get()
+          .equals("native_recordbatch") =>
       true
     case _ => false
   }
diff --git 
a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala 
b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index c142abb5c..198a288df 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -100,7 +100,8 @@ class CometParquetFileFormat extends ParquetFileFormat with 
MetricsSupport with
 
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
-    val nativeArrowReaderEnabled = 
CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.get(sqlConf)
+    val nativeRecordBatchReaderEnabled =
+      
CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals("native_recordbatch")
 
     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value
@@ -136,7 +137,7 @@ class CometParquetFileFormat extends ParquetFileFormat with 
MetricsSupport with
       pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
 
       val recordBatchReader =
-        if (nativeArrowReaderEnabled) {
+        if (nativeRecordBatchReaderEnabled) {
           val batchReader = new NativeBatchReader(
             sharedConf,
             file,
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a92ffa668..1bce92a71 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2516,7 +2516,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
     op match {
 
       // Fully native scan for V1
-      case scan: CometScanExec if CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.get 
=>
+      case scan: CometScanExec if 
CometConf.COMET_NATIVE_SCAN_IMPL.get.equals("native_full") =>
         val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
         nativeScanBuilder.setSource(op.simpleStringWithNodeId())
 
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index f9e2c44c6..3bcca69f4 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2339,7 +2339,7 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
         withSQLConf(
           SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
           CometConf.COMET_ENABLED.key -> "true",
-          CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
+          CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_full",
           CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
 
           val df = spark.read.parquet(dir.toString())
@@ -2372,7 +2372,7 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
         withSQLConf(
           SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
           CometConf.COMET_ENABLED.key -> "true",
-          CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
+          CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_full",
           CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
 
           val df = spark.read.parquet(dir.toString())
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 1d1af7b3e..2df3c74ae 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -554,9 +554,7 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("Comet native metrics: scan") {
-    withSQLConf(
-      CometConf.COMET_EXEC_ENABLED.key -> "true",
-      CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "false") {
+    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "native-scan.parquet")
         makeParquetFileAllTypes(path, dictionaryEnabled = true, 10000)
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 39af52e90..9652880cd 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -80,8 +80,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key, "false")
-    conf.set(CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.key, "true")
+    conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, "native_full")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
     
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, 
"true")
     conf
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index c3513e59e..90a74e9fb 100644
--- 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -93,7 +93,7 @@ trait CometPlanStabilitySuite extends 
DisableAdaptiveExecutionSuite with TPCDSBa
     var expectedSimplified = FileUtils.readFileToString(simplifiedFile, 
StandardCharsets.UTF_8)
     val explainFile = new File(dir, "explain.txt")
     var expectedExplain = FileUtils.readFileToString(explainFile, 
StandardCharsets.UTF_8)
-    if (!CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.get()) {
+    if (!CometConf.COMET_NATIVE_SCAN_IMPL.get().equals("native_full")) {
       expectedExplain = expectedExplain.replace("CometNativeScan", "CometScan")
       expectedSimplified = expectedSimplified.replace("CometNativeScan", 
"CometScan")
     }
@@ -265,8 +265,7 @@ trait CometPlanStabilitySuite extends 
DisableAdaptiveExecutionSuite with TPCDSBa
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",
       CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
-      CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "false",
-      CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.key -> "true",
+      CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_full",
       CometConf.COMET_EXEC_ENABLED.key -> "true",
       CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
@@ -299,8 +298,7 @@ trait CometPlanStabilitySuite extends 
DisableAdaptiveExecutionSuite with TPCDSBa
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key, "false")
-    conf.set(CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.key, "true")
+    conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, "native_full")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to