This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 1a000abba refactor: rename scan.allowIncompatible to scan.unsignedSmallIntSafetyCheck (#3238)
1a000abba is described below
commit 1a000abba11ce668662485b63ec55e3a177aa5d5
Author: Andy Grove <[email protected]>
AuthorDate: Sun Jan 25 15:34:09 2026 -0700
refactor: rename scan.allowIncompatible to scan.unsignedSmallIntSafetyCheck (#3238)
* refactor: rename scan.allowIncompatible to scan.unsignedSmallIntSafetyCheck
This change renames `spark.comet.scan.allowIncompatible` to
`spark.comet.scan.unsignedSmallIntSafetyCheck` and changes its default
to `true` (enabled by default).
The key change is that ByteType is removed from the safety check entirely,
leaving only ShortType subject to fallback behavior.
## Why ByteType is Safe
ByteType columns are always safe for native execution because:
1. **Parquet type mapping**: Spark's ByteType can only originate from signed INT8
in Parquet. There is no unsigned 8-bit Parquet type (UINT_8) that maps to ByteType.
2. **UINT_8 maps to ShortType**: When Parquet files contain unsigned UINT_8
columns, Spark maps them to ShortType (16-bit), not ByteType, because UINT_8
values (0-255) can exceed the signed byte range (-128 to 127).
3. **Truncation preserves signed values**: When storing signed INT8 in 8 bits,
truncation from any wider representation preserves the correct signed value
due to two's complement representation, as the sketch below shows.
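A minimal sketch of this round trip in plain Scala (illustrative only, not Comet code):

```scala
// Two's complement round trip: any value in the signed byte range
// (-128 to 127) survives truncation to 8 bits and widening back.
val v: Short = -42
val truncated: Byte = v.toByte   // keep only the low 8 bits
assert(truncated == -42)         // the signed value is preserved
assert(truncated.toShort == v)   // widening recovers the original
```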
## Why ShortType Needs the Safety Check
ShortType columns may be problematic because:
1. **Ambiguous origin**: ShortType can come from either signed INT16 (safe) or
unsigned UINT_8 (potentially incompatible).
2. **Different reader behavior**: Arrow-based readers like DataFusion respect
the unsigned UINT_8 logical type and read the data as unsigned, while Spark
ignores the logical type and reads it as signed. This can produce different
results for values 128-255 (see the sketch after this list).
3. **No metadata available**: At scan time, Comet cannot determine whether a
ShortType column originated from INT16 or UINT_8, so the safety check
conservatively falls back to Spark for all ShortType columns.
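The divergence in point 2 can be sketched in plain Scala (illustrative only, not
Comet code): a bit pattern above 127 decodes differently depending on whether the
reader sign-extends or zero-extends it.

```scala
// 0xC8 is 200 as unsigned UINT_8, but -56 as a signed byte.
val raw: Byte = 0xC8.toByte          // bit pattern 1100_1000
val signedRead: Short = raw.toShort  // sign-extended, as Spark reads it
val unsignedRead: Int = raw & 0xFF   // zero-extended, as Arrow readers do
assert(signedRead == -56)
assert(unsignedRead == 200)
```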
Users who know their data does not contain unsigned UINT_8 columns can disable
the safety check with `spark.comet.scan.unsignedSmallIntSafetyCheck=false`.
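For example, at the session level (a sketch assuming a `SparkSession` named
`spark` with Comet enabled):

```scala
// Opt out of the ShortType fallback when the data is known
// to contain no unsigned UINT_8 columns.
spark.conf.set("spark.comet.scan.unsignedSmallIntSafetyCheck", "false")
```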
Co-Authored-By: Claude Opus 4.5 <[email protected]>
* format
* rename
* rename
* Fix clippy warnings for Rust 1.93
- Use local `root_op` variable instead of unwrapping `exec_context.root_op`
- Replace `is_some()` + `unwrap()` pattern with `if let Some(...)`
Co-Authored-By: Claude Opus 4.5 <[email protected]>
---------
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
common/src/main/scala/org/apache/comet/CometConf.scala | 15 ++++++++++-----
docs/source/contributor-guide/parquet_scans.md | 13 +++++++------
.../main/scala/org/apache/comet/rules/CometScanRule.scala | 10 ++++++----
.../test/scala/org/apache/comet/CometFuzzTestBase.scala | 2 +-
.../apache/comet/parquet/CometParquetWriterSuite.scala | 4 ++--
.../scala/org/apache/comet/parquet/ParquetReadSuite.scala | 2 +-
.../scala/org/apache/comet/rules/CometScanRuleSuite.scala | 14 +++++++-------
.../test/scala/org/apache/spark/sql/CometTestBase.scala | 6 +++---
8 files changed, 37 insertions(+), 29 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index ee1093a88..1c51d608d 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -766,13 +766,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
- val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
- conf("spark.comet.scan.allowIncompatible")
+ val COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK: ConfigEntry[Boolean] =
+ conf("spark.comet.scan.unsignedSmallIntSafetyCheck")
.category(CATEGORY_SCAN)
- .doc("Some Comet scan implementations are not currently fully compatible
with Spark for " +
- s"all datatypes. Set this config to true to allow them anyway.
$COMPAT_GUIDE.")
+ .doc(
+ "Parquet files may contain unsigned 8-bit integers (UINT_8) which Spark maps to " +
+ "ShortType. When this config is true (default), Comet falls back to Spark for " +
+ "ShortType columns because we cannot distinguish signed INT16 (safe) from unsigned " +
+ "UINT_8 (may produce different results). Set to false to allow native execution of " +
+ "ShortType columns if you know your data does not contain unsigned UINT_8 columns " +
+ s"from improperly encoded Parquet files. $COMPAT_GUIDE.")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)
val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
conf("spark.comet.exec.strictFloatingPoint")
diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md
index 48fda7dc3..3dcb78e87 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -42,12 +42,13 @@ implementation:
The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:
-- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
- or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
- logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
- rather than signed. By default, Comet will fall back to Spark's native scan when scanning Parquet files containing
- `byte` or `short` types (regardless of the logical type). This behavior can be disabled by setting
- `spark.comet.scan.allowIncompatible=true`.
+- When reading Parquet files written by systems other than Spark that contain columns with the logical type `UINT_8`
+ (unsigned 8-bit integers), Comet may produce different results than Spark. Spark maps `UINT_8` to `ShortType`, but
+ Comet's Arrow-based readers respect the unsigned type and read the data as unsigned rather than signed. Since Comet
+ cannot distinguish `ShortType` columns that came from `UINT_8` versus signed `INT16`, by default Comet falls back to
+ Spark when scanning Parquet files containing `ShortType` columns. This behavior can be disabled by setting
+ `spark.comet.scan.unsignedSmallIntSafetyCheck=false`. Note that `ByteType` columns are always safe because they can
+ only come from signed `INT8`, where truncation preserves the signed value.
- No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
The `native_datafusion` scan has some additional limitations:
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index bfcf25074..4291e3fb6 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -721,11 +721,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
name: String,
fallbackReasons: ListBuffer[String]): Boolean = {
dt match {
- case ByteType | ShortType
+ case ShortType
if scanImpl != CometConf.SCAN_NATIVE_COMET &&
- !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
- fallbackReasons += s"$scanImpl scan cannot read $dt when " +
- s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+ CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
+ fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
+ s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
+ "native execution if your data does not contain unsigned small integers. " +
+ CometConf.COMPAT_GUIDE
false
case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
false
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
index 74858ed61..5c5251b5e 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -120,7 +120,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
- CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+ CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
testFun
}
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index c4856c3cc..1d63b9d3a 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -459,8 +459,8 @@ class CometParquetWriterSuite extends CometTestBase {
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
// explicitly set scan impl to override CI defaults
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
- // COMET_SCAN_ALLOW_INCOMPATIBLE is needed because input data contains byte/short types
- CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+ // Disable unsigned small int safety check for ShortType columns
+ CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
// use a different timezone to make sure that timezone handling works with nested types
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index b028a70dc..a05bb7c39 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1904,7 +1904,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
val rows = 1000
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
- CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+ CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
makeParquetFileAllPrimitiveTypes(
path,
dictionaryEnabled = false,
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index 7f54d0b7c..d0dfbbb09 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -140,21 +140,21 @@ class CometScanRuleSuite extends CometTestBase {
}
}
- test("CometScanRule should fallback to Spark for unsupported data types in
v1 scan") {
+ test("CometScanRule should fallback to Spark for ShortType when safety check
enabled") {
withTempPath { path =>
- // Create test data with unsupported types (e.g., BinaryType, CalendarIntervalType)
+ // Create test data with ShortType which may be from unsigned UINT_8
import org.apache.spark.sql.types._
val unsupportedSchema = new StructType(
Array(
StructField("id", DataTypes.IntegerType, nullable = true),
StructField(
"value",
- DataTypes.ByteType,
+ DataTypes.ShortType,
nullable = true
- ), // Unsupported in some scan modes
+ ), // May be from unsigned UINT_8
StructField("name", DataTypes.StringType, nullable = true)))
- val testData = Seq(Row(1, 1.toByte, "test1"), Row(2, -1.toByte, "test2"))
+ val testData = Seq(Row(1, 1.toShort, "test1"), Row(2, -1.toShort, "test2"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), unsupportedSchema)
df.write.parquet(path.toString)
@@ -167,10 +167,10 @@ class CometScanRuleSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
- CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+ CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
val transformedPlan = applyCometScanRule(sparkPlan)
- // Should fallback to Spark due to unsupported ByteType in schema
+ // Should fallback to Spark due to ShortType (may be from unsigned UINT_8)
assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 1)
assert(countOperators(transformedPlan, classOf[CometScanExec]) == 0)
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 81ac72247..89249240c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -83,7 +83,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
- conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
+ conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
// SortOrder is incompatible for mixed zero and negative zero floating point values, but
@@ -1113,7 +1113,7 @@ abstract class CometTestBase
* |""".stripMargin,
* "select arr from tbl",
* sqlConf = Seq(
- * CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false",
+ * CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true",
* "spark.comet.explainFallback.enabled" -> "false"
* ),
* debugCometDF = df => {
@@ -1275,6 +1275,6 @@ abstract class CometTestBase
def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
usingDataSourceExec(conf) &&
- !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
+ CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]