mbutrovich commented on PR #4229:
URL: https://github.com/apache/datafusion-comet/pull/4229#issuecomment-4391527687
Nice reduction in ignored tests. One concern on scope.
The three-case match in `replace_with_spark_cast` exactly mirrors the three permissive branches in Comet's `TypeUtil.checkParquetType` (`INT32→Long` and `FLOAT→Double` gated on `allowTypePromotion`, `INT32→Double` gated on `isSpark40Plus`), so `native_datafusion` and `native_iceberg_compat` now line up for those three cases. Good.
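For anyone reading along without the Comet source handy, the shape of those three branches is roughly the following (a paraphrase; `isPermissivePromotion` and its parameters are illustrative names, not the actual `TypeUtil` code):

```scala
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.sql.types.{DataType, DoubleType, LongType}

// Illustrative paraphrase only -- not the real TypeUtil.checkParquetType.
def isPermissivePromotion(
    physical: PrimitiveTypeName,
    readType: DataType,
    allowTypePromotion: Boolean,
    isSpark40Plus: Boolean): Boolean =
  (physical, readType) match {
    case (PrimitiveTypeName.INT32, LongType) => allowTypePromotion
    case (PrimitiveTypeName.FLOAT, DoubleType) => allowTypePromotion
    case (PrimitiveTypeName.INT32, DoubleType) => isSpark40Plus
    case _ => false // everything else falls through to the normal compatibility check
  }
```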
But `native_datafusion` also silently accepts a bunch of conversions that
**Spark's vectorized reader rejects on every supported version** (Spark's
`ParquetVectorUpdaterFactory.getUpdater` falls through to
`constructConvertNotSupportedException`). From Spark's
`ParquetTypeWideningSuite` `expectError = true` list on 4.0.2: `Long→Int`,
`Double→Float`, `Float→Long`, `Long→Double`, `Int→Float`, `Int→TimestampType`,
`Date→TimestampType`. None of these is gated by the new flag, and none of them
throws today.
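As a reference point for the first of those, here is the kind of vanilla-Spark repro I mean (Comet disabled; a sketch, and the exact exception wrapping differs across versions):

```scala
// Vanilla Spark, no Comet: a long -> int read is rejected by the vectorized
// Parquet reader (getUpdater falls through to
// constructConvertNotSupportedException).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val path = java.nio.file.Files.createTempDirectory("probe").toString

spark.range(3).toDF("c").write.mode("overwrite").parquet(path) // written as INT64
spark.read.schema("c INT").parquet(path).collect()
// => SparkException: Parquet column cannot be converted ... (long read as int)
```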
I ran a probe against this PR on Spark 3.5 with
`COMET_NATIVE_SCAN_IMPL=native_datafusion`, sweeping
`spark.comet.schemaEvolution.enabled` on and off. Results:
| Case | Written | Spark ref behavior | schemaEvolution=false | schemaEvolution=true |
|---|---|---|---|---|
| `int→long` | `[1, 2, 3]` | throws (3.x) / ok (4.0) | **throws** ✅ | `[1, 2, 3]` |
| `float→double` | `[1.0, 2.0, 3.0]` | throws (3.x) / ok (4.0) | **throws** ✅ | `[1.0, 2.0, 3.0]` |
| `int→double` | `[1, 2, 3]` | throws (3.x) / ok (4.0) | **throws** ✅ | `[1.0, 2.0, 3.0]` |
| `long→int` (narrowing) | `[1, 2, 3, 2147483652]` | throws | `[1, 2, 3, -2147483644]` | `[1, 2, 3, -2147483644]` |
| `double→float` (narrowing) | `[1.5, 2.5, 1e40]` | throws | `[1.5, 2.5, Infinity]` | `[1.5, 2.5, Infinity]` |
| `float→long` | `[1.5, 2.5]` | throws | `[1, 2]` (truncated) | `[1, 2]` (truncated) |
| `long→double` | `[1, 2, 2^54+1]` | throws | `[1.0, 2.0, 1.8014398509481984E16]` (lost +1) | same |
| `int→float` | `[1, 2, 2^25+1]` | throws | `[1.0, 2.0, 3.3554432E7]` (lost +1) | same |
| `int→timestamp` | `[1, 2, 3]` | throws | `[1969-12-31 16:00:01 … 03]` (PST, int-as-seconds) | same |
| `double→long` | `[1.0, 2.0, 3.0]` | throws | `[1, 2, 3]` | `[1, 2, 3]` |
The top three rows are what the PR fixes, and they look right under both settings. The bottom seven are wrong-answer paths under both settings: silent overflow on narrowing, silent precision loss on widenings Spark doesn't allow, and silent raw-int-as-epoch-seconds reinterpretation for `int→timestamp` (the arithmetic behind the first and last is worked through below). These are the same class of gap that #3720 enumerates for `STRING→INT` and decimal precision narrowing, just for primitive-to-primitive conversions.
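For concreteness, the `long→int` and `int→timestamp` rows are exactly what plain JVM semantics predict:

```scala
// long -> int narrowing wraps modulo 2^32:
val written = Int.MaxValue.toLong + 5L // 2147483652
written.toInt                          // -2147483644, as in the table

// int -> timestamp reads the raw int as epoch *seconds*:
java.time.Instant.ofEpochSecond(1L)    // 1970-01-01T00:00:01Z,
// i.e. 1969-12-31 16:00:01 in America/Los_Angeles (the PST rendering above)
```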
Not asking you to fix all of them in this PR. But I think the framing in the commit message and code comment (`mirrors TypeUtil.checkParquetType`) undersells the remaining surface. Two options worth considering:
1. Invert the check to an allowlist of Spark-supported `(physical, target)` pairs (essentially mirror the accept cases in Spark's `ParquetVectorUpdaterFactory.getUpdater` per Spark version), so anything else raises `ParquetSchemaConvert`. This closes the whole category; a rough sketch of the shape follows below.
2. Land this as-is and file a follow-up issue tracking the seven cases above, linking this probe so the behavior is captured.

Either is fine by me. I'd lean toward (2) to keep this PR scoped.
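
Rough shape of the allowlist in (1), sketched on the JVM side purely for illustration (the real check would live next to `replace_with_spark_cast` on the native side, and the pair set below only covers the primitive widenings from this probe; decimal and date/timestamp logical types would need their own entries):

```scala
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.spark.sql.types.{DataType, DoubleType, LongType}

// Hypothetical allowlist: the primitive widenings Spark 4.0's vectorized
// reader accepts, per the probe table above. Identity reads (e.g.
// INT32 -> Int) would be accepted before this check runs.
val spark40Widenings: Set[(PrimitiveTypeName, DataType)] =
  Set((INT32, LongType), (INT32, DoubleType), (FLOAT, DoubleType))

def isAllowedWidening(physical: PrimitiveTypeName, target: DataType): Boolean =
  spark40Widenings.contains((physical, target))
// Anything not allowlisted would raise ParquetSchemaConvert instead of
// silently casting.
```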
---
Probe used (slimmed; put under `spark/src/test/scala/org/apache/comet/parquet/`, runs with `./mvnw test -Pspark-3.5 -Dtest=none -Dsuites=org.apache.comet.parquet.TypePromotionProbeSuite -Dscalastyle.skip=true`):
```scala
package org.apache.comet.parquet

import scala.util.Try

import org.apache.spark.sql.{CometTestBase, DataFrame}
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf

class TypePromotionProbeSuite extends CometTestBase {
  import testImplicits._

  // Run `body` and print either the collected values or the exception class.
  private def probe(label: String)(body: => Any): Unit = {
    val result = Try(body)
    // scalastyle:off println
    println(s"[PROBE] $label -> ${result match {
      case scala.util.Success(v) => s"OK value=$v"
      case scala.util.Failure(e) => s"THROW ${e.getClass.getSimpleName}"
    }}")
    // scalastyle:on println
  }

  private def runAll(ev: Boolean): Unit = withSQLConf(
    CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
    CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> ev.toString,
    SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {

    // Write `df` cast to `writeType`, then read it back with a `readAs` schema.
    def run(label: String, df: DataFrame, writeType: String, readAs: String): Unit =
      probe(s"$label (ev=$ev)") {
        withTempPath { dir =>
          df.selectExpr(s"cast(c as $writeType) as c").write.parquet(dir.getCanonicalPath)
          spark.read.schema(s"c $readAs").parquet(dir.getCanonicalPath)
            .collect().map(_.get(0)).toSeq
        }
      }

    run("int->long", Seq(1, 2, 3).toDF("c"), "int", "bigint")
    run("float->double", Seq(1.0f, 2.0f, 3.0f).toDF("c"), "float", "double")
    run("int->double", Seq(1, 2, 3).toDF("c"), "int", "double")
    run("long->int narrowing", Seq(1L, 2L, 3L, Int.MaxValue.toLong + 5L).toDF("c"), "bigint", "int")
    run("double->float narrowing", Seq(1.5, 2.5, 1e40).toDF("c"), "double", "float")
    run("float->long", Seq(1.5f, 2.5f).toDF("c"), "float", "bigint")
    run("long->double", Seq(1L, 2L, (1L << 54) + 1L).toDF("c"), "bigint", "double")
    run("int->float", Seq(1, 2, (1 << 25) + 1).toDF("c"), "int", "float")
    run("int->timestamp", Seq(1, 2, 3).toDF("c"), "int", "timestamp")
    run("double->long", Seq(1.0, 2.0, 3.0).toDF("c"), "double", "bigint")
  }

  test("probe ev=false") { runAll(ev = false) }
  test("probe ev=true") { runAll(ev = true) }
}
```