voonhous commented on issue #17744:
URL: https://github.com/apache/hudi/issues/17744#issuecomment-3710064394

   As instructed, I've tested Parquet tables written with `Spark-4.0` by reading them back in both Spark 4.0 and Spark 3.5. Below are the sources of what I did:
   
   # Start Spark 4.0 env using docker-compose
   
   ```yml
   services:
     spark-master:
       image: apache/spark:4.0.1
       container_name: spark-master
       ports:
         - "8080:8080"
         - "7077:7077"
       # This command keeps the master running
        command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.master.Master"]
       volumes:
         - ./data:/tmp/delta-data
   
     spark-worker:
       image: apache/spark:4.0.1
       container_name: spark-worker
       depends_on:
         - spark-master
       # This command tells the worker where the master is
        command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.worker.Worker", "spark://spark-master:7077"]
       volumes:
         - ./data:/tmp/delta-data
   ```
   
   Bring up the environment:
   ```shell
   docker-compose -f docker-compose-spark40.yml up -d
   ```
   
   Enter pyspark:
   ```shell
   docker exec -it spark-master /opt/spark/bin/pyspark \
    --conf "spark.driver.extraJavaOptions=-Divy.home=/tmp/.ivy -Duser.home=/tmp" \
   --conf "spark.sql.parquet.v2.shredding.enabled=true"
   ```
   
   Create the tables:
   
   ```python
   from pyspark.sql.functions import parse_json, col, variant_get
   
   # Sample Data
   data = [
       (1, '{"user": "Alice", "meta": {"location": "NYC", "age": 30}}'),
        (2, '{"user": "Bob", "meta": {"location": "SF", "age": 25, "job": "Eng"}}')
   ]
   df = spark.createDataFrame(data, ["id", "json_str"])
   df_variant = df.withColumn("v", parse_json(col("json_str")))
   
   # --- 1. Write Unshredded Parquet ---
   # We disable shredding via a Spark SQL configuration
   spark.conf.set("spark.sql.parquet.v2.shredding.enabled", "false")
   df_variant.write.mode("overwrite").parquet("/tmp/parquet_unshredded")
   
   # --- 2. Write Shredded Parquet ---
   # We enable shredding (this is actually the default in Spark 4.0)
   spark.conf.set("spark.sql.parquet.v2.shredding.enabled", "true")
   # We can also set the max number of sub-columns to shred
   spark.conf.set("spark.sql.parquet.v2.shredding.maxSubcolumnCount", "100")
   df_variant.write.mode("overwrite").parquet("/tmp/parquet_shredded")
   
   
   # Check Unshredded: You will see one 'v' column of type 'variant'
   print("Unshredded Schema:")
   spark.read.parquet("/tmp/parquet_unshredded").printSchema()
   
   # Check Shredded: Even though the schema LOOKS the same, the execution plan
   # for queries will be different because of the underlying sub-columns.
   print("Shredded Schema:")
   spark.read.parquet("/tmp/parquet_shredded").printSchema()
   
   # Query a sub-field
   query = spark.read.parquet("/tmp/parquet_shredded") \
        .select(variant_get(col("v"), "$.meta.location", "string").alias("location"))
   
   query.show()
   
   # Look for 'V2Scan' and 'PartitionFilters' or 'PushFilters'
   query.explain(True)
   ```
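   
   Unrelated to Spark itself, a quick way to sanity-check the written files is to peek at the Parquet footer: every Parquet file starts and ends with the magic bytes `PAR1`, and the 4 little-endian bytes before the trailing magic hold the Thrift footer length. Since shredding adds extra leaf columns (and thus footer metadata), comparing footer sizes gives a Spark-free hint of the difference. A stdlib-only sketch (the helper name and the size comparison are my own, not from the issue):
   
   ```python
   import struct
   
   def parquet_footer_len(path: str) -> int:
       """Return the Thrift footer length of a Parquet file, in bytes."""
       with open(path, "rb") as f:
           assert f.read(4) == b"PAR1", "missing leading magic"
           f.seek(-8, 2)                          # last 8 bytes: <len><magic>
           (footer_len,) = struct.unpack("<I", f.read(4))
           assert f.read(4) == b"PAR1", "missing trailing magic"
           return footer_len
   ```
   
   Running this over the part-files under `/tmp/parquet_shredded` and `/tmp/parquet_unshredded` would show the shredded footers carrying more column metadata.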
   
   ## Spark-4 output
   
   ### Un-shredded
   
   ```python
   spark.read.parquet("/tmp/parquet_unshredded").printSchema()
   ```
   
   ```shell
   root
    |-- id: long (nullable = true)
    |-- json_str: string (nullable = true)
    |-- v: variant (nullable = true)
   ```
   
   ```python
   query = spark.read.parquet("/tmp/parquet_unshredded") \
        .select(variant_get(col("v"), "$.meta.location", "string").alias("location"))
   query.show()
   ```
   
   ```shell
   +--------+
   |location|
   +--------+
   |      SF|
   |     NYC|
   +--------+
   ```
   
   ### Shredded
   
   ```python
   spark.read.parquet("/tmp/parquet_shredded").printSchema()
   ```
   
   ```shell
   root
    |-- id: long (nullable = true)
    |-- json_str: string (nullable = true)
    |-- v: variant (nullable = true)
   ```
   
   
   ```python
   query = spark.read.parquet("/tmp/parquet_shredded") \
        .select(variant_get(col("v"), "$.meta.location", "string").alias("location"))
   query.show()
   ```
   
   ```shell
   +--------+
   |location|
   +--------+
   |      SF|
   |     NYC|
   +--------+
    ```
   
   
   # Start Spark 3.5 env using docker-compose
   
   ```yml
   services:
     spark-35:
       image: apache/spark:3.5.3
       container_name: spark-35
       ports:
         - "8081:8080" # Changed host port to avoid conflict with Spark 4.0
       command: ["/opt/spark/bin/spark-class", 
"org.apache.spark.deploy.master.Master"]
       volumes:
         - ./data:/tmp/delta-data
   ```
   
   Bring up the environment:
   ```shell
   docker-compose -f docker-compose-spark35.yml up -d
   ```
   
   Enter pyspark:
    ```shell
   docker exec -it spark-35 /opt/spark/bin/pyspark
   ```
   
   Note: we could add `--conf "spark.sql.parquet.v2.shredding.enabled=true"`, but the config has no effect in Spark 3.5, which does not recognize it.
   
   Read the tables that were written by Spark 4.0:
   ```python
   spark.read.parquet("/tmp/delta-data/parquet_unshredded")
   spark.read.parquet("/tmp/delta-data/parquet_shredded")
   ```
   
   ## Spark-3.5 output
   
   ### Un-shredded
   
   ```python
   df = spark.read.parquet("/tmp/delta-data/parquet_unshredded")
   ```
   
   ```shell
    26/01/05 11:15:58 WARN ParquetFileFormat: Failed to parse and ignored serialized Spark schema in Parquet key-value metadata:
         {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}
    org.apache.spark.SparkIllegalArgumentException: [UNSUPPORTED_DATATYPE] Unsupported data type {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}[1.1] failure: 'TimestampType' expected but '{' found

    {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}
    ^.
         at org.apache.spark.sql.errors.DataTypeErrors$.dataTypeUnsupportedError(DataTypeErrors.scala:176)
         at org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser$.parseString(LegacyTypeStringParser.scala:91)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:525)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:520)
         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
         at scala.util.Failure.recover(Try.scala:234)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.deserializeSchemaString(ParquetFileFormat.scala:520)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readSchemaFromFooter$1(ParquetFileFormat.scala:513)
         at scala.Option.flatMap(Option.scala:271)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:513)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$2(ParquetFileFormat.scala:494)
         at scala.collection.immutable.Stream.map(Stream.scala:418)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:494)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:485)
         at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:80)
         at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
         at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
         at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
         at org.apache.spark.scheduler.Task.run(Task.scala:141)
         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
         at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
         at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
         at java.base/java.lang.Thread.run(Unknown Source)
    DataFrame[id: bigint, json_str: string, v: struct<value:binary,metadata:binary>]
   ```
   
   ```python
    df.printSchema()
   ```
   
   ```shell
   root
    |-- id: long (nullable = true)
    |-- json_str: string (nullable = true)
    |-- v: struct (nullable = true)
    |    |-- value: binary (nullable = true)
    |    |-- metadata: binary (nullable = true)
   ```
   
   ```python
   df.show(truncate=False, vertical=True)
   ```
   
    ```shell
    -RECORD 0------------------------------------------------------------
     id       | 2
     json_str | {"user": "Bob", "meta": {"location": "SF", "age": 25, "job": "Eng"}}
     v        | {[02 02 01 00 04 00 16 0D 42 6F 62 02 03 03 04 02 03 05 00 09 09 53 46 0C 19 0D 45 6E 67], [01 05 00 04 08 10 13 16 75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65 6A 6F 62]}
    -RECORD 1------------------------------------------------------------
     id       | 1
     json_str | {"user": "Alice", "meta": {"location": "NYC", "age": 30}}
     v        | {[02 02 01 00 06 00 13 15 41 6C 69 63 65 02 02 03 02 04 00 06 0D 4E 59 43 0C 1E], [01 04 00 04 08 10 13 75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65]}
   ```
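   
   Incidentally, the `v` struct above still carries valid Variant binary: the second buffer (the `metadata` column) is a string dictionary of the JSON keys and can be decoded by hand. A minimal stdlib sketch, following my reading of the Parquet Variant binary-encoding layout (header byte carries the version in the low nibble and the offset width in the top two bits), using the Alice row's metadata bytes from the dump above:
   
   ```python
   # Metadata bytes of the Alice row, copied from the dump above.
   raw = bytes.fromhex(
       "01 04 00 04 08 10 13 75 73 65 72 6D 65 74 61"
       " 6C 6F 63 61 74 69 6F 6E 61 67 65"
   )
   
   header = raw[0]
   version = header & 0x0F                   # low nibble: encoding version
   offset_size = ((header >> 6) & 0x03) + 1  # bytes per offset (1..4)
   
   pos = 1
   dict_size = int.from_bytes(raw[pos:pos + offset_size], "little")
   pos += offset_size
   
   # dict_size + 1 offsets delimit the entries of the string dictionary.
   offsets = [
       int.from_bytes(raw[pos + i * offset_size:pos + (i + 1) * offset_size], "little")
       for i in range(dict_size + 1)
   ]
   pos += (dict_size + 1) * offset_size
   
   strings = [raw[pos + offsets[i]:pos + offsets[i + 1]].decode() for i in range(dict_size)]
   print(version, strings)  # 1 ['user', 'meta', 'location', 'age']
   ```
   
   The five-key Bob row (ending `... 6A 6F 62`) decodes the same way with `job` appended, which is consistent with Spark 3.5 surfacing the raw Variant buffers unchanged.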
   
   ### Shredded
   
   ```python
   df = spark.read.parquet("/tmp/delta-data/parquet_shredded")
   ```
   
   ```shell
    26/01/05 11:20:07 WARN ParquetFileFormat: Failed to parse and ignored serialized Spark schema in Parquet key-value metadata:
         {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}
    org.apache.spark.SparkIllegalArgumentException: [UNSUPPORTED_DATATYPE] Unsupported data type {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}[1.1] failure: 'TimestampType' expected but '{' found

    {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}
    ^.
         at org.apache.spark.sql.errors.DataTypeErrors$.dataTypeUnsupportedError(DataTypeErrors.scala:176)
         at org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser$.parseString(LegacyTypeStringParser.scala:91)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:525)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:520)
         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
         at scala.util.Failure.recover(Try.scala:234)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.deserializeSchemaString(ParquetFileFormat.scala:520)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readSchemaFromFooter$1(ParquetFileFormat.scala:513)
         at scala.Option.flatMap(Option.scala:271)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:513)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$2(ParquetFileFormat.scala:494)
         at scala.collection.immutable.Stream.map(Stream.scala:418)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:494)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:485)
         at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:80)
         at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
         at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
         at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
         at org.apache.spark.scheduler.Task.run(Task.scala:141)
         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
         at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
         at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
         at java.base/java.lang.Thread.run(Unknown Source)
   ```
   
   ```python
   df.printSchema()
   ```
   
   ```shell
   root
    |-- id: long (nullable = true)
    |-- json_str: string (nullable = true)
    |-- v: struct (nullable = true)
    |    |-- value: binary (nullable = true)
    |    |-- metadata: binary (nullable = true)
   ```
   
   ```python
   df.show(truncate=False, vertical=True)
   ```
   
   ```shell
    -RECORD 0------------------------------------------------------------
     id       | 2
     json_str | {"user": "Bob", "meta": {"location": "SF", "age": 25, "job": "Eng"}}
     v        | {[02 02 01 00 04 00 16 0D 42 6F 62 02 03 03 04 02 03 05 00 09 09 53 46 0C 19 0D 45 6E 67], [01 05 00 04 08 10 13 16 75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65 6A 6F 62]}
    -RECORD 1------------------------------------------------------------
     id       | 1
     json_str | {"user": "Alice", "meta": {"location": "NYC", "age": 30}}
     v        | {[02 02 01 00 06 00 13 15 41 6C 69 63 65 02 02 03 02 04 00 06 0D 4E 59 43 0C 1E], [01 04 00 04 08 10 13 75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65]}
   ```

