andygrove opened a new issue, #4297: URL: https://github.com/apache/datafusion-comet/issues/4297
## Description

`native_datafusion` silently accepts a number of primitive-to-primitive Parquet conversions that Spark's vectorized reader rejects on every supported version (Spark's `ParquetVectorUpdaterFactory.getUpdater` falls through to `constructConvertNotSupportedException`). The result is wrong answers (silent overflow on narrowing, silent precision loss on widening, raw int-as-epoch-seconds reinterpretation for `int -> timestamp`) rather than the `SchemaColumnConvertNotSupportedException` Spark would throw.

This is the same class of gap as #3720 (which focuses on `STRING -> INT` and decimal precision narrowing), but for primitive-to-primitive numeric and date/timestamp conversions.

## Context

#4229 closes the three widening cases gated by `spark.comet.schemaEvolution.enabled` (`INT32 -> INT64`, `FLOAT -> DOUBLE`, `INT32 -> DOUBLE`) by mirroring the Comet-specific branches in `TypeUtil.checkParquetType`. The cases listed below are *not* gated by that flag in Spark (they throw unconditionally on the JVM vectorized reader), so they remain silently wrong under both `schemaEvolution.enabled=true` and `false` after #4229 lands.

From Spark 4.0.2's `ParquetTypeWideningSuite` `expectError = true` list: `Long -> Int`, `Double -> Float`, `Float -> Long`, `Long -> Double`, `Int -> Float`, `Int -> TimestampType`, `Date -> TimestampType`.

## Probe results on the #4229 branch (Spark 3.5, `COMET_NATIVE_SCAN_IMPL=native_datafusion`)

Credit: probe authored by @mbutrovich in https://github.com/apache/datafusion-comet/pull/4229#issuecomment-4391527687.

| Case | Written | Spark ref behavior | schemaEvolution=false | schemaEvolution=true |
|---|---|---|---|---|
| `long -> int` (narrowing) | `[1, 2, 3, 2147483652]` | throws | `[1, 2, 3, -2147483644]` | `[1, 2, 3, -2147483644]` |
| `double -> float` (narrowing) | `[1.5, 2.5, 1e40]` | throws | `[1.5, 2.5, Infinity]` | `[1.5, 2.5, Infinity]` |
| `float -> long` | `[1.5, 2.5]` | throws | `[1, 2]` (truncated) | `[1, 2]` (truncated) |
| `long -> double` | `[1, 2, 2^54+1]` | throws | `[1.0, 2.0, 1.8014398509481984E16]` (lost +1) | same |
| `int -> float` | `[1, 2, 2^25+1]` | throws | `[1.0, 2.0, 3.3554432E7]` (lost +1) | same |
| `int -> timestamp` | `[1, 2, 3]` | throws | `[1969-12-31 16:00:01 ... 03]` (PST, int-as-seconds) | same |
| `double -> long` | `[1.0, 2.0, 3.0]` | throws | `[1, 2, 3]` | `[1, 2, 3]` |

## Suggested approach

Per the PR discussion, the cleanest fix is to invert the check in `replace_with_spark_cast` to an allowlist of Spark-supported `(physical, target)` pairs (mirroring the accept cases in Spark's `ParquetVectorUpdaterFactory.getUpdater` per Spark version), so anything else raises `SparkError::ParquetSchemaConvert`. This closes the whole category at once and avoids continuing to enumerate denylist cases. A hedged sketch of such an allowlist follows.
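For illustration only, a minimal Rust sketch of the allowlist shape, assuming Arrow `DataType` on both sides. The function name, signature, and the exact set of accepted pairs here are assumptions for this example, not Comet's actual code; the real check would live in `replace_with_spark_cast` and the pair list would be derived per Spark version from `getUpdater`'s accept branches:

```rust
use arrow::datatypes::DataType;

/// Hypothetical helper: returns true only for (physical, target) pairs that
/// Spark's vectorized reader accepts. The caller would raise
/// `SparkError::ParquetSchemaConvert` for everything else instead of
/// inserting a silent cast.
fn is_spark_supported_conversion(physical: &DataType, target: &DataType) -> bool {
    use DataType::*;
    match (physical, target) {
        // Reading a column back at its written type is always fine.
        (p, t) if p == t => true,
        // The widening conversions Spark accepts (gated in Comet by
        // `spark.comet.schemaEvolution.enabled`, see #4229). Incomplete:
        // the real list is per Spark version.
        (Int32, Int64) | (Int32, Float64) | (Float32, Float64) => true,
        // Everything else (Int64 -> Int32, Float64 -> Float32,
        // Int32 -> Timestamp, Date32 -> Timestamp, ...) must error.
        _ => false,
    }
}

fn main() {
    use DataType::*;
    assert!(is_spark_supported_conversion(&Int32, &Int64));
    // Narrowing is rejected by default rather than silently overflowing.
    assert!(!is_spark_supported_conversion(&Int64, &Int32));
}
```

The point of the inversion is the final `_ => false` arm: any conversion nobody has explicitly vetted against Spark's reference behavior errors out, matching the JVM reader's default, rather than falling through to a lossy cast.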
## Probe used

```scala
package org.apache.comet.parquet

import scala.util.Try

import org.apache.spark.sql.{CometTestBase, DataFrame}
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf

class TypePromotionProbeSuite extends CometTestBase {
  import testImplicits._

  private def probe(label: String)(body: => Any): Unit = {
    val result = Try(body)
    // scalastyle:off println
    println(s"[PROBE] $label -> ${result match {
        case scala.util.Success(v) => s"OK value=$v"
        case scala.util.Failure(e) => s"THROW ${e.getClass.getSimpleName}"
      }}")
    // scalastyle:on println
  }

  private def runAll(ev: Boolean): Unit =
    withSQLConf(
      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
      CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> ev.toString,
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {

      def run(label: String, df: DataFrame, writeType: String, readAs: String): Unit =
        probe(s"$label (ev=$ev)") {
          withTempPath { dir =>
            df.selectExpr(s"cast(c as $writeType) as c").write.parquet(dir.getCanonicalPath)
            spark.read.schema(s"c $readAs").parquet(dir.getCanonicalPath)
              .collect().map(_.get(0)).toSeq
          }
        }

      run("int->long", Seq(1, 2, 3).toDF("c"), "int", "bigint")
      run("float->double", Seq(1.0f, 2.0f, 3.0f).toDF("c"), "float", "double")
      run("int->double", Seq(1, 2, 3).toDF("c"), "int", "double")
      run("long->int narrowing", Seq(1L, 2L, 3L, Int.MaxValue.toLong + 5L).toDF("c"), "bigint", "int")
      run("double->float narrowing", Seq(1.5, 2.5, 1e40).toDF("c"), "double", "float")
      run("float->long", Seq(1.5f, 2.5f).toDF("c"), "float", "bigint")
      run("long->double", Seq(1L, 2L, (1L << 54) + 1L).toDF("c"), "bigint", "double")
      run("int->float", Seq(1, 2, (1 << 25) + 1).toDF("c"), "int", "float")
      run("int->timestamp", Seq(1, 2, 3).toDF("c"), "int", "timestamp")
      run("double->long", Seq(1.0, 2.0, 3.0).toDF("c"), "double", "bigint")
    }

  test("probe ev=false") { runAll(ev = false) }
  test("probe ev=true") { runAll(ev = true) }
}
```

## Related

- #3720 (parent: native_datafusion silent schema-mismatch acceptance)
- #4229 (closes the three `schemaEvolution`-gated widening cases)
