andygrove opened a new issue, #4297:
URL: https://github.com/apache/datafusion-comet/issues/4297

   ## Description
   
   `native_datafusion` silently accepts a number of primitive-to-primitive 
Parquet conversions that Spark's vectorized reader rejects on every supported 
version (Spark's `ParquetVectorUpdaterFactory.getUpdater` falls through to 
`constructConvertNotSupportedException`). The result is wrong answers (silent 
overflow on narrowing, silent precision loss on widening, raw 
int-as-epoch-seconds reinterpretation for `int -> timestamp`) rather than the 
`SchemaColumnConvertNotSupportedException` Spark would throw.
   
   This is the same class of gap as #3720 (which focuses on `STRING -> INT` and 
decimal precision narrowing), but for primitive-to-primitive numeric and 
date/timestamp conversions.
   
   ## Context
   
   #4229 closes the three widening cases gated by 
`spark.comet.schemaEvolution.enabled` (`INT32 -> INT64`, `FLOAT -> DOUBLE`, 
`INT32 -> DOUBLE`) by mirroring the Comet-specific branches in 
`TypeUtil.checkParquetType`. The cases listed below are *not* gated by that 
flag in Spark — they throw unconditionally on the JVM vectorized reader — so 
they remain silently wrong under both `schemaEvolution.enabled=true` and 
`false` after #4229 lands.
   
   From Spark 4.0.2's `ParquetTypeWideningSuite` `expectError = true` list: 
`Long -> Int`, `Double -> Float`, `Float -> Long`, `Long -> Double`, `Int -> 
Float`, `Int -> TimestampType`, `Date -> TimestampType`.
   
   ## Probe results on this PR (Spark 3.5, 
`COMET_NATIVE_SCAN_IMPL=native_datafusion`)
   
   Credit: probe authored by @mbutrovich in 
https://github.com/apache/datafusion-comet/pull/4229#issuecomment-4391527687.
   
   | Case | Written | Spark ref behavior | schemaEvolution=false | schemaEvolution=true |
   |---|---|---|---|---|
   | `long -> int` (narrowing) | `[1, 2, 3, 2147483652]` | throws | `[1, 2, 3, -2147483644]` | `[1, 2, 3, -2147483644]` |
   | `double -> float` (narrowing) | `[1.5, 2.5, 1e40]` | throws | `[1.5, 2.5, Infinity]` | `[1.5, 2.5, Infinity]` |
   | `float -> long` | `[1.5, 2.5]` | throws | `[1, 2]` (truncated) | `[1, 2]` (truncated) |
   | `long -> double` | `[1, 2, 2^54+1]` | throws | `[1.0, 2.0, 1.8014398509481984E16]` (lost +1) | same |
   | `int -> float` | `[1, 2, 2^25+1]` | throws | `[1.0, 2.0, 3.3554432E7]` (lost +1) | same |
   | `int -> timestamp` | `[1, 2, 3]` | throws | `[1969-12-31 16:00:01 ... 03]` (PST, int-as-seconds) | same |
   | `double -> long` | `[1.0, 2.0, 3.0]` | throws | `[1, 2, 3]` | `[1, 2, 3]` |
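
   The narrowing and precision-loss rows above can be reproduced with plain Rust casts (a standalone illustration of the underlying two's-complement wrap and IEEE 754 rounding, not Comet code):

   ```rust
   fn main() {
       // long -> int narrowing: 2147483652 wraps (two's complement) to -2147483644
       let v: i64 = 2_147_483_652;
       assert_eq!(v as i32, -2_147_483_644);

       // double -> float narrowing: 1e40 exceeds f32::MAX and becomes +inf
       let d: f64 = 1e40;
       assert!((d as f32).is_infinite());

       // long -> double "widening": 2^54 + 1 is not representable in f64 (53-bit significand)
       let big: i64 = (1 << 54) + 1;
       assert_eq!(big as f64, 1.8014398509481984e16); // the +1 is lost

       // int -> float "widening": 2^25 + 1 is not representable in f32 (24-bit significand)
       let small: i32 = (1 << 25) + 1;
       assert_eq!(small as f32, 3.3554432e7); // the +1 is lost
   }
   ```

   The `float -> long` row is the same mechanism in the other direction: a float-to-int cast simply truncates the fractional part.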
   
   ## Suggested approach
   
   Per the PR discussion, the cleanest fix is to invert the check in 
`replace_with_spark_cast` to an allowlist of Spark-supported `(physical, 
target)` pairs (mirroring the accept cases in Spark's 
`ParquetVectorUpdaterFactory.getUpdater` per Spark version), so anything else 
raises `SparkError::ParquetSchemaConvert`. This closes the whole category at 
once and avoids continuing to enumerate denylist cases.
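
   A minimal sketch of the allowlist shape (standalone Rust with hypothetical local enums; the real change would live in `replace_with_spark_cast`, use Comet's own type representations, and raise `SparkError::ParquetSchemaConvert`):

   ```rust
   // Hypothetical stand-ins for the Parquet physical type and the Spark read type.
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum PhysicalType { Int32, Int64, Float, Double }

   #[derive(Debug, Clone, Copy, PartialEq)]
   enum SparkType { Int, Long, Float, Double, Timestamp }

   /// Accept only (physical, target) pairs Spark's vectorized reader supports;
   /// everything else errors, mirroring getUpdater falling through to
   /// constructConvertNotSupportedException. Illustrative pair list only.
   fn check_conversion(physical: PhysicalType, target: SparkType) -> Result<(), String> {
       use PhysicalType as P;
       use SparkType as S;
       let allowed = matches!(
           (physical, target),
           // identity reads
           (P::Int32, S::Int) | (P::Int64, S::Long) | (P::Float, S::Float) | (P::Double, S::Double)
               // widening cases Spark supports (the schemaEvolution-gated ones in Comet)
               | (P::Int32, S::Long) | (P::Int32, S::Double) | (P::Float, S::Double)
       );
       if allowed {
           Ok(())
       } else {
           // stand-in for raising SparkError::ParquetSchemaConvert
           Err(format!("Parquet conversion not supported: {physical:?} -> {target:?}"))
       }
   }

   fn main() {
       assert!(check_conversion(PhysicalType::Int32, SparkType::Long).is_ok());
       // narrowing / reinterpretation cases from the table now error out
       assert!(check_conversion(PhysicalType::Int64, SparkType::Int).is_err());
       assert!(check_conversion(PhysicalType::Int32, SparkType::Timestamp).is_err());
   }
   ```

   The inverted check means a newly added Spark conversion must be explicitly allowlisted, which fails safe: an unknown pair produces an error rather than a silently wrong answer.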
   
   ## Probe used
   
   ```scala
   package org.apache.comet.parquet
   
   import scala.util.Try
   import org.apache.spark.sql.{CometTestBase, DataFrame}
   import org.apache.spark.sql.internal.SQLConf
   import org.apache.comet.CometConf
   
   class TypePromotionProbeSuite extends CometTestBase {
     import testImplicits._
   
     private def probe(label: String)(body: => Any): Unit = {
       val result = Try(body)
       // scalastyle:off println
       println(s"[PROBE] $label -> ${result match {
           case scala.util.Success(v) => s"OK value=$v"
           case scala.util.Failure(e) => s"THROW ${e.getClass.getSimpleName}"
         }}")
       // scalastyle:on println
     }
   
     private def runAll(ev: Boolean): Unit = withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
       CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> ev.toString,
       SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
        def run(label: String, df: DataFrame, writeType: String, readAs: String): Unit =
          probe(s"$label (ev=$ev)") {
            withTempPath { dir =>
              df.selectExpr(s"cast(c as $writeType) as c").write.parquet(dir.getCanonicalPath)
              spark.read.schema(s"c $readAs").parquet(dir.getCanonicalPath)
                .collect().map(_.get(0)).toSeq
            }
          }
        run("int->long",               Seq(1, 2, 3).toDF("c"),                              "int",    "bigint")
        run("float->double",           Seq(1.0f, 2.0f, 3.0f).toDF("c"),                     "float",  "double")
        run("int->double",             Seq(1, 2, 3).toDF("c"),                              "int",    "double")
        run("long->int narrowing",     Seq(1L, 2L, 3L, Int.MaxValue.toLong + 5L).toDF("c"), "bigint", "int")
        run("double->float narrowing", Seq(1.5, 2.5, 1e40).toDF("c"),                       "double", "float")
        run("float->long",             Seq(1.5f, 2.5f).toDF("c"),                           "float",  "bigint")
        run("long->double",            Seq(1L, 2L, (1L << 54) + 1L).toDF("c"),              "bigint", "double")
        run("int->float",              Seq(1, 2, (1 << 25) + 1).toDF("c"),                  "int",    "float")
        run("int->timestamp",          Seq(1, 2, 3).toDF("c"),                              "int",    "timestamp")
        run("double->long",            Seq(1.0, 2.0, 3.0).toDF("c"),                        "double", "bigint")
      }
   
     test("probe ev=false") { runAll(ev = false) }
     test("probe ev=true")  { runAll(ev = true) }
   }
   ```
   
   ## Related
   
   - #3720 (parent: native_datafusion silent schema-mismatch acceptance)
   - #4229 (closes the three `schemaEvolution`-gated widening cases)

