[ 
https://issues.apache.org/jira/browse/SPARK-24264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492046#comment-16492046
 ] 

Gerard Maas commented on SPARK-24264:
-------------------------------------

This scenario demonstrates the claims in this ticket:
 
Given PickUp and CashierOrder:
{{case class PickUp(orderId: String, waitDuration: Long)}}

{{case class Cashier(orderId: String, totalPrice: Float)}}
 
I create 2 parquet files with some random data following that schema
 
When I read the two files at the same time, using the batch API, we get:
 
{{val batchMerged = 
session.read.option("mergeSchema","true").parquet("/tmp/data/batch/")}}

{{batchMerged.schema}}

{{>StructType(StructField(orderId,StringType,true), 
StructField(waitDuration,LongType,true), 
StructField(totalPrice,FloatType,true))}}
 
Data in the resulting dataframe is null depending on the source file:
 
{code:java}
>batchMerged.show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
{code}
 
The schema is merged as expected, the resulting dataset will contain nulls when 
the source does not contain the field.
e.g.:
{code:java}
batchMerged.where(!$"totalPrice".isNull).show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|        null|  66.91808|
| order_2|        null| 21.761215|
| order_3|        null|    1.8776|
| order_4|        null| 45.613895|
| order_5|        null| 62.664383|
| order_6|        null|  78.24584|

{code}
 
In streaming mode, we must provide the schema at creation time. There's no 
schema inference:
 
{{val streamingMerged = 
session.readStream.option("mergeSchema","true").schema(schema).parquet("/tmp/data/stream")}}
When I materialize this stream to a table, I get:
 
{code:java}
val query = 
streamingMerged.writeStream.format("memory").queryName("parquet_merged").start()
val queryResultMerged = session.sql("select * from parquet_merged")
queryResultMerged.show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
| order_5|         844|      null|

{code}
 
I put that flag to 'off':
 
{code:java}
val streamingNotMerged = 
session.readStream.option("mergeSchema","false").schema(schema).parquet("/tmp/data/stream")
... same materialization process...
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
{code}
When compared using subtraction:
{code:java}
val leftDiff = queryResultsNotMerged.except(queryResultMerged)
 
leftDiff.count
res58: Long = 0

val rightDiff = queryResultMerged.except(queryResultsNotMerged)
rightDiff.count
res61: Long = 0

{code}
 
There is no visible difference, except that Spark needs to do the schemaMerge 
when this flag is set to true, which I understood has an additional expense.

> [Structured Streaming] Remove 'mergeSchema' option from Parquet source 
> configuration
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-24264
>                 URL: https://issues.apache.org/jira/browse/SPARK-24264
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Gerard Maas
>            Priority: Major
>              Labels: features, usability
>
> Looking into the Parquet format support for the File source in Structured 
> Streaming, the docs mention the use of the option 'mergeSchema' to merge the 
> schemas of the part files found.[1]
>  
> There seems to be no practical use of that configuration in a streaming 
> context.
>  
> In its batch counterpart, `mergeSchemas` would infer the schema superset of 
> the part-files found. 
>  
>  When using the File source + parquet format in streaming mode, we must 
> provide a schema to the readStream.schema(...) builder and that schema is 
> fixed for the duration of the stream.
>  
> My current understanding is that:
>  
> - Files containing a subset of the fields declared in the schema will render 
> null values for the non-existing fields.
> - For files containing a superset of the fields, the additional data fields 
> will be lost. 
> - Files not matching the schema set on the streaming source will render all 
> fields null for each record in the file.
>  
> It looks like 'mergeSchema' has no practical effect, although enabling it 
> might lead to additional processing to actually merge the Parquet schema of 
> the input files. 
>  
> I inquired on the dev+user mailing lists about any other behavior but I got 
> no responses.
>  
> From the user perspective, they may think that this option would help their 
> job cope with schema evolution at runtime, but that is also not the case. 
>  
> Looks like removing this option and leaving the value always set to false is 
> the reasonable thing to do.  
>  
> [1] 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to