This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f6463650f chore: add a test case to read from an arbitrarily complex type schema (#1911)
f6463650f is described below

commit f6463650f7b010c32289a8f94c0b073736c37b1b
Author: Parth Chandra <par...@apache.org>
AuthorDate: Thu Jun 19 16:47:56 2025 -0700

    chore: add a test case to read from an arbitrarily complex type schema (#1911)
    
    * chore: add a test case to read from an arbitrarily complex type schema
---
 .../apache/comet/parquet/ParquetReadSuite.scala    |  45 +++++++
 .../scala/org/apache/spark/sql/CometTestBase.scala | 129 ++++++++++++++++++++-
 2 files changed, 172 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 49d8f045d..4801678a4 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1951,6 +1951,51 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("read basic complex types") {
+    Seq(true, false).foreach(dictionaryEnabled => {
+      withTempPath { dir =>
+        val path = new Path(dir.toURI.toString, "complex_types.parquet")
+        makeParquetFileComplexTypes(path, dictionaryEnabled, 10)
+        withParquetTable(path.toUri.toString, "complex_types") {
+            Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(
+            scanMode => {
+              withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+                checkSparkAnswerAndOperator(sql("select * from complex_types"))
+                // First level
+                checkSparkAnswerAndOperator(sql(
+                  "select optional_array, array_of_struct, optional_map, 
complex_map from complex_types"))
+                // Second nested level
+                checkSparkAnswerAndOperator(
+                  sql(
+                    "select optional_array[0], " +
+                      "array_of_struct[0].field1, " +
+                      "array_of_struct[0].optional_nested_array, " +
+                      "optional_map.key, " +
+                      "optional_map.value, " +
+                      "map_keys(complex_map), " +
+                      "map_values(complex_map) " +
+                      "from complex_types"))
+                // Leaf fields
+                checkSparkAnswerAndOperator(
+                  sql(
+                    "select optional_array[0], " +
+                      "array_of_struct[0].field1, " +
+                      "array_of_struct[0].optional_nested_array[0], " +
+                      "optional_map.key, " +
+                      "optional_map.value, " +
+                      "map_keys(complex_map)[0].key_field1, " +
+                      "map_keys(complex_map)[0].key_field2, " +
+                      "map_values(complex_map)[0].value_field1, " +
+                      "map_values(complex_map)[0].value_field2 " +
+                      "from complex_types"))
+              }
+            })
+        }
+      }
+    })
+  }
+
 }
 
 class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 6801c1eea..05c46e307 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -32,9 +32,9 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.example.data.Group
-import org.apache.parquet.example.data.simple.SimpleGroup
+import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory}
 import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.example.ExampleParquetWriter
+import org.apache.parquet.hadoop.example.{ExampleParquetWriter, GroupWriteSupport}
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark._
 import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
@@ -698,6 +698,131 @@ abstract class CometTestBase
     expected
   }
 
+  // Generate a file based on a complex schema. Schema derived from https://arrow.apache.org/blog/2022/10/17/arrow-parquet-encoding-part-3/
+  def makeParquetFileComplexTypes(
+      path: Path,
+      dictionaryEnabled: Boolean,
+      numRows: Integer = 10000): Unit = {
+    val schemaString =
+      """
+      message ComplexDataSchema {
+        optional group optional_array (LIST) {
+          repeated group list {
+            optional int32 element;
+          }
+        }
+        required group array_of_struct (LIST) {
+          repeated group list {
+            optional group struct_element {
+              required int32 field1;
+              optional group optional_nested_array (LIST) {
+                repeated group list {
+                  required int32 element;
+                }
+              }
+            }
+          }
+        }
+        optional group optional_map (MAP) {
+          repeated group key_value {
+            required int32 key;
+            optional int32 value;
+          }
+        }
+        required group complex_map (MAP) {
+          repeated group key_value {
+            required group map_key {
+              required int32 key_field1;
+              optional int32 key_field2;
+            }
+            required group map_value {
+              required int32 value_field1;
+              repeated int32 value_field2;
+            }
+          }
+        }
+      }
+    """
+
+    val schema: MessageType = MessageTypeParser.parseMessageType(schemaString)
+    GroupWriteSupport.setSchema(schema, spark.sparkContext.hadoopConfiguration)
+
+    val writer = createParquetWriter(schema, path, dictionaryEnabled)
+
+    val groupFactory = new SimpleGroupFactory(schema)
+
+    for (i <- 0 until numRows) {
+      val record = groupFactory.newGroup()
+
+      // Optional array of optional integers
+      if (i % 2 == 0) { // optional_array for every other row
+        val optionalArray = record.addGroup("optional_array")
+        for (j <- 0 until (i % 5)) {
+          val elementGroup = optionalArray.addGroup("list")
+          if (j % 2 == 0) { // some elements are optional
+            elementGroup.append("element", j)
+          }
+        }
+      }
+
+      // Required array of structs
+      val arrayOfStruct = record.addGroup("array_of_struct")
+      for (j <- 0 until (i % 3) + 1) { // Add one to three elements
+        val structElementGroup = arrayOfStruct.addGroup("list").addGroup("struct_element")
+        structElementGroup.append("field1", i * 10 + j)
+
+        // Optional nested array
+        if (j % 2 != 0) { // optional nested array for every other struct
+          val nestedArray = structElementGroup.addGroup("optional_nested_array")
+          for (k <- 0 until (i % 4)) { // Add zero to three elements.
+            val nestedElementGroup = nestedArray.addGroup("list")
+            nestedElementGroup.append("element", i + j + k)
+          }
+        }
+      }
+
+      // Optional map
+      if (i % 3 == 0) { // optional_map every third row
+        val optionalMap = record.addGroup("optional_map")
+        optionalMap
+          .addGroup("key_value")
+          .append("key", i)
+          .append("value", i)
+        if (i % 5 == 0) { // another optional entry
+          optionalMap
+            .addGroup("key_value")
+            .append("key", i)
+          // Value is optional
+          if (i % 10 == 0) {
+            optionalMap
+              .addGroup("key_value")
+              .append("key", i)
+              .append("value", i)
+          }
+        }
+      }
+
+      // Required map with complex key and value types
+      val complexMap = record.addGroup("complex_map")
+      val complexMapKeyVal = complexMap.addGroup("key_value")
+
+      complexMapKeyVal
+        .addGroup("map_key")
+        .append("key_field1", i)
+
+      complexMapKeyVal
+        .addGroup("map_value")
+        .append("value_field1", i)
+        .append("value_field2", i * 100)
+        .append("value_field2", i * 101)
+        .append("value_field2", i * 102)
+
+      writer.write(record)
+    }
+
+    writer.close()
+  }
+
   protected def makeDateTimeWithFormatTable(
       path: Path,
       dictionaryEnabled: Boolean,

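For reference, a minimal sketch of how makeParquetFileComplexTypes might be
driven from a suite extending CometTestBase; the suite name and the plain
spark.read.parquet round-trip are illustrative assumptions, not part of this
commit:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.CometTestBase

    // Hypothetical suite; CometTestBase supplies `spark`, `withTempPath`,
    // and the makeParquetFileComplexTypes helper added in this commit.
    class ComplexTypesRoundTripSuite extends CometTestBase {
      test("round-trip the generated complex-type file") {
        withTempPath { dir =>
          val path = new Path(dir.toURI.toString, "complex_types.parquet")
          // Write ten rows with the new helper.
          makeParquetFileComplexTypes(path, dictionaryEnabled = false, numRows = 10)
          // Read back with vanilla Spark and sanity-check the shape.
          val df = spark.read.parquet(path.toString)
          assert(df.count() == 10)
          assert(df.schema.fieldNames.toSeq ==
            Seq("optional_array", "array_of_struct", "optional_map", "complex_map"))
        }
      }
    }

The committed test goes further: it runs the queries under both of Comet's
native scan implementations (SCAN_NATIVE_DATAFUSION and
SCAN_NATIVE_ICEBERG_COMPAT) and uses checkSparkAnswerAndOperator to assert
that results at every nesting level match Spark's own.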
