This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 1da18dd81 chore: Remove deprecated SCAN_NATIVE_COMET constant and related test code (#3671)
1da18dd81 is described below
commit 1da18dd81e2e87971f9144c17bdfea542a74b763
Author: Andy Grove <[email protected]>
AuthorDate: Wed Mar 11 16:14:47 2026 -0600
chore: Remove deprecated SCAN_NATIVE_COMET constant and related test code (#3671)
---
.../main/scala/org/apache/comet/CometConf.scala | 6 -
.../org/apache/spark/sql/comet/util/Utils.scala | 2 +-
.../org/apache/spark/sql/comet/CometScanExec.scala | 2 +-
.../apache/comet/CometArrayExpressionSuite.scala | 8 +-
.../scala/org/apache/comet/CometCastSuite.scala | 10 +-
.../org/apache/comet/CometExpressionSuite.scala | 191 +------------
.../org/apache/comet/CometFuzzAggregateSuite.scala | 36 +--
.../org/apache/comet/CometFuzzTestSuite.scala | 74 ++---
.../org/apache/comet/CometMapExpressionSuite.scala | 11 +-
.../comet/exec/CometColumnarShuffleSuite.scala | 61 ----
.../org/apache/comet/exec/CometExecSuite.scala | 35 ---
.../org/apache/comet/exec/CometJoinSuite.scala | 63 -----
.../apache/comet/parquet/ParquetReadSuite.scala | 309 +--------------------
.../apache/comet/rules/CometScanRuleSuite.scala | 39 ---
.../scala/org/apache/spark/sql/CometTestBase.scala | 6 -
.../spark/sql/CometToPrettyStringSuite.scala | 6 +-
.../spark/sql/CometToPrettyStringSuite.scala | 6 +-
17 files changed, 45 insertions(+), 820 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 38a9d0b2c..4d2e37924 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -114,12 +114,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)
- // Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI
best practices
- // and does not support complex types. Use native_iceberg_compat or auto
instead.
- // This will be removed in a future release.
- // See: https://github.com/apache/datafusion-comet/issues/2186
- @deprecated("Use SCAN_AUTO instead. native_comet will be removed in a future
release.", "0.9.0")
- val SCAN_NATIVE_COMET = "native_comet"
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
val SCAN_AUTO = "auto"
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
index 7662b219c..6eaa9cad4 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
@@ -271,7 +271,7 @@ object Utils extends CometTypeShim {
throw new SparkException(
s"Comet execution only takes Arrow Arrays, but got ${c.getClass}.
" +
"This typically happens when a Comet scan falls back to Spark
due to unsupported " +
- "data types (e.g., complex types like structs, arrays, or maps
with native_comet). " +
+ "data types (e.g., complex types like structs, arrays, or maps).
" +
"To resolve this, you can: " +
"(1) enable spark.comet.scan.allowIncompatible=true to use a
compatible native " +
"scan variant, or " +
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 2707f0c04..6151a4379 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -51,7 +51,7 @@ import org.apache.comet.parquet.CometParquetFileFormat
*
* This is a hybrid scan where the native plan will contain a `ScanExec` that
reads batches of
* data from the JVM via JNI. The ultimate source of data may be a JVM
implementation such as
- * Spark readers, or could be the `native_comet` or `native_iceberg_compat`
native scans.
+ * Spark readers, or could be the `native_iceberg_compat` native scan.
*
* Note that scanImpl can only be `native_datafusion` after CometScanRule runs
and before
* CometExecRule runs. It will never be set to `native_datafusion` at
execution time
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index 21b4276a6..65b2c8537 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -812,13 +812,7 @@ class CometArrayExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelp
// https://github.com/apache/datafusion-comet/issues/2612
test("array_reverse - fallback for binary array") {
- val fallbackReason =
- if (CometConf.COMET_NATIVE_SCAN_IMPL.key == CometConf.SCAN_NATIVE_COMET
|| sys.env
- .getOrElse("COMET_PARQUET_SCAN_IMPL", "") ==
CometConf.SCAN_NATIVE_COMET) {
- "Unsupported schema"
- } else {
- CometArrayReverse.unsupportedReason
- }
+ val fallbackReason = CometArrayReverse.unsupportedReason
withTable("t1") {
sql("""create table t1 using parquet as
select cast(null as array<binary>) c1, cast(array() as
array<binary>) c2
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 2fb3fc062..3d9acc39e 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1205,14 +1205,8 @@ class CometCastSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
|USING parquet
""".stripMargin)
sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
- if (!usingLegacyNativeCometScan) {
- checkSparkAnswerAndOperator(
- "SELECT CAST(s AS struct<field1:string, field2:string>) AS
new_struct FROM tab1")
- } else {
- // Should just fall back to Spark since non-DataSourceExec scan does
not support nested types.
- checkSparkAnswer(
- "SELECT CAST(s AS struct<field1:string, field2:string>) AS
new_struct FROM tab1")
- }
+ checkSparkAnswerAndOperator(
+ "SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct
FROM tab1")
}
}
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 339061f5b..570db1795 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -185,22 +185,6 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("basic data type support") {
- // this test requires native_comet scan due to unsigned u8/u16 issue
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- Seq(true, false).foreach { dictionaryEnabled =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "test.parquet")
- makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled =
dictionaryEnabled, 10000)
- withParquetTable(path.toString, "tbl") {
- checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
- }
- }
- }
- }
- }
-
test("basic data type support - excluding u8/u16") {
// variant that skips _9 (UINT_8) and _10 (UINT_16) for default scan impl
Seq(true, false).foreach { dictionaryEnabled =>
@@ -217,27 +201,6 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("uint data type support") {
- // this test requires native_comet scan due to unsigned u8/u16 issue
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- Seq(true, false).foreach { dictionaryEnabled =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "testuint.parquet")
- makeParquetFileAllPrimitiveTypes(
- path,
- dictionaryEnabled = dictionaryEnabled,
- Byte.MinValue,
- Byte.MaxValue)
- withParquetTable(path.toString, "tbl") {
- val qry = "select _9 from tbl order by _11"
- checkSparkAnswerAndOperator(qry)
- }
- }
- }
- }
- }
-
test("uint data type support - excluding u8/u16") {
// variant that tests UINT_32 and UINT_64, skipping _9 (UINT_8) and _10
(UINT_16)
Seq(true, false).foreach { dictionaryEnabled =>
@@ -1491,57 +1454,6 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("round") {
- // https://github.com/apache/datafusion-comet/issues/1441
- assume(usingLegacyNativeCometScan)
- Seq(true, false).foreach { dictionaryEnabled =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "test.parquet")
- makeParquetFileAllPrimitiveTypes(
- path,
- dictionaryEnabled = dictionaryEnabled,
- -128,
- 128,
- randomSize = 100)
- // this test requires native_comet scan due to unsigned u8/u16 issue
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- withParquetTable(path.toString, "tbl") {
- for (s <- Seq(-5, -1, 0, 1, 5, -1000, 1000, -323, -308, 308, -15,
15, -16, 16,
- null)) {
- // array tests
- // TODO: enable test for floats (_6, _7, _8, _13)
- for (c <- Seq(2, 3, 4, 5, 9, 10, 11, 12, 15, 16, 17)) {
- checkSparkAnswerAndOperator(s"select _${c}, round(_${c}, ${s})
FROM tbl")
- }
- // scalar tests
- // Exclude the constant folding optimizer in order to actually
execute the native round
- // operations for scalar (literal) values.
- // TODO: comment in the tests for float once supported
- withSQLConf(
- "spark.sql.optimizer.excludedRules" ->
"org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
- for (n <- Seq("0.0", "-0.0", "0.5", "-0.5", "1.2", "-1.2")) {
- checkSparkAnswerAndOperator(
- s"select round(cast(${n} as tinyint), ${s}) FROM tbl")
- // checkSparkAnswerAndCometOperators(s"select
round(cast(${n} as float), ${s}) FROM tbl")
- checkSparkAnswerAndOperator(
- s"select round(cast(${n} as decimal(38, 18)), ${s}) FROM
tbl")
- checkSparkAnswerAndOperator(
- s"select round(cast(${n} as decimal(20, 0)), ${s}) FROM
tbl")
- }
- // checkSparkAnswer(s"select round(double('infinity'), ${s})
FROM tbl")
- // checkSparkAnswer(s"select round(double('-infinity'), ${s})
FROM tbl")
- // checkSparkAnswer(s"select round(double('NaN'), ${s}) FROM
tbl")
- // checkSparkAnswer(
- // s"select
round(double('0.000000000000000000000000000000000001'), ${s}) FROM tbl")
- }
- }
- }
- }
- }
- }
- }
-
test("md5") {
Seq(false, true).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
@@ -1556,25 +1468,6 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("hex") {
- // https://github.com/apache/datafusion-comet/issues/1441
- assume(usingLegacyNativeCometScan)
- Seq(true, false).foreach { dictionaryEnabled =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "hex.parquet")
- // this test requires native_comet scan due to unsigned u8/u16 issue
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled =
dictionaryEnabled, 10000)
- withParquetTable(path.toString, "tbl") {
- checkSparkAnswerAndOperator(
- "SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6),
hex(_7), hex(_8), hex(_9), hex(_10), hex(_11), hex(_12), hex(_13), hex(_14),
hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl")
- }
- }
- }
- }
- }
-
test("unhex") {
val table = "unhex_table"
withTable(table) {
@@ -2442,13 +2335,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
df.write.parquet(dir.toString())
}
val df = spark.read.parquet(dir.toString()).select("nested1.id")
- // Comet's original scan does not support structs.
- // The plan will have a Comet Scan only if scan impl is native_full or
native_recordbatch
- if (!scanImpl.equals(CometConf.SCAN_NATIVE_COMET)) {
- checkSparkAnswerAndOperator(df)
- } else {
- checkSparkAnswer(df)
- }
+ checkSparkAnswerAndOperator(df)
}
}
@@ -2474,19 +2361,10 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
val df = spark.read.parquet(dir.toString())
- // Comet's original scan does not support structs.
- // The plan will have a Comet Scan only if scan impl is native_full or
native_recordbatch
- if (scanImpl != CometConf.SCAN_NATIVE_COMET) {
- checkSparkAnswerAndOperator(df.select("nested1.id"))
- checkSparkAnswerAndOperator(df.select("nested1.nested2"))
- checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
- checkSparkAnswerAndOperator(df.select("nested1.id",
"nested1.nested2.id"))
- } else {
- checkSparkAnswer(df.select("nested1.id"))
- checkSparkAnswer(df.select("nested1.nested2"))
- checkSparkAnswer(df.select("nested1.nested2.id"))
- checkSparkAnswer(df.select("nested1.id", "nested1.nested2.id"))
- }
+ checkSparkAnswerAndOperator(df.select("nested1.id"))
+ checkSparkAnswerAndOperator(df.select("nested1.nested2"))
+ checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
+ checkSparkAnswerAndOperator(df.select("nested1.id",
"nested1.nested2.id"))
}
}
@@ -2512,13 +2390,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
val df = spark.read.parquet(dir.toString()).select("nested1.id")
- // Comet's original scan does not support structs.
- // The plan will have a Comet Scan only if scan impl is native_full or
native_recordbatch
- if (scanImpl != CometConf.SCAN_NATIVE_COMET) {
- checkSparkAnswerAndOperator(df)
- } else {
- checkSparkAnswer(df)
- }
+ checkSparkAnswerAndOperator(df)
}
}
@@ -2595,7 +2467,6 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
test("get_struct_field with DataFusion ParquetExec - read entire struct") {
- assume(!usingLegacyNativeCometScan(conf))
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2632,7 +2503,6 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
test("read array[int] from parquet") {
- assume(!usingLegacyNativeCometScan(conf))
withTempPath { dir =>
// create input file with Comet disabled
@@ -2773,55 +2643,6 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("test integral divide") {
- // this test requires native_comet scan due to unsigned u8/u16 issue
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- Seq(true, false).foreach { dictionaryEnabled =>
- withTempDir { dir =>
- val path1 = new Path(dir.toURI.toString, "test1.parquet")
- val path2 = new Path(dir.toURI.toString, "test2.parquet")
- makeParquetFileAllPrimitiveTypes(
- path1,
- dictionaryEnabled = dictionaryEnabled,
- 0,
- 0,
- randomSize = 10000)
- makeParquetFileAllPrimitiveTypes(
- path2,
- dictionaryEnabled = dictionaryEnabled,
- 0,
- 0,
- randomSize = 10000)
- withParquetTable(path1.toString, "tbl1") {
- withParquetTable(path2.toString, "tbl2") {
- checkSparkAnswerAndOperator("""
- |select
- | t1._2 div t2._2, div(t1._2, t2._2),
- | t1._3 div t2._3, div(t1._3, t2._3),
- | t1._4 div t2._4, div(t1._4, t2._4),
- | t1._5 div t2._5, div(t1._5, t2._5),
- | t1._9 div t2._9, div(t1._9, t2._9),
- | t1._10 div t2._10, div(t1._10, t2._10),
- | t1._11 div t2._11, div(t1._11, t2._11)
- | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
- | order by t1._id""".stripMargin)
-
- checkSparkAnswerAndOperator("""
- |select
- | t1._12 div t2._12, div(t1._12, t2._12),
- | t1._15 div t2._15, div(t1._15, t2._15),
- | t1._16 div t2._16, div(t1._16, t2._16),
- | t1._17 div t2._17, div(t1._17, t2._17)
- | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
- | order by t1._id""".stripMargin)
- }
- }
- }
- }
- }
- }
-
test("ANSI support for add") {
val data = Seq((Integer.MAX_VALUE, 1), (Integer.MIN_VALUE, -1))
withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
index 191ebd908..3674887f8 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
@@ -29,9 +29,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.schema.fields.filterNot(f =>
isComplexType(f.dataType)).map(_.name)) {
val sql = s"SELECT count(distinct $col) FROM t1"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
checkSparkAnswerAndOperator(sql)
}
@@ -45,9 +43,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.schema.fields.filter(f =>
isComplexType(f.dataType)).map(_.name)) {
val sql = s"SELECT count(distinct $col) FROM t1"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -57,9 +53,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.schema.fields.filterNot(f =>
isComplexType(f.dataType)).map(_.name)) {
val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1,
c2, c3"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
checkSparkAnswerAndOperator(sql)
}
@@ -73,9 +67,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.schema.fields.filter(f =>
isComplexType(f.dataType)).map(_.name)) {
val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1,
c2, c3"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -87,9 +79,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.columns) {
val sql = s"SELECT c1, c2, c3, count(distinct $col, c4, c5) FROM t1
group by c1, c2, c3"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -99,9 +89,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.columns) {
val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -112,9 +100,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.columns.drop(1)) {
val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol
ORDER BY $groupCol"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -126,9 +112,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
val sql = s"SELECT $groupCol, count(${otherCol.mkString(", ")}) FROM t1 " +
s"GROUP BY $groupCol ORDER BY $groupCol"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
test("min/max aggregate") {
@@ -138,9 +122,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
// cannot run fully native due to HashAggregate
val sql = s"SELECT min($col), max($col) FROM t1"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
}
}
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index fe6032414..c1d7d8e72 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -37,33 +37,21 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
val sql = "SELECT * FROM t1"
- if (!usingLegacyNativeCometScan) {
- checkSparkAnswerAndOperator(sql)
- } else {
- checkSparkAnswer(sql)
- }
+ checkSparkAnswerAndOperator(sql)
}
test("select * with deeply nested complex types") {
val df = spark.read.parquet(complexTypesFilename)
df.createOrReplaceTempView("t1")
val sql = "SELECT * FROM t1"
- if (CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET)
{
- checkSparkAnswerAndOperator(sql)
- } else {
- checkSparkAnswer(sql)
- }
+ checkSparkAnswerAndOperator(sql)
}
test("select * with limit") {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
val sql = "SELECT * FROM t1 LIMIT 500"
- if (!usingLegacyNativeCometScan) {
- checkSparkAnswerAndOperator(sql)
- } else {
- checkSparkAnswer(sql)
- }
+ checkSparkAnswerAndOperator(sql)
}
test("select column with default value") {
@@ -112,11 +100,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
s"alter table t2 add column col2 $defaultValueType default
$defaultValueString")
// Verify that our default value matches Spark's answer
val sql = "select col2 from t2"
- if (!usingLegacyNativeCometScan) {
- checkSparkAnswerAndOperator(sql)
- } else {
- checkSparkAnswer(sql)
- }
+ checkSparkAnswerAndOperator(sql)
// Verify that our default value matches what we originally selected
out of t1.
if (defaultValueType == "BINARY") {
assert(
@@ -139,9 +123,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val sql = s"SELECT $col FROM t1 ORDER BY $col"
// cannot run fully natively due to range partitioning and sort
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -152,9 +134,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols"
// cannot run fully natively due to range partitioning and sort
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(1 == collectNativeScans(cometPlan).length)
- }
+ assert(1 == collectNativeScans(cometPlan).length)
}
test("order by random columns") {
@@ -186,18 +166,12 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
// check for Comet shuffle
val plan =
df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val cometShuffleExchanges = collectCometShuffleExchanges(plan)
- val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get()
match {
- case CometConf.SCAN_NATIVE_COMET =>
- // native_comet does not support reading complex types
+ val expectedNumCometShuffles = CometConf.COMET_SHUFFLE_MODE.get() match {
+ case "jvm" =>
+ 1
+ case "native" =>
+ // native shuffle does not support complex types as partitioning keys
0
- case _ =>
- CometConf.COMET_SHUFFLE_MODE.get() match {
- case "jvm" =>
- 1
- case "native" =>
- // native shuffle does not support complex types as partitioning
keys
- 0
- }
}
assert(cometShuffleExchanges.length == expectedNumCometShuffles)
}
@@ -207,22 +181,14 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val df = spark.read.parquet(filename)
val df2 = df.repartition(8, df.col("c0")).sort("c1")
df2.collect()
- if (!usingLegacyNativeCometScan) {
- val cometShuffles =
collectCometShuffleExchanges(df2.queryExecution.executedPlan)
- val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get()
match {
- case CometConf.SCAN_NATIVE_COMET =>
- // native_comet does not support reading complex types
- 0
- case _ =>
- CometConf.COMET_SHUFFLE_MODE.get() match {
- case "jvm" =>
- 1
- case "native" =>
- 2
- }
- }
- assert(cometShuffles.length == expectedNumCometShuffles)
+ val cometShuffles =
collectCometShuffleExchanges(df2.queryExecution.executedPlan)
+ val expectedNumCometShuffles = CometConf.COMET_SHUFFLE_MODE.get() match {
+ case "jvm" =>
+ 1
+ case "native" =>
+ 2
}
+ assert(cometShuffles.length == expectedNumCometShuffles)
}
test("join") {
@@ -233,9 +199,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
// cannot run fully native due to HashAggregate
val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (!usingLegacyNativeCometScan) {
- assert(2 == collectNativeScans(cometPlan).length)
- }
+ assert(2 == collectNativeScans(cometPlan).length)
}
}
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index d35eeb0b6..03db26e56 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -33,7 +33,6 @@ import org.apache.comet.testing.{DataGenOptions,
ParquetGenerator, SchemaGenOpti
class CometMapExpressionSuite extends CometTestBase {
test("read map[int, int] from parquet") {
- assume(!usingLegacyNativeCometScan(conf))
withTempPath { dir =>
// create input file with Comet disabled
@@ -65,7 +64,6 @@ class CometMapExpressionSuite extends CometTestBase {
// repro for https://github.com/apache/datafusion-comet/issues/1754
test("read map[struct, struct] from parquet") {
- assume(!usingLegacyNativeCometScan(conf))
withTempPath { dir =>
// create input file with Comet disabled
@@ -224,14 +222,7 @@ class CometMapExpressionSuite extends CometTestBase {
}
test("map_from_entries - fallback for binary type") {
- def fallbackReason(reason: String) = {
- if (CometConf.COMET_NATIVE_SCAN_IMPL.key == CometConf.SCAN_NATIVE_COMET
|| sys.env
- .getOrElse("COMET_PARQUET_SCAN_IMPL", "") ==
CometConf.SCAN_NATIVE_COMET) {
- "Unsupported schema"
- } else {
- reason
- }
- }
+ def fallbackReason(reason: String) = reason
val table = "t2"
withTable(table) {
sql(
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index ed204ef77..f632e10df 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -608,67 +608,6 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
}
}
- test("fix: native Unsafe row accessors return incorrect results") {
- // TODO byte/short issue
- assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() ==
CometConf.SCAN_NATIVE_COMET)
- Seq(10, 201).foreach { numPartitions =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "test.parquet")
- makeParquetFileAllPrimitiveTypes(path, false, 10000, 10010)
- // TODO: revisit this when we have resolution of
https://github.com/apache/arrow-rs/issues/7040
- // and https://github.com/apache/arrow-rs/issues/7097
- val fieldsToTest =
- if (!usingLegacyNativeCometScan(conf)) {
- Seq(
- $"_1",
- $"_4",
- $"_5",
- $"_6",
- $"_7",
- $"_8",
- $"_11",
- $"_12",
- $"_13",
- $"_14",
- $"_15",
- $"_16",
- $"_17",
- $"_18",
- $"_19",
- $"_20")
- } else {
- Seq(
- $"_1",
- $"_2",
- $"_3",
- $"_4",
- $"_5",
- $"_6",
- $"_7",
- $"_8",
- $"_9",
- $"_10",
- $"_11",
- $"_12",
- $"_13",
- $"_14",
- $"_15",
- $"_16",
- $"_17",
- $"_18",
- $"_19",
- $"_20")
- }
- fieldsToTest.foreach { col =>
- readParquetFile(path.toString) { df =>
- val shuffled = df.select(col).repartition(numPartitions, col)
- checkShuffleAnswer(shuffled, 1)
- }
- }
- }
- }
- }
-
test("fix: StreamReader should always set useDecimal128 as true") {
Seq(10, 201).foreach { numPartitions =>
withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index bcbbdb7f9..91a02d69f 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -382,41 +382,6 @@ class CometExecSuite extends CometTestBase {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("ReusedExchangeExec should work on CometBroadcastExchangeExec with V2
scan") {
- withSQLConf(
- CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
- SQLConf.USE_V1_SOURCE_LIST.key -> "") {
- withTempPath { path =>
- spark
- .range(5)
- .withColumn("p", $"id" % 2)
- .write
- .mode("overwrite")
- .partitionBy("p")
- .parquet(path.toString)
- withTempView("t") {
- spark.read.parquet(path.toString).createOrReplaceTempView("t")
- val df = sql("""
- |SELECT t1.id, t2.id, t3.id
- |FROM t AS t1
- |JOIN t AS t2 ON t2.id = t1.id
- |JOIN t AS t3 ON t3.id = t2.id
- |WHERE t1.p = 1 AND t2.p = 1 AND t3.p = 1
- |""".stripMargin)
- val reusedPlan =
ReuseExchangeAndSubquery.apply(df.queryExecution.executedPlan)
- val reusedExchanges = collect(reusedPlan) { case r:
ReusedExchangeExec =>
- r
- }
- assert(reusedExchanges.size == 1)
-
assert(reusedExchanges.head.child.isInstanceOf[CometBroadcastExchangeExec])
- }
- }
- }
- }
-
test("CometShuffleExchangeExec logical link should be correct") {
withTempView("v") {
spark.sparkContext
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index 6111b9c0d..d5a8387be 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec,
CometBroadcastHashJoinExec}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.Decimal
import org.apache.comet.CometConf
@@ -197,68 +196,6 @@ class CometJoinSuite extends CometTestBase {
}
}
- test("HashJoin struct key") {
- // https://github.com/apache/datafusion-comet/issues/1441
- assume(usingLegacyNativeCometScan)
- withSQLConf(
- "spark.sql.join.forceApplyShuffledHashJoin" -> "true",
- SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
- SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-
- def manyTypes(idx: Int, v: Int) =
- (
- idx,
- v,
- v.toLong,
- v.toFloat,
- v.toDouble,
- v.toString,
- v % 2 == 0,
- v.toString.getBytes,
- Decimal(v))
-
- withParquetTable((0 until 10).map(i => manyTypes(i, i % 5)), "tbl_a") {
- withParquetTable((0 until 10).map(i => manyTypes(i, i % 10)), "tbl_b")
{
- // Full join: struct key
- val df1 =
- sql(
- "SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b
" +
- "ON named_struct('1', tbl_a._2) = named_struct('1', tbl_b._1)")
- checkSparkAnswerAndOperator(df1)
-
- // Full join: struct key with nulls
- val df2 =
- sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN
tbl_b " +
- "ON IF(tbl_a._1 > 5, named_struct('2', tbl_a._2), NULL) =
IF(tbl_b._2 > 5, named_struct('2', tbl_b._1), NULL)")
- checkSparkAnswerAndOperator(df2)
-
- // Full join: struct key with nulls in the struct
- val df3 =
- sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN
tbl_b " +
- "ON named_struct('2', IF(tbl_a._1 > 5, tbl_a._2, NULL)) =
named_struct('2', IF(tbl_b._2 > 5, tbl_b._1, NULL))")
- checkSparkAnswerAndOperator(df3)
-
- // Full join: nested structs
- val df4 =
- sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN
tbl_b " +
- "ON named_struct('1', named_struct('2', tbl_a._2)) =
named_struct('1', named_struct('2', tbl_b._1))")
- checkSparkAnswerAndOperator(df4)
-
- val columnCount = manyTypes(0, 0).productArity
- def key(tbl: String) =
- (1 to columnCount).map(i => s"${tbl}._$i").mkString("struct(", ",
", ")")
- // Using several different types in the struct key
- val df5 =
- sql(
- "SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b
" +
- s"ON ${key("tbl_a")} = ${key("tbl_b")}")
- checkSparkAnswerAndOperator(df5)
- }
- }
- }
- }
-
test("HashJoin with join filter") {
withSQLConf(
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 4a049afbf..b9caa9430 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -23,7 +23,6 @@ import java.io.File
import java.math.{BigDecimal, BigInteger}
import java.time.{ZoneId, ZoneOffset}
-import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.Breaks.breakable
@@ -37,7 +36,7 @@ import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.SparkException
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec,
CometScanExec}
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
@@ -46,8 +45,6 @@ import org.apache.spark.sql.types._
import com.google.common.primitives.UnsignedLong
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
-import org.apache.comet.rules.CometScanTypeChecker
abstract class ParquetReadSuite extends CometTestBase {
import testImplicits._
@@ -82,88 +79,6 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("unsupported Spark types") {
- // TODO this test is not correctly implemented for scan implementations
other than SCAN_NATIVE_COMET
- // https://github.com/apache/datafusion-comet/issues/2188
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- // for native iceberg compat, CometScanExec supports some types that
native_comet does not.
- // note that native_datafusion does not use CometScanExec so we need not
include that in
- // the check
- val isDataFusionScan = !usingLegacyNativeCometScan(conf)
- Seq(
- NullType -> false,
- BooleanType -> true,
- ByteType -> true,
- ShortType -> true,
- IntegerType -> true,
- LongType -> true,
- FloatType -> true,
- DoubleType -> true,
- BinaryType -> true,
- StringType -> true,
- // Timestamp here arbitrary for picking a concrete data type to from
ArrayType
- // Any other type works
- ArrayType(TimestampType) -> isDataFusionScan,
- StructType(
- Seq(
- StructField("f1", DecimalType.SYSTEM_DEFAULT),
- StructField("f2", StringType))) -> isDataFusionScan,
- MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan,
- StructType(
- Seq(StructField("f1", ByteType), StructField("f2", StringType))) ->
isDataFusionScan,
- MapType(keyType = IntegerType, valueType = BinaryType) ->
isDataFusionScan)
- .foreach { case (dt, expected) =>
- val fallbackReasons = new ListBuffer[String]()
- // TODO CometScanTypeChecker should only be used for
ParquetReadSuiteV1Suite
- assert(
- CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
- .isTypeSupported(dt, "", fallbackReasons) == expected)
- // usingDataFusionParquetExec does not support CometBatchScanExec yet
- // TODO CometBatchScanExec should only be used for
ParquetReadSuiteV2Suite
- if (!isDataFusionScan) {
- assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons)
== expected)
- }
- }
- }
- }
-
- // ignored: native_comet scan is no longer supported
- ignore("unsupported Spark schema") {
- // TODO this test is not correctly implemented for scan implementations
other than SCAN_NATIVE_COMET
- // https://github.com/apache/datafusion-comet/issues/2188
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- val schemaDDLs =
- Seq(
- "f1 int, f2 boolean",
- "f1 int, f2 array<int>",
- "f1 map<long, string>, f2 array<double>")
- .map(s => StructType.fromDDL(s))
-
- // Arrays support for iceberg compat native and for Parquet V1
- val cometScanExecSupported =
- if (!usingLegacyNativeCometScan(conf) &&
this.isInstanceOf[ParquetReadV1Suite])
- Seq(true, true, true)
- else Seq(true, false, false)
-
- val cometBatchScanExecSupported = Seq(true, false, false)
- val fallbackReasons = new ListBuffer[String]()
-
- // TODO CometScanTypeChecker should only be used for
ParquetReadSuiteV1Suite
- schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected)
=>
- assert(
- CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
- .isSchemaSupported(StructType(schema), fallbackReasons) ==
expected)
- }
-
- // TODO CometBatchScanExec should only be used for
ParquetReadSuiteV2Suite
- schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema,
expected) =>
- assert(
- CometBatchScanExec.isSchemaSupported(StructType(schema),
fallbackReasons) == expected)
- }
- }
- }
-
test("simple count") {
withParquetTable((0 until 10).map(i => (i, i.toString)), "tbl") {
assert(sql("SELECT * FROM tbl WHERE _1 % 2 == 0").count() == 5)
@@ -367,118 +282,6 @@ abstract class ParquetReadSuite extends CometTestBase {
checkParquetFile(data)
}
- // ignored: native_comet scan is no longer supported
- ignore("test multiple pages with different sizes and nulls") {
- def makeRawParquetFile(
- path: Path,
- dictionaryEnabled: Boolean,
- n: Int,
- pageSize: Int): Seq[Option[Int]] = {
- val schemaStr = {
- """
- |message root {
- | optional boolean _1;
- | optional int32 _2(INT_8);
- | optional int32 _3(INT_16);
- | optional int32 _4;
- | optional int64 _5;
- | optional float _6;
- | optional double _7;
- | optional binary _8(UTF8);
- | optional int32 _9(UINT_8);
- | optional int32 _10(UINT_16);
- | optional int32 _11(UINT_32);
- | optional int64 _12(UINT_64);
- | optional binary _13(ENUM);
- | optional FIXED_LEN_BYTE_ARRAY(3) _14;
- |}
- """.stripMargin
- }
-
- val schema = MessageTypeParser.parseMessageType(schemaStr)
- val writer = createParquetWriter(
- schema,
- path,
- dictionaryEnabled = dictionaryEnabled,
- pageSize = pageSize,
- dictionaryPageSize = pageSize)
-
- val rand = new scala.util.Random(42)
- val expected = (0 until n).map { i =>
- if (rand.nextBoolean()) {
- None
- } else {
- Some(i)
- }
- }
- expected.foreach { opt =>
- val record = new SimpleGroup(schema)
- opt match {
- case Some(i) =>
- record.add(0, i % 2 == 0)
- record.add(1, i.toByte)
- record.add(2, i.toShort)
- record.add(3, i)
- record.add(4, i.toLong)
- record.add(5, i.toFloat)
- record.add(6, i.toDouble)
- record.add(7, i.toString * 48)
- record.add(8, (-i).toByte)
- record.add(9, (-i).toShort)
- record.add(10, -i)
- record.add(11, (-i).toLong)
- record.add(12, i.toString)
- record.add(13, (i % 10).toString * 3)
- case _ =>
- }
- writer.write(record)
- }
-
- writer.close()
- expected
- }
-
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- Seq(64, 128, 256, 512, 1024, 4096, 5000).foreach { pageSize =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "part-r-0.parquet")
- val expected = makeRawParquetFile(path, dictionaryEnabled = false,
10000, pageSize)
- readParquetFile(path.toString) { df =>
- checkAnswer(
- df,
- expected.map {
- case None =>
- Row(null, null, null, null, null, null, null, null, null,
null, null, null,
- null, null)
- case Some(i) =>
- val flba_field = Array.fill(3)(i % 10 + 48) // char '0' is
48 in ascii
- Row(
- i % 2 == 0,
- i.toByte,
- i.toShort,
- i,
- i.toLong,
- i.toFloat,
- i.toDouble,
- i.toString * 48,
- (-i).toByte,
- (-i).toShort,
- java.lang.Integer.toUnsignedLong(-i),
- new
BigDecimal(UnsignedLong.fromLongBits((-i).toLong).bigIntegerValue()),
- i.toString,
- flba_field)
- })
- }
- readParquetFile(path.toString) { df =>
- assert(
- df.filter("_8 IS NOT NULL AND _4 % 256 == 255").count() ==
- expected.flatten.count(_ % 256 == 255))
- }
- }
- }
- }
- }
-
test("vector reloading with all non-null values") {
def makeRawParquetFile(
path: Path,
@@ -1274,61 +1077,6 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("scan metrics") {
-
- val cometScanMetricNames = Seq(
- "ParquetRowGroups",
- "ParquetNativeDecodeTime",
- "ParquetNativeLoadTime",
- "ParquetLoadRowGroupTime",
- "ParquetInputFileReadTime",
- "ParquetInputFileReadSize",
- "ParquetInputFileReadThroughput")
-
- val cometNativeScanMetricNames = Seq(
- "time_elapsed_scanning_total",
- "bytes_scanned",
- "output_rows",
- "time_elapsed_opening",
- "time_elapsed_processing",
- "time_elapsed_scanning_until_data")
-
- withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") {
- // TODO need to implement metrics for SCAN_NATIVE_ICEBERG_COMPAT
- // https://github.com/apache/datafusion-comet/issues/1882
- withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_COMET) {
- val df = sql("SELECT * FROM tbl WHERE _1 > 0")
- val scans = df.queryExecution.executedPlan collect {
- case s: CometScanExec => s
- case s: CometBatchScanExec => s
- case s: CometNativeScanExec => s
- }
- assert(scans.size == 1, s"Expect one scan node but found
${scans.size}")
- val metrics = scans.head.metrics
-
- val metricNames = scans.head match {
- case _: CometNativeScanExec => cometNativeScanMetricNames
- case s: CometScanExec if s.scanImpl ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
- cometNativeScanMetricNames
- case _ => cometScanMetricNames
- }
-
- metricNames.foreach { metricName =>
- assert(metrics.contains(metricName), s"metric $metricName was not
found")
- }
-
- df.collect()
-
- metricNames.foreach { metricName =>
- assert(
- metrics(metricName).value > 0,
- s"Expect metric value for $metricName to be positive")
- }
- }
- }
- }
-
test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
// In this test, data is encoded using Parquet page v2 format, but with
PLAIN encoding
checkAnswer(
@@ -1443,25 +1191,6 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}
- test("row group skipping doesn't overflow when reading into larger type") {
- // Spark 4.0 no longer fails for widening types SPARK-40876
- //
https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
- assume(!isSpark40Plus && usingLegacyNativeCometScan(conf))
- withTempPath { path =>
- Seq(0).toDF("a").write.parquet(path.toString)
- // Reading integer 'a' as a long isn't supported. Check that an
exception is raised instead
- // of incorrectly skipping the single row group and producing incorrect
results.
- val exception = intercept[SparkException] {
- spark.read
- .schema("a LONG")
- .parquet(path.toString)
- .where(s"a < ${Long.MaxValue}")
- .collect()
- }
- assert(exception.getMessage.contains("Column: [a], Expected: bigint,
Found: INT32"))
- }
- }
-
test("test merge scan range") {
def makeRawParquetFile(path: Path, n: Int): Seq[Option[Int]] = {
val dictionaryPageSize = 1024
@@ -1845,39 +1574,3 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
}
}
-
-// ignored: native_comet scan is no longer supported
-class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper
{
- override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)(implicit
- pos: Position): Unit = {
- super.ignore(testName, testTags: _*)(
- withSQLConf(
- SQLConf.USE_V1_SOURCE_LIST.key -> "",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
- testFun
- })(pos)
- }
-
- override def checkParquetScan[T <: Product: ClassTag: TypeTag](
- data: Seq[T],
- f: Row => Boolean = _ => true): Unit = {
- withParquetDataFrame(data) { r =>
- val scans = collect(r.filter(f).queryExecution.executedPlan) { case p:
CometBatchScanExec =>
- p.scan
- }
- assert(scans.isEmpty)
- }
- }
-
- // ignored: native_comet scan is no longer supported
- ignore("Test V2 parquet scan uses respective scanner") {
- Seq(("false", "BatchScan"), ("true", "CometBatchScan")).foreach {
- case (cometEnabled, expectedScanner) =>
- testScanner(
- cometEnabled,
- CometConf.SCAN_NATIVE_COMET,
- scanner = expectedScanner,
- v1 = None)
- }
- }
-}
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index 18dec6817..6214513f4 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -25,8 +25,6 @@ import org.apache.spark.sql._
import org.apache.spark.sql.comet._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.QueryStageExec
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.comet.CometConf
@@ -100,43 +98,6 @@ class CometScanRuleSuite extends CometTestBase {
}
}
- // ignored: native_comet scan is no longer supported
- ignore("CometScanRule should replace V2 BatchScanExec, but only when Comet
is enabled") {
- withTempPath { path =>
- createTestDataFrame.write.parquet(path.toString)
- withTempView("test_data") {
- withSQLConf(
- SQLConf.USE_V1_SOURCE_LIST.key -> "",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET)
{
-
spark.read.parquet(path.toString).createOrReplaceTempView("test_data")
-
- val sparkPlan =
- createSparkPlan(
- spark,
- "SELECT id, id * 2 as doubled FROM test_data WHERE id % 2 == 0")
-
- // Count original Spark operators
- assert(countOperators(sparkPlan, classOf[BatchScanExec]) == 1)
-
- for (cometEnabled <- Seq(true, false)) {
- withSQLConf(CometConf.COMET_ENABLED.key -> cometEnabled.toString) {
-
- val transformedPlan = applyCometScanRule(sparkPlan)
-
- if (cometEnabled) {
- assert(countOperators(transformedPlan, classOf[BatchScanExec])
== 0)
- assert(countOperators(transformedPlan,
classOf[CometBatchScanExec]) == 1)
- } else {
- assert(countOperators(transformedPlan, classOf[BatchScanExec])
== 1)
- assert(countOperators(transformedPlan,
classOf[CometBatchScanExec]) == 0)
- }
- }
- }
- }
- }
- }
- }
-
test("CometScanRule should fallback to Spark for ShortType when safety check
enabled") {
withTempPath { path =>
// Create test data with ShortType which may be from unsigned UINT_8
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index f831d53bf..33c1d444b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -1267,13 +1267,7 @@ abstract class CometTestBase
writer.close()
}
- def usingLegacyNativeCometScan: Boolean =
usingLegacyNativeCometScan(SQLConf.get)
-
- def usingLegacyNativeCometScan(conf: SQLConf): Boolean =
- CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_COMET
-
def hasUnsignedSmallIntSafetyCheck(conf: SQLConf): Boolean = {
- !usingLegacyNativeCometScan(conf) &&
CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
}
diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
index 70119f44a..5dd956116 100644
--- a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
+++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.types.DataTypes
-import org.apache.comet.{CometConf, CometFuzzTestBase}
+import org.apache.comet.CometFuzzTestBase
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.Compatible
@@ -48,9 +48,7 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
Some(spark.sessionState.conf.sessionLocalTimeZone),
CometEvalMode.TRY)
supportLevel match {
- case _: Compatible
- if CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_COMET =>
- checkSparkAnswerAndOperator(result)
+ case _: Compatible => checkSparkAnswerAndOperator(result)
case _ => checkSparkAnswer(result)
}
}
diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
index b0f40edf7..e7f1757bf 100644
--- a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
+++ b/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle
import org.apache.spark.sql.types.DataTypes
-import org.apache.comet.{CometConf, CometFuzzTestBase}
+import org.apache.comet.CometFuzzTestBase
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.Compatible
@@ -59,9 +59,7 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
Some(spark.sessionState.conf.sessionLocalTimeZone),
CometEvalMode.TRY)
supportLevel match {
- case _: Compatible
- if CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_COMET =>
- checkSparkAnswerAndOperator(result)
+ case _: Compatible => checkSparkAnswerAndOperator(result)
case _ => checkSparkAnswer(result)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]