voonhous commented on issue #17744:
URL: https://github.com/apache/hudi/issues/17744#issuecomment-3710064394
As instructed, I've tested writing Parquet tables (both shredded and unshredded variant columns) with `Spark-4.0`, then reading them back with `Spark-3.5`. Below are the source files of what I did:
# Start Spark 4.0 env using docker-compose
```yml
services:
  spark-master:
    image: apache/spark:4.0.1
    container_name: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    # This command keeps the master running
    command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.master.Master"]
    volumes:
      - ./data:/tmp/delta-data
  spark-worker:
    image: apache/spark:4.0.1
    container_name: spark-worker
    depends_on:
      - spark-master
    # This command tells the worker where the master is
    command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.worker.Worker", "spark://spark-master:7077"]
    volumes:
      - ./data:/tmp/delta-data
```
Bring up the environment:
```shell
docker-compose -f docker-compose-spark40.yml up -d
```
Enter pyspark:
```shell
docker exec -it spark-master /opt/spark/bin/pyspark \
  --conf "spark.driver.extraJavaOptions=-Divy.home=/tmp/.ivy -Duser.home=/tmp" \
  --conf "spark.sql.parquet.v2.shredding.enabled=true"
```
Create the tables:
```python
from pyspark.sql.functions import parse_json, col, variant_get

# Sample data
data = [
    (1, '{"user": "Alice", "meta": {"location": "NYC", "age": 30}}'),
    (2, '{"user": "Bob", "meta": {"location": "SF", "age": 25, "job": "Eng"}}'),
]
df = spark.createDataFrame(data, ["id", "json_str"])
df_variant = df.withColumn("v", parse_json(col("json_str")))

# --- 1. Write unshredded Parquet ---
# Disable shredding via a Spark SQL configuration
spark.conf.set("spark.sql.parquet.v2.shredding.enabled", "false")
df_variant.write.mode("overwrite").parquet("/tmp/parquet_unshredded")

# --- 2. Write shredded Parquet ---
# Enable shredding (this is actually the default in Spark 4.0)
spark.conf.set("spark.sql.parquet.v2.shredding.enabled", "true")
# We can also cap the number of sub-columns to shred
spark.conf.set("spark.sql.parquet.v2.shredding.maxSubcolumnCount", "100")
df_variant.write.mode("overwrite").parquet("/tmp/parquet_shredded")

# Check unshredded: you will see one 'v' column of type 'variant'
print("Unshredded Schema:")
spark.read.parquet("/tmp/parquet_unshredded").printSchema()

# Check shredded: even though the schema LOOKS the same, the execution plan
# for queries will differ because of the underlying sub-columns.
print("Shredded Schema:")
spark.read.parquet("/tmp/parquet_shredded").printSchema()

# Query a sub-field
query = spark.read.parquet("/tmp/parquet_shredded") \
    .select(variant_get(col("v"), "$.meta.location", "string").alias("location"))
query.show()

# Look for 'V2Scan' and 'PartitionFilters' or 'PushedFilters'
query.explain(True)
```
## Spark-4 output
### Un-shredded
```python
spark.read.parquet("/tmp/parquet_unshredded").printSchema()
```
```shell
root
|-- id: long (nullable = true)
|-- json_str: string (nullable = true)
|-- v: variant (nullable = true)
```
```python
query = spark.read.parquet("/tmp/parquet_unshredded") \
    .select(variant_get(col("v"), "$.meta.location", "string").alias("location"))
query.show()
```
```shell
+--------+
|location|
+--------+
| SF|
| NYC|
+--------+
```
### Shredded:
```python
spark.read.parquet("/tmp/parquet_shredded").printSchema()
```
```shell
root
|-- id: long (nullable = true)
|-- json_str: string (nullable = true)
|-- v: variant (nullable = true)
```
```python
query = spark.read.parquet("/tmp/parquet_shredded") \
    .select(variant_get(col("v"), "$.meta.location", "string").alias("location"))
query.show()
```
```shell
+--------+
|location|
+--------+
| SF|
| NYC|
+--------+
```
# Start Spark 3.5
```yml
services:
  spark-35:
    image: apache/spark:3.5.3
    container_name: spark-35
    ports:
      - "8081:8080" # Changed host port to avoid conflict with Spark 4.0
    command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.master.Master"]
    volumes:
      - ./data:/tmp/delta-data
```
Bring up the environment:
```shell
docker-compose -f docker-compose-spark35.yml up -d
```
Enter pyspark:
```shell
docker exec -it spark-35 /opt/spark/bin/pyspark
```
Note: we could add `--conf "spark.sql.parquet.v2.shredding.enabled=true"`, but the config has no effect in Spark 3.5.
Read the tables that Spark 4.0 wrote:
```python
spark.read.parquet("/tmp/delta-data/parquet_unshredded")
spark.read.parquet("/tmp/delta-data/parquet_shredded")
```
## Spark-3.5 output
### Un-shredded
```python
df = spark.read.parquet("/tmp/delta-data/parquet_unshredded")
```
```shell
26/01/05 11:15:58 WARN ParquetFileFormat: Failed to parse and ignored serialized Spark schema in Parquet key-value metadata:
{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}
org.apache.spark.SparkIllegalArgumentException: [UNSUPPORTED_DATATYPE] Unsupported data type
{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}[1.1]
failure: 'TimestampType' expected but '{' found
{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}
^.
    at org.apache.spark.sql.errors.DataTypeErrors$.dataTypeUnsupportedError(DataTypeErrors.scala:176)
    at org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser$.parseString(LegacyTypeStringParser.scala:91)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:525)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:520)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recover(Try.scala:234)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.deserializeSchemaString(ParquetFileFormat.scala:520)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readSchemaFromFooter$1(ParquetFileFormat.scala:513)
    at scala.Option.flatMap(Option.scala:271)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:513)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$2(ParquetFileFormat.scala:494)
    at scala.collection.immutable.Stream.map(Stream.scala:418)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:494)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:485)
    at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:80)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
DataFrame[id: bigint, json_str: string, v: struct<value:binary,metadata:binary>]
```
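The warning itself is benign: Spark 3.5's legacy type-string parser does not recognize the `variant` type name in the serialized Spark schema stored in the Parquet footer, so it discards that schema and falls back to the physical one, which is why `v` surfaces as `struct<value:binary,metadata:binary>`. A quick plain-Python look at what 3.5 is choking on, using the schema JSON from the warning above:

```python
import json

# The serialized Spark schema from the Parquet key-value metadata,
# exactly as printed in the warning above.
schema_json = (
    '{"type":"struct","fields":['
    '{"name":"id","type":"long","nullable":true,"metadata":{}},'
    '{"name":"json_str","type":"string","nullable":true,"metadata":{}},'
    '{"name":"v","type":"variant","nullable":true,"metadata":{}}]}'
)

field_types = {f["name"]: f["type"] for f in json.loads(schema_json)["fields"]}
print(field_types)  # {'id': 'long', 'json_str': 'string', 'v': 'variant'}
```

The `"variant"` type string is the only part Spark 3.5 cannot map to a Catalyst type.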
```python
df.printSchema()
```
```shell
root
|-- id: long (nullable = true)
|-- json_str: string (nullable = true)
|-- v: struct (nullable = true)
| |-- value: binary (nullable = true)
| |-- metadata: binary (nullable = true)
```
```python
df.show(truncate=False, vertical=True)
```
```shell
-RECORD 0------------------------------------------------------------
 id       | 2
 json_str | {"user": "Bob", "meta": {"location": "SF", "age": 25, "job": "Eng"}}
 v        | {[02 02 01 00 04 00 16 0D 42 6F 62 02 03 03 04 02 03 05 00 09 09 53 46 0C 19 0D 45 6E 67], [01 05 00 04 08 10 13 16 75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65 6A 6F 62]}
-RECORD 1------------------------------------------------------------
 id       | 1
 json_str | {"user": "Alice", "meta": {"location": "NYC", "age": 30}}
 v        | {[02 02 01 00 06 00 13 15 41 6C 69 63 65 02 02 03 02 04 00 06 0D 4E 59 43 0C 1E], [01 04 00 04 08 10 13 75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65]}
```
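The `metadata` binary shown above can even be read by hand: per the Variant binary encoding, it is a header byte (version in the low bits, offset width in the top two bits), a dictionary size, `size + 1` offsets, and then the concatenated UTF-8 key bytes. A minimal decoder sketch, assuming the one-byte little-endian offsets used in this small example:

```python
def decode_variant_metadata(meta: bytes) -> list[str]:
    """Decode the key dictionary of a Variant `metadata` binary."""
    header = meta[0]
    offset_size = ((header >> 6) & 0b11) + 1  # bytes per offset (1..4)
    pos = 1
    dict_size = int.from_bytes(meta[pos:pos + offset_size], "little")
    pos += offset_size
    offsets = []
    for _ in range(dict_size + 1):
        offsets.append(int.from_bytes(meta[pos:pos + offset_size], "little"))
        pos += offset_size
    data = meta[pos:]
    return [data[offsets[i]:offsets[i + 1]].decode() for i in range(dict_size)]

# Bob's metadata bytes, copied from the output above
bob_hex = (
    "01 05 00 04 08 10 13 16 "
    "75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65 6A 6F 62"
)
print(decode_variant_metadata(bytes.fromhex(bob_hex)))
# ['user', 'meta', 'location', 'age', 'job']
```

So the key dictionaries round-trip intact; Spark 3.5 simply has no built-in way to reassemble them with the `value` binaries into a variant.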
### Shredded:
```python
df = spark.read.parquet("/tmp/delta-data/parquet_shredded")
```
```shell
26/01/05 11:20:07 WARN ParquetFileFormat: Failed to parse and ignored serialized Spark schema in Parquet key-value metadata:
{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}
org.apache.spark.SparkIllegalArgumentException: [UNSUPPORTED_DATATYPE] Unsupported data type
{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}[1.1]
failure: 'TimestampType' expected but '{' found
{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"json_str","type":"string","nullable":true,"metadata":{}},{"name":"v","type":"variant","nullable":true,"metadata":{}}]}
^.
    at org.apache.spark.sql.errors.DataTypeErrors$.dataTypeUnsupportedError(DataTypeErrors.scala:176)
    at org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser$.parseString(LegacyTypeStringParser.scala:91)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:525)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$deserializeSchemaString$2.applyOrElse(ParquetFileFormat.scala:520)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recover(Try.scala:234)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.deserializeSchemaString(ParquetFileFormat.scala:520)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readSchemaFromFooter$1(ParquetFileFormat.scala:513)
    at scala.Option.flatMap(Option.scala:271)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:513)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$2(ParquetFileFormat.scala:494)
    at scala.collection.immutable.Stream.map(Stream.scala:418)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:494)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:485)
    at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:80)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
```
```python
df.printSchema()
```
```shell
root
|-- id: long (nullable = true)
|-- json_str: string (nullable = true)
|-- v: struct (nullable = true)
| |-- value: binary (nullable = true)
| |-- metadata: binary (nullable = true)
```
```python
df.show(truncate=False, vertical=True)
```
```shell
-RECORD 0------------------------------------------------------------
 id       | 2
 json_str | {"user": "Bob", "meta": {"location": "SF", "age": 25, "job": "Eng"}}
 v        | {[02 02 01 00 04 00 16 0D 42 6F 62 02 03 03 04 02 03 05 00 09 09 53 46 0C 19 0D 45 6E 67], [01 05 00 04 08 10 13 16 75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65 6A 6F 62]}
-RECORD 1------------------------------------------------------------
 id       | 1
 json_str | {"user": "Alice", "meta": {"location": "NYC", "age": 30}}
 v        | {[02 02 01 00 06 00 13 15 41 6C 69 63 65 02 02 03 02 04 00 06 0D 4E 59 43 0C 1E], [01 04 00 04 08 10 13 75 73 65 72 6D 65 74 61 6C 6F 63 61 74 69 6F 6E 61 67 65]}
```