This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 5c1f1311c chore: Add envvars to override writer configs and cometConf
minor clean up (#3540)
5c1f1311c is described below
commit 5c1f1311c78ae19e1d6ae52ea63621d257e70484
Author: Oleks V <[email protected]>
AuthorDate: Wed Feb 18 07:36:59 2026 -0800
chore: Add envvars to override writer configs and cometConf minor clean up
(#3540)
---
.../main/scala/org/apache/comet/CometConf.scala | 56 +++++++++++++++-------
dev/diffs/3.4.3.diff | 2 +-
dev/diffs/3.5.8.diff | 2 +-
dev/diffs/4.0.1.diff | 2 +-
docs/source/user-guide/latest/compatibility.md | 2 +-
.../serde/operator/CometDataWritingCommand.scala | 2 +-
.../scala/org/apache/comet/serde/strings.scala | 8 ++--
.../expressions/string/regexp_replace_enabled.sql | 2 +-
.../sql-tests/expressions/string/rlike_enabled.sql | 2 +-
.../org/apache/comet/CometExpressionSuite.scala | 8 ++--
.../org/apache/comet/CometFuzzIcebergSuite.scala | 2 +-
.../org/apache/comet/CometFuzzTestSuite.scala | 2 +-
.../comet/parquet/CometParquetWriterSuite.scala | 55 ++-------------------
.../sql/benchmark/CometTPCDSMicroBenchmark.scala | 2 +-
14 files changed, 59 insertions(+), 88 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 2439f3b58..480eafdcb 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -78,11 +78,11 @@ object CometConf extends ShimCometConf {
val COMET_PREFIX = "spark.comet";
- val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec";
+ val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec"
- val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";
+ val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression"
- val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator";
+ val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator"
val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.category(CATEGORY_EXEC)
@@ -112,7 +112,7 @@ object CometConf extends ShimCometConf {
"feature is highly experimental and only partially implemented. It
should not " +
"be used in production.")
.booleanConf
- .createWithDefault(false)
+ .createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)
// Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI
best practices
// and does not support complex types. Use native_iceberg_compat or auto
instead.
@@ -488,6 +488,13 @@ object CometConf extends ShimCometConf {
"Ensure that Comet shuffle memory overhead factor is a double greater
than 0")
.createWithDefault(1.0)
+ val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
+ .category(CATEGORY_TUNING)
+ .doc("The columnar batch size, i.e., the maximum number of rows that a
batch can contain.")
+ .intConf
+ .checkValue(v => v > 0, "Batch size must be positive")
+ .createWithDefault(8192)
+
val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.batch.size")
.category(CATEGORY_SHUFFLE)
@@ -495,6 +502,9 @@ object CometConf extends ShimCometConf {
"this should not be larger than batch size (i.e.,
`spark.comet.batchSize`). Otherwise " +
"it will produce larger batches than expected in the native operator
after shuffle.")
.intConf
+ .checkValue(
+ v => v <= COMET_BATCH_SIZE.get(),
+ "Should not be larger than batch size `spark.comet.batchSize`")
.createWithDefault(8192)
val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
@@ -550,6 +560,7 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ // Used on native side. Check spark_config.rs how the config is used
val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_PREFIX.debug.memory")
.category(CATEGORY_TESTING)
@@ -608,12 +619,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
- val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
- .category(CATEGORY_TUNING)
- .doc("The columnar batch size, i.e., the maximum number of rows that a
batch can contain.")
- .intConf
- .createWithDefault(8192)
-
val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
conf("spark.comet.parquet.enable.directBuffer")
.category(CATEGORY_PARQUET)
@@ -793,14 +798,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
- val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
- conf("spark.comet.regexp.allowIncompatible")
- .category(CATEGORY_EXEC)
- .doc("Comet is not currently fully compatible with Spark for all regular
expressions. " +
- s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
- .booleanConf
- .createWithDefault(false)
-
val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
conf("spark.comet.metrics.updateInterval")
.category(CATEGORY_EXEC)
@@ -819,6 +816,7 @@ object CometConf extends ShimCometConf {
.stringConf
.createOptional
+ // Used on native side. Check spark_config.rs how the config is used
val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] =
conf("spark.comet.maxTempDirectorySize")
.category(CATEGORY_EXEC)
@@ -843,6 +841,9 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false)
+ val COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT: ConfigEntry[Boolean]
=
+ createOperatorIncompatConfig("DataWritingCommandExec")
+
/** Create a config to enable a specific operator */
private def createExecEnabledConfig(
exec: String,
@@ -858,6 +859,25 @@ object CometConf extends ShimCometConf {
.createWithDefault(defaultValue)
}
+ /**
+ * Converts a config key to a valid environment variable name. Example:
+ * "spark.comet.operator.DataWritingCommandExec.allowIncompatible" ->
+ * "SPARK_COMET_OPERATOR_DATAWRITINGCOMMANDEXEC_ALLOWINCOMPATIBLE"
+ */
+ private def configKeyToEnvVar(configKey: String): String =
+ configKey.toUpperCase(Locale.ROOT).replace('.', '_')
+
+ private def createOperatorIncompatConfig(name: String): ConfigEntry[Boolean]
= {
+ val configKey = getOperatorAllowIncompatConfigKey(name)
+ val envVar = configKeyToEnvVar(configKey)
+ conf(configKey)
+ .category(CATEGORY_EXEC)
+ .doc(s"Whether to allow incompatibility for operator: $name. " +
+ s"False by default. Can be overridden with $envVar env variable")
+ .booleanConf
+ .createWithEnvVarOrDefault(envVar, false)
+ }
+
def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = {
getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)
}
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 020588843..5713cacea 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -1009,7 +1009,7 @@ index 18123a4d6ec..fbe4c766eee 100644
- regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
- Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
- Row("num-num", "400-400", "100") :: Nil)
-+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index d2d72e9d6..3aaecdecb 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -985,7 +985,7 @@ index fa1a64460fc..1d2e215d6a3 100644
- ("100-200", "(\\d+)-(\\d+)", "300"),
- ("100-200", "(\\d+)-(\\d+)", "400"),
- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index d6694e827..a1b250655 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -1165,7 +1165,7 @@ index 0df7f806272..52d33d67328 100644
- ("100-200", "(\\d+)-(\\d+)", "300"),
- ("100-200", "(\\d+)-(\\d+)", "400"),
- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
diff --git a/docs/source/user-guide/latest/compatibility.md
b/docs/source/user-guide/latest/compatibility.md
index c09f6a61e..21695bdf5 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -62,7 +62,7 @@ the [Comet Supported Expressions Guide](expressions.md) for
more information on
Comet uses the Rust regexp crate for evaluating regular expressions, and this
has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are
known to produce different results, but
-this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
+this can be overridden by setting
`spark.comet.expression.regexp.allowIncompatible=true`.
## Window Functions
diff --git
a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
index 31575138f..69b9bd5f8 100644
---
a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
+++
b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
@@ -60,7 +60,7 @@ object CometDataWritingCommand extends
CometOperatorSerde[DataWritingCommandExec
case _: ParquetFileFormat =>
if (!cmd.outputPath.toString.startsWith("file:") &&
!cmd.outputPath.toString
.startsWith("hdfs:")) {
- return Unsupported(Some("Only HDFS/local filesystems output
paths are supported"))
+ return Unsupported(Some("Supported output filesystems: local,
HDFS"))
}
if (cmd.bucketSpec.isDefined) {
diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala
b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index db6070900..ae32b625d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -223,11 +223,11 @@ object CometRLike extends CometExpressionSerde[RLike] {
expr.right match {
case Literal(pattern, DataTypes.StringType) =>
if (!RegExp.isSupportedPattern(pattern.toString) &&
- !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
+ !CometConf.isExprAllowIncompat("regexp")) {
withInfo(
expr,
s"Regexp pattern $pattern is not compatible with Spark. " +
- s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
+ s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true
" +
"to allow it anyway.")
None
} else {
@@ -298,11 +298,11 @@ object CometStringLPad extends
CometExpressionSerde[StringLPad] {
object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] {
override def getSupportLevel(expr: RegExpReplace): SupportLevel = {
if (!RegExp.isSupportedPattern(expr.regexp.toString) &&
- !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
+ !CometConf.isExprAllowIncompat("regexp")) {
withInfo(
expr,
s"Regexp pattern ${expr.regexp} is not compatible with Spark. " +
- s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
+ s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " +
"to allow it anyway.")
return Incompatible()
}
diff --git
a/spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql
b/spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql
index 7ea13a18a..bf8a54451 100644
---
a/spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql
+++
b/spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql
@@ -16,7 +16,7 @@
-- under the License.
-- Test regexp_replace() with regexp allowIncompatible enabled (happy path)
--- Config: spark.comet.regexp.allowIncompatible=true
+-- Config: spark.comet.expression.regexp.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
statement
diff --git
a/spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql
b/spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql
index 968a2f22f..822fb3ddb 100644
--- a/spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql
+++ b/spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql
@@ -16,7 +16,7 @@
-- under the License.
-- Test RLIKE with regexp allowIncompatible enabled (happy path)
--- Config: spark.comet.regexp.allowIncompatible=true
+-- Config: spark.comet.expression.regexp.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true
statement
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 1ab8d54fd..415d8c3a1 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -958,7 +958,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
// add repetitive data to trigger dictionary encoding
Range(0, 100).map(_ => "John Smith")
withParquetFile(data.zipWithIndex, withDictionary) { file =>
- withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") ->
"true") {
spark.read.parquet(file).createOrReplaceTempView(table)
val query = sql(s"select _2 as id, _1 rlike 'R[a-z]+s [Rr]ose' from
$table")
checkSparkAnswerAndOperator(query)
@@ -996,7 +996,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withTable(table) {
sql(s"create table $table(id int, name varchar(20)) using parquet")
sql(s"insert into $table values(1,'James Smith')")
- withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true")
{
val query2 = sql(s"select id from $table where name rlike name")
val (_, cometPlan) = checkSparkAnswer(query2)
val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
@@ -1030,7 +1030,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
// "Smith$",
"Smith\\Z",
"Smith\\z")
- withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true")
{
patterns.foreach { pattern =>
val query2 = sql(s"select name, '$pattern', name rlike '$pattern'
from $table")
checkSparkAnswerAndOperator(query2)
@@ -1090,7 +1090,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
"\\V")
val qualifiers = Seq("", "+", "*", "?", "{1,}")
- withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true")
{
// testing every possible combination takes too long, so we pick some
// random combinations
for (_ <- 0 until 100) {
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzIcebergSuite.scala
b/spark/src/test/scala/org/apache/comet/CometFuzzIcebergSuite.scala
index 0a188f6cc..f37d997c4 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzIcebergSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzIcebergSuite.scala
@@ -133,7 +133,7 @@ class CometFuzzIcebergSuite extends CometFuzzIcebergBase {
}
test("regexp_replace") {
- withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
val df = spark.table(icebergTableName)
// We want to make sure that the schema generator wasn't modified to
accidentally omit
// StringType, since then this test would not run any queries and
silently pass.
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 02d13c841..fe6032414 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -255,7 +255,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
}
test("regexp_replace") {
- withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
// We want to make sure that the schema generator wasn't modified to
accidentally omit
diff --git
a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index e4c405c00..815f03f21 100644
---
a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++
b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -48,7 +48,7 @@ class CometParquetWriterSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
-
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) ->
"true",
+ CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key ->
"true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {
writeWithCometNativeWriteExec(inputPath, outputPath)
@@ -70,7 +70,7 @@ class CometParquetWriterSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
-
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) ->
"true",
+ CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key ->
"true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
"native_datafusion") {
@@ -310,55 +310,6 @@ class CometParquetWriterSuite extends CometTestBase {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("native write falls back when scan produces non-Arrow data") {
- // This test verifies that when a native scan (like native_comet) doesn't
support
- // certain data types (complex types), the native write correctly falls
back to Spark
- // instead of failing at runtime with "Comet execution only takes Arrow
Arrays" error.
- withTempPath { dir =>
- val inputPath = new File(dir, "input.parquet").getAbsolutePath
- val outputPath = new File(dir, "output.parquet").getAbsolutePath
-
- // Create data with complex types and write without Comet
- withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
- val df = Seq((1, Seq(1, 2, 3)), (2, Seq(4, 5)), (3, Seq(6, 7, 8, 9)))
- .toDF("id", "values")
- df.write.parquet(inputPath)
- }
-
- // With native Parquet write enabled but using native_comet scan which
doesn't
- // support complex types, the scan falls back to Spark. The native write
should
- // detect this and also fall back to Spark instead of failing at runtime.
- withSQLConf(
- CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) ->
"true",
- // Use native_comet which doesn't support complex types
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_comet") {
-
- val plan =
- captureWritePlan(path =>
spark.read.parquet(inputPath).write.parquet(path), outputPath)
-
- // Verify NO CometNativeWriteExec in the plan (should have fallen back
to Spark)
- val hasNativeWrite = plan.exists {
- case _: CometNativeWriteExec => true
- case d: DataWritingCommandExec =>
- d.child.exists(_.isInstanceOf[CometNativeWriteExec])
- case _ => false
- }
-
- assert(
- !hasNativeWrite,
- "Expected fallback to Spark write (no CometNativeWriteExec), but
found native write " +
- s"in plan:\n${plan.treeString}")
-
- // Verify the data was written correctly
- val result = spark.read.parquet(outputPath).collect()
- assert(result.length == 3, "Expected 3 rows to be written")
- }
- }
- }
-
test("parquet write complex types fuzz test") {
withTempPath { dir =>
val outputPath = new File(dir, "output.parquet").getAbsolutePath
@@ -517,7 +468,7 @@ class CometParquetWriterSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> "true",
// enable experimental native writes
-
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) ->
"true",
+ CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key ->
"true",
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
// explicitly set scan impl to override CI defaults
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
diff --git
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
index d9c49bc59..5982460a8 100644
---
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
@@ -114,7 +114,7 @@ object CometTPCDSMicroBenchmark extends
CometTPCQueryBenchmarkBase {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true",
+ CometConf.getExprAllowIncompatConfigKey("regexp") -> "true",
// enabling COMET_EXPLAIN_NATIVE_ENABLED may add overhead but is
useful for debugging
CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "false") {
cometSpark.sql(queryString).noop()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]