This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2665b526b feat: Support Type widening: byte → short/int/long, short → int/long (#1770)
2665b526b is described below

commit 2665b526b3b3bfa6e47b0b77fc154f755d22d564
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Tue May 27 18:20:16 2025 -0700

    feat: Support Type widening: byte → short/int/long, short → int/long (#1770)
    
    * Support type widening for Spark 4.0
    
    * formatting
    
    * add one more test
    
    * formatting
---
 native/core/src/parquet/read/column.rs             | 14 ++++
 .../apache/comet/parquet/ParquetReadSuite.scala    | 82 ++++++++++++++++++++++
 2 files changed, 96 insertions(+)
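Not part of the patch, but for context: a minimal Spark sketch of the read pattern the change below lets Comet's native Parquet reader handle, where data written with a narrow integer type (byte) is read back under a wider declared schema (long). The object name, path, and column name are illustrative only; the new tests in this commit additionally run with CometConf.COMET_SCHEMA_EVOLUTION_ENABLED set to "true".

import org.apache.spark.sql.SparkSession

object TypeWideningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("type-widening-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical output location; any writable path works.
    val path = "/tmp/type-widening-example"

    // Write ten values as ByteType, which Parquet stores with the INT32 physical type.
    (1 to 10).map(_.toByte).toDF("col1").write.mode("overwrite").parquet(path)

    // Read the same files back with a wider declared type; each byte value is
    // promoted to a long on read instead of failing the scan.
    val widened = spark.read.schema("col1 long").parquet(path)
    widened.show()

    spark.stop()
  }
}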

diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs
index dc7797a2b..43b9d5ada 100644
--- a/native/core/src/parquet/read/column.rs
+++ b/native/core/src/parquet/read/column.rs
@@ -154,6 +154,16 @@ impl ColumnReader {
                                         )
                                     }
                                 }
+                                // promote byte to short
+                                PhysicalType::INT32 if promotion_info.bit_width == 16 => {
+                                    typed_reader!(Int16ColumnReader, Int16)
+                                }
+                                // promote byte to int
+                                PhysicalType::INT32 if promotion_info.bit_width == 32 => {
+                                    typed_reader!(Int32ColumnReader, Int32)
+                                }
+                                // promote byte to long
+                                PhysicalType::INT64 => typed_reader!(Int32To64ColumnReader, Int64),
                                 _ => typed_reader!(Int8ColumnReader, Int8),
                             },
                             (8, false) => typed_reader!(UInt8ColumnReader, Int16),
@@ -161,6 +171,10 @@ impl ColumnReader {
                                 PhysicalType::DOUBLE => {
                                     typed_reader!(Int16ToDoubleColumnReader, Float64)
                                 }
+                                // promote short to long
+                                PhysicalType::INT64 => {
+                                    typed_reader!(Int32To64ColumnReader, Int64)
+                                }
                                 PhysicalType::INT32 if promotion_info.bit_width == 32 => {
                                     typed_reader!(Int32ColumnReader, Int32)
                                 }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 1378e6cfb..f08d5b6ab 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1246,6 +1246,88 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
+  test("type widening: byte → short/int/long, short → int/long, int → long") {
+    withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val values = 1 to 10
+        val options: Map[String, String] = Map.empty[String, String]
+
+        // Input types and corresponding DataFrames
+        val inputDFs = Seq(
+          "byte" -> values.map(_.toByte).toDF("col1"),
+          "short" -> values.map(_.toShort).toDF("col1"),
+          "int" -> values.map(_.toInt).toDF("col1"))
+
+        // Target Spark read schemas for widening
+        val widenTargets = Seq(
+          "short" -> values.map(_.toShort).toDF("col1"),
+          "int" -> values.map(_.toInt).toDF("col1"),
+          "long" -> values.map(_.toLong).toDF("col1"))
+
+        for ((inputType, inputDF) <- inputDFs) {
+          val writePath = s"$path/$inputType"
+          inputDF.write.format("parquet").options(options).save(writePath)
+
+          for ((targetType, targetDF) <- widenTargets) {
+            // Only test valid widenings (e.g., don't test int → short)
+            val wideningValid = (inputType, targetType) match {
+              case ("byte", "short" | "int" | "long") => true
+              case ("short", "int" | "long") => true
+              case ("int", "long") => true
+              case _ => false
+            }
+
+            if (wideningValid) {
+              val reader = spark.read
+                .schema(s"col1 $targetType")
+                .format("parquet")
+                .options(options)
+                .load(writePath)
+
+              checkAnswer(reader, targetDF)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("read byte, int, short, long together") {
+    withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+
+        val byteDF = (Byte.MaxValue - 2 to Byte.MaxValue).map(_.toByte).toDF("col1")
+        val shortDF = (Short.MaxValue - 2 to Short.MaxValue).map(_.toShort).toDF("col1")
+        val intDF = (Int.MaxValue - 2 to Int.MaxValue).toDF("col1")
+        val longDF = (Long.MaxValue - 2 to Long.MaxValue).toDF("col1")
+        val unionDF = byteDF.union(shortDF).union(intDF).union(longDF)
+
+        val byteDir = s"$path${File.separator}part=byte"
+        val shortDir = s"$path${File.separator}part=short"
+        val intDir = s"$path${File.separator}part=int"
+        val longDir = s"$path${File.separator}part=long"
+
+        val options: Map[String, String] = Map.empty[String, String]
+
+        byteDF.write.format("parquet").options(options).save(byteDir)
+        shortDF.write.format("parquet").options(options).save(shortDir)
+        intDF.write.format("parquet").options(options).save(intDir)
+        longDF.write.format("parquet").options(options).save(longDir)
+
+        val df = spark.read
+          .schema(unionDF.schema)
+          .format("parquet")
+          .options(options)
+          .load(path)
+          .select("col1")
+
+        checkAnswer(df, unionDF)
+      }
+    }
+  }
+
   test("scan metrics") {
     // https://github.com/apache/datafusion-comet/issues/1441
    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)

