This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 2665b526b  feat: Support Type widening: byte → short/int/long, short → int/long (#1770)
2665b526b is described below

commit 2665b526b3b3bfa6e47b0b77fc154f755d22d564
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Tue May 27 18:20:16 2025 -0700

    feat: Support Type widening: byte → short/int/long, short → int/long (#1770)

    * Support type widening for Spark 4.0

    * formatting

    * add one more test

    * formatting
---
 native/core/src/parquet/read/column.rs              | 14 ++++
 .../apache/comet/parquet/ParquetReadSuite.scala     | 82 ++++++++++++++++++++++
 2 files changed, 96 insertions(+)

diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs
index dc7797a2b..43b9d5ada 100644
--- a/native/core/src/parquet/read/column.rs
+++ b/native/core/src/parquet/read/column.rs
@@ -154,6 +154,16 @@ impl ColumnReader {
                     )
                 }
             }
+            // promote byte to short
+            PhysicalType::INT32 if promotion_info.bit_width == 16 => {
+                typed_reader!(Int16ColumnReader, Int16)
+            }
+            // promote byte to int
+            PhysicalType::INT32 if promotion_info.bit_width == 32 => {
+                typed_reader!(Int32ColumnReader, Int32)
+            }
+            // promote byte to long
+            PhysicalType::INT64 => typed_reader!(Int32To64ColumnReader, Int64),
             _ => typed_reader!(Int8ColumnReader, Int8),
         },
         (8, false) => typed_reader!(UInt8ColumnReader, Int16),
@@ -161,6 +171,10 @@ impl ColumnReader {
             PhysicalType::DOUBLE => {
                 typed_reader!(Int16ToDoubleColumnReader, Float64)
             }
+            // promote short to long
+            PhysicalType::INT64 => {
+                typed_reader!(Int32To64ColumnReader, Int64)
+            }
             PhysicalType::INT32 if promotion_info.bit_width == 32 => {
                 typed_reader!(Int32ColumnReader, Int32)
             }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 1378e6cfb..f08d5b6ab 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1246,6 +1246,88 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
+  test("type widening: byte → short/int/long, short → int/long, int → long") {
+    withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val values = 1 to 10
+        val options: Map[String, String] = Map.empty[String, String]
+
+        // Input types and corresponding DataFrames
+        val inputDFs = Seq(
+          "byte" -> values.map(_.toByte).toDF("col1"),
+          "short" -> values.map(_.toShort).toDF("col1"),
+          "int" -> values.map(_.toInt).toDF("col1"))
+
+        // Target Spark read schemas for widening
+        val widenTargets = Seq(
+          "short" -> values.map(_.toShort).toDF("col1"),
+          "int" -> values.map(_.toInt).toDF("col1"),
+          "long" -> values.map(_.toLong).toDF("col1"))
+
+        for ((inputType, inputDF) <- inputDFs) {
+          val writePath = s"$path/$inputType"
+          inputDF.write.format("parquet").options(options).save(writePath)
+
+          for ((targetType, targetDF) <- widenTargets) {
+            // Only test valid widenings (e.g., don't test int → short)
+            val wideningValid = (inputType, targetType) match {
+              case ("byte", "short" | "int" | "long") => true
+              case ("short", "int" | "long") => true
+              case ("int", "long") => true
+              case _ => false
+            }
+
+            if (wideningValid) {
+              val reader = spark.read
+                .schema(s"col1 $targetType")
+                .format("parquet")
+                .options(options)
+                .load(writePath)
+
+              checkAnswer(reader, targetDF)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("read byte, int, short, long together") {
+    withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+
+        val byteDF = (Byte.MaxValue - 2 to Byte.MaxValue).map(_.toByte).toDF("col1")
+        val shortDF = (Short.MaxValue - 2 to Short.MaxValue).map(_.toShort).toDF("col1")
+        val intDF = (Int.MaxValue - 2 to Int.MaxValue).toDF("col1")
+        val longDF = (Long.MaxValue - 2 to Long.MaxValue).toDF("col1")
+        val unionDF = byteDF.union(shortDF).union(intDF).union(longDF)
+
+        val byteDir = s"$path${File.separator}part=byte"
+        val shortDir = s"$path${File.separator}part=short"
+        val intDir = s"$path${File.separator}part=int"
+        val longDir = s"$path${File.separator}part=long"
+
+        val options: Map[String, String] = Map.empty[String, String]
+
+        byteDF.write.format("parquet").options(options).save(byteDir)
+        shortDF.write.format("parquet").options(options).save(shortDir)
+        intDF.write.format("parquet").options(options).save(intDir)
+        longDF.write.format("parquet").options(options).save(longDir)
+
+        val df = spark.read
+          .schema(unionDF.schema)
+          .format("parquet")
+          .options(options)
+          .load(path)
+          .select("col1")
+
+        checkAnswer(df, unionDF)
+      }
+    }
+  }
+
   test("scan metrics") {
     // https://github.com/apache/datafusion-comet/issues/1441
     assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
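
For reference, a minimal sketch of the widening this commit enables, outside the test harness. It is not part of the commit; it assumes a running SparkSession named `spark` with the Comet plugin active, and the output path is illustrative.

    // Illustrative only: `spark` is an existing SparkSession with Comet enabled.
    import org.apache.comet.CometConf
    import spark.implicits._

    // Enable Comet schema evolution, as the new tests do.
    spark.conf.set(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")

    // Write a column as byte (stored as Parquet INT32 with an 8-bit logical width).
    val bytePath = "/tmp/comet_widening_example" // hypothetical path
    (1 to 10).map(_.toByte).toDF("col1").write.mode("overwrite").parquet(bytePath)

    // Read it back with a wider declared type; the native reader now promotes byte to long.
    val widened = spark.read.schema("col1 long").parquet(bytePath)
    widened.show()

The same pattern applies to the other supported promotions (byte → short/int, short → int/long), selected in column.rs from the requested bit width and Parquet physical type.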