[
https://issues.apache.org/jira/browse/SPARK-24264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492046#comment-16492046
]
Gerard Maas commented on SPARK-24264:
-------------------------------------
This scenario demonstrates the claims in this ticket:
Given the case classes PickUp and Cashier:
{{case class PickUp(orderId: String, waitDuration: Long)}}
{{case class Cashier(orderId: String, totalPrice: Float)}}
I create two Parquet files with some random data following those schemas.
When I read both files at once using the batch API, I get:
{code:java}
val batchMerged =
  session.read.option("mergeSchema", "true").parquet("/tmp/data/batch/")

batchMerged.schema
> StructType(StructField(orderId,StringType,true),
  StructField(waitDuration,LongType,true),
  StructField(totalPrice,FloatType,true))
{code}
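For reference, the merged schema is simply the order-preserving, field-by-field union of the two file schemas. A minimal plain-Scala sketch of that union (no Spark dependency; {{Field}} and {{mergeSchemas}} are hypothetical names, and real Parquet merging also reconciles conflicting types, which this ignores):

```scala
// Toy model (not Spark API): the merged schema is the order-preserving union
// of the fields of each part-file schema. Type-conflict handling is omitted.
case class Field(name: String, dataType: String)

def mergeSchemas(schemas: Seq[Seq[Field]]): Seq[Field] =
  schemas.flatten.foldLeft(Vector.empty[Field]) { (acc, f) =>
    if (acc.exists(_.name == f.name)) acc else acc :+ f
  }

val pickUpSchema  = Seq(Field("orderId", "string"), Field("waitDuration", "long"))
val cashierSchema = Seq(Field("orderId", "string"), Field("totalPrice", "float"))

mergeSchemas(Seq(pickUpSchema, cashierSchema)).map(_.name)
// Vector(orderId, waitDuration, totalPrice)
```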
Columns in the resulting DataFrame are null depending on the source file:
{code:java}
>batchMerged.show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1| 498| null|
| order_2| 819| null|
| order_3| 576| null|
| order_4| 741| null|
{code}
The schema is merged as expected; the resulting Dataset contains nulls wherever
the source file does not contain the field, e.g.:
{code:java}
batchMerged.where(!$"totalPrice".isNull).show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1| null| 66.91808|
| order_2| null| 21.761215|
| order_3| null| 1.8776|
| order_4| null| 45.613895|
| order_5| null| 62.664383|
| order_6| null| 78.24584|
{code}
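The null-filling behaviour above can be modeled as projecting each record onto the merged field list; a plain-Scala sketch ({{project}} and the sample rows are hypothetical, with None standing in for null):

```scala
// Toy model (not Spark API) of how a record is projected onto the merged
// schema: fields absent from the source file surface as null (None here).
def project(record: Map[String, Any], mergedFields: Seq[String]): Seq[Option[Any]] =
  mergedFields.map(record.get)

val fields     = Seq("orderId", "waitDuration", "totalPrice")
val pickUpRow  = Map[String, Any]("orderId" -> "order_1", "waitDuration" -> 498L)
val cashierRow = Map[String, Any]("orderId" -> "order_1", "totalPrice" -> 66.91808f)

project(pickUpRow, fields)  // Seq(Some(order_1), Some(498), None)
project(cashierRow, fields) // Seq(Some(order_1), None, Some(66.91808))
```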
In streaming mode, we must provide the schema at creation time. There's no
schema inference:
{code:java}
val streamingMerged =
  session.readStream.option("mergeSchema", "true").schema(schema).parquet("/tmp/data/stream")
{code}
When I materialize this stream to a table, I get:
{code:java}
val query =
streamingMerged.writeStream.format("memory").queryName("parquet_merged").start()
val queryResultMerged = session.sql("select * from parquet_merged")
queryResultMerged.show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1| 498| null|
| order_2| 819| null|
| order_3| 576| null|
| order_4| 741| null|
| order_5| 844| null|
{code}
Then I set that flag to 'false':
{code:java}
val streamingNotMerged =
session.readStream.option("mergeSchema","false").schema(schema).parquet("/tmp/data/stream")
... same materialization process...
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1| 498| null|
| order_2| 819| null|
| order_3| 576| null|
| order_4| 741| null|
{code}
When comparing the two results with {{except}}:
{code:java}
val leftDiff = queryResultsNotMerged.except(queryResultMerged)
leftDiff.count
res58: Long = 0
val rightDiff = queryResultMerged.except(queryResultsNotMerged)
rightDiff.count
res61: Long = 0
{code}
There is no visible difference, except that Spark must perform the schema merge
when this flag is set to true, which I understand incurs additional processing cost.
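That matches a simple model of the streaming path: because the fixed stream schema is applied last, an intermediate merge step changes nothing. A plain-Scala sketch ({{project}} and the sample rows are hypothetical; this is a toy model, not Spark API):

```scala
// A record is projected onto a field list by keeping only those fields.
def project(record: Map[String, Any], fields: Seq[String]): Map[String, Any] =
  fields.flatMap(f => record.get(f).map(f -> _)).toMap

val fixed = Seq("orderId", "waitDuration", "totalPrice")
val rows = Seq(
  Map[String, Any]("orderId" -> "order_1", "waitDuration" -> 498L, "extra" -> 1),
  Map[String, Any]("orderId" -> "order_1", "totalPrice" -> 66.91808f)
)

// mergeSchema=true: build the union schema of all files, then apply the fixed schema.
val union  = rows.flatMap(_.keys).distinct
val merged = rows.map(r => project(project(r, union), fixed))

// mergeSchema=false: apply the fixed schema directly.
val notMerged = rows.map(project(_, fixed))

assert(merged == notMerged) // the merge step is a no-op, matching the empty 'except' diffs
```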
> [Structured Streaming] Remove 'mergeSchema' option from Parquet source
> configuration
> ------------------------------------------------------------------------------------
>
> Key: SPARK-24264
> URL: https://issues.apache.org/jira/browse/SPARK-24264
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0
> Reporter: Gerard Maas
> Priority: Major
> Labels: features, usability
>
> Looking into the Parquet format support for the File source in Structured
> Streaming, the docs mention the use of the option 'mergeSchema' to merge the
> schemas of the part files found.[1]
>
> There seems to be no practical use of that configuration in a streaming
> context.
>
> In its batch counterpart, 'mergeSchema' infers the schema superset of the
> part-files found.
>
> When using the File source + parquet format in streaming mode, we must
> provide a schema to the readStream.schema(...) builder and that schema is
> fixed for the duration of the stream.
>
> My current understanding is that:
>
> - Files containing a subset of the fields declared in the schema will render
> null values for the non-existing fields.
> - For files containing a superset of the fields, the additional data fields
> will be lost.
> - Files not matching the schema set on the streaming source will render all
> fields null for each record in the file.
>
> It looks like 'mergeSchema' has no practical effect, although enabling it
> might lead to additional processing to actually merge the Parquet schema of
> the input files.
>
> I inquired on the dev+user mailing lists about any other behavior but I got
> no responses.
>
> From the user's perspective, this option may seem to help their job cope with
> schema evolution at runtime, but that is also not the case.
>
> Looks like removing this option and leaving the value always set to false is
> the reasonable thing to do.
>
> [1]
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)