This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/comet-parquet-exec by this
push:
new 3d70b2e07 fix: Simplify native scan config (#1225)
3d70b2e07 is described below
commit 3d70b2e07ec9cd946f72c9ac0445799bece5a39b
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Jan 6 12:27:02 2025 -0800
fix: Simplify native scan config (#1225)
---
.../main/scala/org/apache/comet/CometConf.scala | 29 +++++++++-------------
.../apache/comet/CometSparkSessionExtensions.scala | 4 +--
.../scala/org/apache/comet/DataTypeSupport.scala | 7 ++++--
.../comet/parquet/CometParquetFileFormat.scala | 5 ++--
.../org/apache/comet/serde/QueryPlanSerde.scala | 2 +-
.../org/apache/comet/CometExpressionSuite.scala | 4 +--
.../org/apache/comet/exec/CometExecSuite.scala | 4 +--
.../scala/org/apache/spark/sql/CometTestBase.scala | 3 +--
.../spark/sql/comet/CometPlanStabilitySuite.scala | 8 +++---
9 files changed, 30 insertions(+), 36 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 3a56f64a1..978cf521a 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -77,25 +77,20 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
- val COMET_FULL_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
- "spark.comet.native.scan.enabled")
- .internal()
+ val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] =
conf("spark.comet.scan.impl")
.doc(
- "Whether to enable the fully native scan. When this is turned on, Spark
will use Comet to " +
- "read supported data sources (currently only Parquet is supported
natively)." +
- " By default, this config is true.")
- .booleanConf
- .createWithDefault(true)
-
- val COMET_NATIVE_RECORDBATCH_READER_ENABLED: ConfigEntry[Boolean] = conf(
- "spark.comet.native.arrow.scan.enabled")
+ "The implementation of Comet Native Scan to use. Available modes are
'native', " +
+ "'native_full', and 'native_recordbatch'. " +
+ "'native' is the original Comet native scan, which uses a JVM-based
parquet file " +
+ "reader and native column decoding. It supports simple types only. " +
+ "'native_full' is a fully native implementation of scan based on
DataFusion. " +
+ "'native_recordbatch' is a native implementation that exposes APIs to
read parquet " +
+ "columns natively.")
.internal()
- .doc(
- "Whether to enable the fully native datafusion based column reader. When
this is turned on," +
- " Spark will use Comet to read Parquet files natively via the
Datafusion based Parquet" +
- " reader. By default, this config is false.")
- .booleanConf
- .createWithDefault(false)
+ .stringConf
+ .transform(_.toLowerCase(Locale.ROOT))
+ .checkValues(Set("native", "native_full", "native_recordbatch"))
+ .createWithDefault("native_full")
val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
diff --git
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 522da0f58..92ce2fea3 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -201,7 +201,7 @@ class CometSparkSessionExtensions
_)
if CometNativeScanExec.isSchemaSupported(requiredSchema)
&& CometNativeScanExec.isSchemaSupported(partitionSchema)
- && COMET_FULL_NATIVE_SCAN_ENABLED.get =>
+ && COMET_NATIVE_SCAN_IMPL.get.equals("native_full") =>
logInfo("Comet extension enabled for v1 full native Scan")
CometScanExec(scanExec, session)
@@ -371,7 +371,7 @@ class CometSparkSessionExtensions
plan.transformUp {
// Fully native scan for V1
- case scan: CometScanExec if COMET_FULL_NATIVE_SCAN_ENABLED.get =>
+ case scan: CometScanExec if
COMET_NATIVE_SCAN_IMPL.get.equals("native_full") =>
val nativeOp = QueryPlanSerde.operator2Proto(scan).get
CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index e4235495f..711fee30a 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -40,8 +40,11 @@ trait DataTypeSupport {
true
case t: DataType if t.typeName == "timestamp_ntz" => true
case _: StructType
- if CometConf.COMET_FULL_NATIVE_SCAN_ENABLED
- .get() || CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.get() =>
+ if CometConf.COMET_NATIVE_SCAN_IMPL
+ .get()
+ .equals("native_full") || CometConf.COMET_NATIVE_SCAN_IMPL
+ .get()
+ .equals("native_recordbatch") =>
true
case _ => false
}
diff --git
a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index c142abb5c..198a288df 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -100,7 +100,8 @@ class CometParquetFileFormat extends ParquetFileFormat with
MetricsSupport with
// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
- val nativeArrowReaderEnabled =
CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.get(sqlConf)
+ val nativeRecordBatchReaderEnabled =
+
CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals("native_recordbatch")
(file: PartitionedFile) => {
val sharedConf = broadcastedHadoopConf.value.value
@@ -136,7 +137,7 @@ class CometParquetFileFormat extends ParquetFileFormat with
MetricsSupport with
pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
val recordBatchReader =
- if (nativeArrowReaderEnabled) {
+ if (nativeRecordBatchReaderEnabled) {
val batchReader = new NativeBatchReader(
sharedConf,
file,
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a92ffa668..1bce92a71 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2516,7 +2516,7 @@ object QueryPlanSerde extends Logging with
ShimQueryPlanSerde with CometExprShim
op match {
// Fully native scan for V1
- case scan: CometScanExec if CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.get
=>
+ case scan: CometScanExec if
CometConf.COMET_NATIVE_SCAN_IMPL.get.equals("native_full") =>
val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
nativeScanBuilder.setSource(op.simpleStringWithNodeId())
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index f9e2c44c6..3bcca69f4 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2339,7 +2339,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
+ CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_full",
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
val df = spark.read.parquet(dir.toString())
@@ -2372,7 +2372,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
+ CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_full",
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
val df = spark.read.parquet(dir.toString())
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 1d1af7b3e..2df3c74ae 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -554,9 +554,7 @@ class CometExecSuite extends CometTestBase {
}
test("Comet native metrics: scan") {
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "false") {
+ withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "native-scan.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = true, 10000)
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 39af52e90..9652880cd 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -80,8 +80,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
- conf.set(CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key, "false")
- conf.set(CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.key, "true")
+ conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, "native_full")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key,
"true")
conf
diff --git
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index c3513e59e..90a74e9fb 100644
---
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -93,7 +93,7 @@ trait CometPlanStabilitySuite extends
DisableAdaptiveExecutionSuite with TPCDSBa
var expectedSimplified = FileUtils.readFileToString(simplifiedFile,
StandardCharsets.UTF_8)
val explainFile = new File(dir, "explain.txt")
var expectedExplain = FileUtils.readFileToString(explainFile,
StandardCharsets.UTF_8)
- if (!CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.get()) {
+ if (!CometConf.COMET_NATIVE_SCAN_IMPL.get().equals("native_full")) {
expectedExplain = expectedExplain.replace("CometNativeScan", "CometScan")
expectedSimplified = expectedSimplified.replace("CometNativeScan",
"CometScan")
}
@@ -265,8 +265,7 @@ trait CometPlanStabilitySuite extends
DisableAdaptiveExecutionSuite with TPCDSBa
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
- CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "false",
- CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.key -> "true",
+ CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_full",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
@@ -299,8 +298,7 @@ trait CometPlanStabilitySuite extends
DisableAdaptiveExecutionSuite with TPCDSBa
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
- conf.set(CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key, "false")
- conf.set(CometConf.COMET_NATIVE_RECORDBATCH_READER_ENABLED.key, "true")
+ conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, "native_full")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]