[
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17910462#comment-17910462
]
Vlad Rozov commented on SPARK-48492:
------------------------------------
Taking a look
> batch-read parquet files written by streaming returns non-nullable fields in
> schema
> -----------------------------------------------------------------------------------
>
> Key: SPARK-48492
> URL: https://issues.apache.org/jira/browse/SPARK-48492
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.4.1
> Environment: python --version
> Python 3.9.13
>
> spark-submit --version
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/ '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
>       /_/
>
> Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 1.8.0_302
> Branch HEAD
> Compiled by user centos on 2023-06-19T23:01:01Z
> Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
> Reporter: Julien Peloton
> Priority: Major
>
> Hello,
> In the documentation, it is stated that
> > When reading Parquet files, all columns are automatically converted to be
> > nullable for compatibility reasons.
> While this seems correct for static DataFrames, I have a counter example for
> streaming ones:
>
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql import Row
> import pyspark.sql.functions as F
>
> spark = SparkSession.builder.getOrCreate()
> spark.sparkContext.setLogLevel("WARN")
>
> df = spark.createDataFrame(
>     [
>         Row(a=1, b=2.0, c="toto"),
>         Row(a=3, b=4.0, c="titi"),
>         Row(a=10, b=11.0, c="tutu"),
>     ]
> )
>
> # add a non-nullable column
> df = df.withColumn('d', F.lit(1.0))
>
> print("Original dataframe")
> df.printSchema()
>
> # Write this on disk
> df.write.parquet('static.parquet')
>
> # Now load a stream
> df_stream = (
>     spark.readStream.format("parquet")
>     .schema(df.schema)
>     .option("path", "static.parquet")
>     .option("latestFirst", False)
>     .load()
> )
>
> # add a non-nullable column
> df_stream = df_stream.withColumn('e', F.lit("error"))
>
> print("Streaming dataframe")
> df_stream.printSchema()
>
> # Now write the dataframe using writeStream
> query = (
>     df_stream.writeStream.outputMode("append")
>     .format("parquet")
>     .option("checkpointLocation", 'test_parquet_checkpoint')
>     .option("path", 'test_parquet')
>     .trigger(availableNow=True)
>     .start()
> )
>
> spark.streams.awaitAnyTermination()
>
> # Now read back
> df_stream_2 = spark.read.format("parquet").load("test_parquet")
>
> print("Static dataframe from the streaming job (read)")
> df_stream_2.printSchema()
>
> # Now load a stream
> df_stream_3 = (
>     spark.readStream.format("parquet")
>     .schema(df_stream_2.schema)
>     .option("path", "test_parquet")
>     .option("latestFirst", False)
>     .load()
> )
>
> print("Streaming dataframe from the streaming job (readStream)")
> df_stream_3.printSchema()
> {code}
>
>
> which outputs:
> {noformat}
> Original dataframe
> root
>  |-- a: long (nullable = true)
>  |-- b: double (nullable = true)
>  |-- c: string (nullable = true)
>  |-- d: double (nullable = false)
>
> Streaming dataframe
> root
>  |-- a: long (nullable = true)
>  |-- b: double (nullable = true)
>  |-- c: string (nullable = true)
>  |-- d: double (nullable = true)
>  |-- e: string (nullable = false)
>
> Static dataframe from the streaming job (read)
> root
>  |-- a: long (nullable = true)
>  |-- b: double (nullable = true)
>  |-- c: string (nullable = true)
>  |-- d: double (nullable = true)
>  |-- e: string (nullable = false)
>
> Streaming dataframe from the streaming job (readStream)
> root
>  |-- a: long (nullable = true)
>  |-- b: double (nullable = true)
>  |-- c: string (nullable = true)
>  |-- d: double (nullable = true)
>  |-- e: string (nullable = true){noformat}
>
> So the column `d` is correctly converted to `nullable = true` (expected), but
> the column `e` stays non-nullable when the output files are read back with the
> batch `read` method, while it is correctly converted to `nullable = true` when
> read with `readStream`. Is that expected? According to the older issue
> https://issues.apache.org/jira/browse/SPARK-28651, this was supposed to be
> resolved. Any ideas?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]