This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new a6b340e4b chore: replace legacy datetime rebase tests with current
scan coverage [iceberg] (#3605)
a6b340e4b is described below
commit a6b340e4bc988094aae90767eb9f8dc85f441598
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 2 15:10:04 2026 -0700
chore: replace legacy datetime rebase tests with current scan coverage
[iceberg] (#3605)
---
docs/source/contributor-guide/parquet_scans.md | 8 +-
.../apache/comet/parquet/ParquetReadSuite.scala | 29 ++++
.../sql/comet/ParquetDatetimeRebaseSuite.scala | 164 ---------------------
3 files changed, 33 insertions(+), 168 deletions(-)
diff --git a/docs/source/contributor-guide/parquet_scans.md
b/docs/source/contributor-guide/parquet_scans.md
index 7df939488..c8e960a15 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -49,10 +49,10 @@ The following features are not supported by either scan
implementation, and Come
The following shared limitation may produce incorrect results without falling
back to Spark:
-- No support for datetime rebasing detection or the
`spark.comet.exceptionOnDatetimeRebase` configuration. When
- reading Parquet files containing dates or timestamps written before Spark
3.0 (which used a hybrid
- Julian/Gregorian calendar), dates/timestamps will be read as if they were
written using the Proleptic Gregorian
- calendar. This may produce incorrect results for dates before October 15,
1582.
+- No support for datetime rebasing. When reading Parquet files containing
dates or timestamps written before
+ Spark 3.0 (which used a hybrid Julian/Gregorian calendar), dates/timestamps
will be read as if they were
+ written using the Proleptic Gregorian calendar. This may produce incorrect
results for dates before
+ October 15, 1582.
The `native_datafusion` scan has some additional limitations, mostly related
to Parquet metadata. All of these
cause Comet to fall back to Spark.
diff --git
a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 1495eb34e..4a049afbf 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1815,6 +1815,35 @@ class ParquetReadV1Suite extends ParquetReadSuite with
AdaptiveSparkPlanHelper {
})
}
+ test("reading ancient dates before 1582") {
+ // Verify that legacy dates (before 1582-10-15) are read without error.
+ // Comet does not support datetime rebasing, so these dates are read as if
they were
+ // written using the Proleptic Gregorian calendar (no rebase, no
exception).
+ val file =
+
getResourceParquetFilePath("test-data/before_1582_date_v3_2_0.snappy.parquet")
+
+ Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
CometConf.SCAN_NATIVE_DATAFUSION).foreach {
+ scanImpl =>
+ withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl) {
+ val df = spark.read.parquet(file)
+
+ // Verify Comet scan is in the plan
+ val plan = df.queryExecution.executedPlan
+ checkCometOperators(plan)
+
+ // Verify all 8 rows are read and contain dates before 1582
+ val rows = df.collect()
+ assert(rows.length == 8, s"Expected 8 rows with $scanImpl, got
${rows.length}")
+ rows.foreach { row =>
+ val date = row.getDate(0)
+ assert(
+ date.toLocalDate.getYear < 1582,
+ s"Expected date before 1582 with $scanImpl, got $date")
+ }
+ }
+ }
+ }
+
}
// ignored: native_comet scan is no longer supported
diff --git
a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
deleted file mode 100644
index c330bbe4c..000000000
---
a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet
-
-import org.scalactic.source.Position
-import org.scalatest.Tag
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
-import org.apache.spark.sql.internal.SQLConf
-
-import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
-
-// This test checks if Comet reads ancient dates & timestamps that are before
1582, as if they are
-// read according to the `LegacyBehaviorPolicy.CORRECTED` mode (i.e., no
rebase) in Spark.
-abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
-
- // This is a flag defined in Spark's
`org.apache.spark.internal.config.Tests` but is only
- // visible under package `spark`.
- val SPARK_TESTING: String = "spark.testing"
-
- // ignored: native_comet scan is no longer supported
- ignore("reading ancient dates before 1582") {
- Seq(true, false).foreach { exceptionOnRebase =>
- withSQLConf(
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
- CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key ->
- exceptionOnRebase.toString) {
- Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion =>
- val file =
- getResourceParquetFilePath(
- s"test-data/before_1582_date_v$sparkVersion.snappy.parquet")
- val df = spark.read.parquet(file)
-
- // Parquet file written by 2.4.5 should throw exception for both
Spark and Comet
- // For Spark 4.0+, Parquet file written by 2.4.5 should not throw
exception
- if ((exceptionOnRebase || sparkVersion == "2_4_5") &&
(!isSpark40Plus || sparkVersion != "2_4_5") &&
- usingLegacyNativeCometScan(conf)) {
- intercept[SparkException](df.collect())
- } else {
- checkSparkNoRebaseAnswer(df)
- }
- }
- }
- }
- }
-
- // ignored: native_comet scan is no longer supported
- ignore("reading ancient timestamps before 1582") {
- assume(usingLegacyNativeCometScan(conf))
- Seq(true, false).foreach { exceptionOnRebase =>
- withSQLConf(
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
- CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key ->
- exceptionOnRebase.toString) {
- Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion =>
- Seq("micros", "millis").foreach { timestampUnit =>
- val file = getResourceParquetFilePath(
-
s"test-data/before_1582_timestamp_${timestampUnit}_v${sparkVersion}.snappy.parquet")
- val df = spark.read.parquet(file)
-
- // Parquet file written by 2.4.5 should throw exception for both
Spark and Comet
- // For Spark 4.0+, Parquet file written by 2.4.5 should not throw
exception
- if ((exceptionOnRebase || sparkVersion == "2_4_5") &&
(!isSpark40Plus || sparkVersion != "2_4_5")
- && usingLegacyNativeCometScan(conf)) {
- intercept[SparkException](df.collect())
- } else {
- checkSparkNoRebaseAnswer(df)
- }
- }
- }
- }
- }
- }
-
- // ignored: native_comet scan is no longer supported
- ignore("reading ancient int96 timestamps before 1582") {
- assume(usingLegacyNativeCometScan(conf))
- Seq(true, false).foreach { exceptionOnRebase =>
- withSQLConf(
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
- CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key ->
- exceptionOnRebase.toString) {
- Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion =>
- Seq("dict", "plain").foreach { parquetEncoding =>
- val file = getResourceParquetFilePath(
-
s"test-data/before_1582_timestamp_int96_${parquetEncoding}_v${sparkVersion}.snappy.parquet")
- val df = spark.read.parquet(file)
-
- // Parquet file written by 2.4.5 should throw exception for both
Spark and Comet
- // For Spark 4.0+, Parquet file written by 2.4.5 should not throw
exception
- if ((exceptionOnRebase || sparkVersion == "2_4_5") &&
(!isSpark40Plus || sparkVersion != "2_4_5")
- && usingLegacyNativeCometScan(conf)) {
- intercept[SparkException](df.collect())
- } else {
- checkSparkNoRebaseAnswer(df)
- }
- }
- }
- }
- }
- }
-
- private def checkSparkNoRebaseAnswer(df: => DataFrame): Unit = {
- var expected: Array[Row] = Array.empty
-
- withSQLConf(CometConf.COMET_ENABLED.key -> "false",
"spark.test.forceNoRebase" -> "true") {
-
- val previousPropertyValue =
Option.apply(System.getProperty(SPARK_TESTING))
- System.setProperty(SPARK_TESTING, "true")
-
- val dfSpark = datasetOfRows(spark, extractLogicalPlan(df))
- expected = dfSpark.collect()
-
- previousPropertyValue match {
- case Some(v) => System.setProperty(SPARK_TESTING, v)
- case None => System.clearProperty(SPARK_TESTING)
- }
- }
-
- val dfComet = datasetOfRows(spark, extractLogicalPlan(df))
- checkAnswer(dfComet, expected)
- }
-}
-
-class ParquetDatetimeRebaseV1Suite extends ParquetDatetimeRebaseSuite {
- override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)(implicit
- pos: Position): Unit = {
- super.test(testName, testTags:
_*)(withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
- testFun
- })(pos)
- }
-}
-
-// ignored: native_comet scan is no longer supported
-class ParquetDatetimeRebaseV2Suite extends ParquetDatetimeRebaseSuite {
- override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)(implicit
- pos: Position): Unit = {
- super.ignore(testName, testTags: _*)(
- withSQLConf(
- SQLConf.USE_V1_SOURCE_LIST.key -> "",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
- testFun
- })(pos)
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]