This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new f6463650f chore: add a test case to read from an arbitrarily complex type schema (#1911)
f6463650f is described below

commit f6463650f7b010c32289a8f94c0b073736c37b1b
Author: Parth Chandra <par...@apache.org>
AuthorDate: Thu Jun 19 16:47:56 2025 -0700

    chore: add a test case to read from an arbitrarily complex type schema (#1911)

    * chore: add a test case to read from an arbitrarily complex type schema
---
 .../apache/comet/parquet/ParquetReadSuite.scala    |  45 +++++++
 .../scala/org/apache/spark/sql/CometTestBase.scala | 129 ++++++++++++++++++++-
 2 files changed, 172 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 49d8f045d..4801678a4 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1951,6 +1951,51 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("read basic complex types") {
+    Seq(true, false).foreach(dictionaryEnabled => {
+      withTempPath { dir =>
+        val path = new Path(dir.toURI.toString, "complex_types.parquet")
+        makeParquetFileComplexTypes(path, dictionaryEnabled, 10)
+        withParquetTable(path.toUri.toString, "complex_types") {
+          Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(
+            scanMode => {
+              withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+                checkSparkAnswerAndOperator(sql("select * from complex_types"))
+                // First level
+                checkSparkAnswerAndOperator(sql(
+                  "select optional_array, array_of_struct, optional_map, complex_map from complex_types"))
+                // second nested level
+                checkSparkAnswerAndOperator(
+                  sql(
+                    "select optional_array[0], " +
+                      "array_of_struct[0].field1, " +
+                      "array_of_struct[0].optional_nested_array, " +
+                      "optional_map.key, " +
+                      "optional_map.value, " +
+                      "map_keys(complex_map), " +
+                      "map_values(complex_map) " +
+                      "from complex_types"))
+                // leaf fields
+                checkSparkAnswerAndOperator(
+                  sql(
+                    "select optional_array[0], " +
+                      "array_of_struct[0].field1, " +
+                      "array_of_struct[0].optional_nested_array[0], " +
+                      "optional_map.key, " +
+                      "optional_map.value, " +
+                      "map_keys(complex_map)[0].key_field1, " +
+                      "map_keys(complex_map)[0].key_field2, " +
+                      "map_values(complex_map)[0].value_field1, " +
+                      "map_values(complex_map)[0].value_field2 " +
+                      "from complex_types"))
+              }
+            })
+        }
+      }
+    })
+  }
+
 }
 
 class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 6801c1eea..05c46e307 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -32,9 +32,9 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.example.data.Group
-import org.apache.parquet.example.data.simple.SimpleGroup
+import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory}
 import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.example.ExampleParquetWriter
+import org.apache.parquet.hadoop.example.{ExampleParquetWriter, GroupWriteSupport}
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark._
 import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
@@ -698,6 +698,131 @@ abstract class CometTestBase
     expected
   }
 
+  // Generate a file based on a complex schema. Schema derived from https://arrow.apache.org/blog/2022/10/17/arrow-parquet-encoding-part-3/
+  def makeParquetFileComplexTypes(
+      path: Path,
+      dictionaryEnabled: Boolean,
+      numRows: Integer = 10000): Unit = {
+    val schemaString =
+      """
+        message ComplexDataSchema {
+          optional group optional_array (LIST) {
+            repeated group list {
+              optional int32 element;
+            }
+          }
+          required group array_of_struct (LIST) {
+            repeated group list {
+              optional group struct_element {
+                required int32 field1;
+                optional group optional_nested_array (LIST) {
+                  repeated group list {
+                    required int32 element;
+                  }
+                }
+              }
+            }
+          }
+          optional group optional_map (MAP) {
+            repeated group key_value {
+              required int32 key;
+              optional int32 value;
+            }
+          }
+          required group complex_map (MAP) {
+            repeated group key_value {
+              required group map_key {
+                required int32 key_field1;
+                optional int32 key_field2;
+              }
+              required group map_value {
+                required int32 value_field1;
+                repeated int32 value_field2;
+              }
+            }
+          }
+        }
+      """
+
+    val schema: MessageType = MessageTypeParser.parseMessageType(schemaString)
+    GroupWriteSupport.setSchema(schema, spark.sparkContext.hadoopConfiguration)
+
+    val writer = createParquetWriter(schema, path, dictionaryEnabled)
+
+    val groupFactory = new SimpleGroupFactory(schema)
+
+    for (i <- 0 until numRows) {
+      val record = groupFactory.newGroup()
+
+      // Optional array of optional integers
+      if (i % 2 == 0) { // optional_array for every other row
+        val optionalArray = record.addGroup("optional_array")
+        for (j <- 0 until (i % 5)) {
+          val elementGroup = optionalArray.addGroup("list")
+          if (j % 2 == 0) { // some elements are optional
+            elementGroup.append("element", j)
+          }
+        }
+      }
+
+      // Required array of structs
+      val arrayOfStruct = record.addGroup("array_of_struct")
+      for (j <- 0 until (i % 3) + 1) { // Add one to three elements
+        val structElementGroup = arrayOfStruct.addGroup("list").addGroup("struct_element")
+        structElementGroup.append("field1", i * 10 + j)
+
+        // Optional nested array
+        if (j % 2 != 0) { // optional nested array for every other struct
+          val nestedArray = structElementGroup.addGroup("optional_nested_array")
+          for (k <- 0 until (i % 4)) { // Add zero to three elements.
+            val nestedElementGroup = nestedArray.addGroup("list")
+            nestedElementGroup.append("element", i + j + k)
+          }
+        }
+      }
+
+      // Optional map
+      if (i % 3 == 0) { // optional_map every third row
+        val optionalMap = record.addGroup("optional_map")
+        optionalMap
+          .addGroup("key_value")
+          .append("key", i)
+          .append("value", i)
+        if (i % 5 == 0) { // another optional entry
+          optionalMap
+            .addGroup("key_value")
+            .append("key", i)
+          // Value is optional
+          if (i % 10 == 0) {
+            optionalMap
+              .addGroup("key_value")
+              .append("key", i)
+              .append("value", i)
+          }
+        }
+      }
+
+      // Required map with complex key and value types
+      val complexMap = record.addGroup("complex_map")
+      val complexMapKeyVal = complexMap.addGroup("key_value")
+
+      complexMapKeyVal
+        .addGroup("map_key")
+        .append("key_field1", i)
+
+      complexMapKeyVal
+        .addGroup("map_value")
+        .append("value_field1", i)
+        .append("value_field2", i * 100)
+        .append("value_field2", i * 101)
+        .append("value_field2", i * 102)
+
+      writer.write(record)
+    }
+
+    writer.close()
+  }
+
   protected def makeDateTimeWithFormatTable(
       path: Path,
       dictionaryEnabled: Boolean,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
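
For reference, the Parquet message type in the diff above should map to roughly the following Spark schema. This is a sketch, not part of the commit, assuming Spark's standard Parquet LIST/MAP-to-Catalyst conversion; the nullability flags are inferred from the required/optional/repeated modifiers in the schema string.

// Sketch only: the schema spark.read.parquet is expected to infer for the
// generated complex_types.parquet file, under the assumptions stated above.
import org.apache.spark.sql.types._

val structElement = StructType(Seq(
  StructField("field1", IntegerType, nullable = false),
  // optional group (LIST) of required int32 elements
  StructField("optional_nested_array", ArrayType(IntegerType, containsNull = false))))

val complexMapKey = StructType(Seq(
  StructField("key_field1", IntegerType, nullable = false),
  StructField("key_field2", IntegerType)))

val complexMapValue = StructType(Seq(
  StructField("value_field1", IntegerType, nullable = false),
  // repeated int32 -> non-null array of non-null ints
  StructField("value_field2", ArrayType(IntegerType, containsNull = false), nullable = false)))

val expectedSchema = StructType(Seq(
  StructField("optional_array", ArrayType(IntegerType, containsNull = true)),
  StructField("array_of_struct", ArrayType(structElement, containsNull = true), nullable = false),
  StructField("optional_map", MapType(IntegerType, IntegerType, valueContainsNull = true)),
  StructField("complex_map",
    MapType(complexMapKey, complexMapValue, valueContainsNull = false),
    nullable = false)))

Printing the schema of the generated file is an easy way to confirm the actual mapping.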
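
A minimal usage sketch of the new helper follows, assuming it runs inside a suite that extends CometTestBase (so spark, withTempPath, createParquetWriter, and makeParquetFileComplexTypes are all in scope). It mirrors the new test but reads the file back directly rather than through a temp view.

// Hypothetical usage inside a CometTestBase-derived suite; not part of the commit.
import org.apache.hadoop.fs.Path

withTempPath { dir =>
  val path = new Path(dir.toURI.toString, "complex_types.parquet")
  makeParquetFileComplexTypes(path, dictionaryEnabled = false, numRows = 100)

  val df = spark.read.parquet(path.toUri.toString)
  df.printSchema() // compare against the expectedSchema sketch above
  // Drill into nested leaves, as the new test's third query does:
  df.selectExpr("array_of_struct[0].field1", "map_values(complex_map)[0].value_field2").show(5)
}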