[
https://issues.apache.org/jira/browse/SPARK-18510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Burak Yavuz updated SPARK-18510:
--------------------------------
Description:
Not sure if this is a regression from 2.0 to 2.1. I was investigating this for
Structured Streaming, but it affects batch data as well.
Here's the issue: if I explicitly specify my schema, with the partition columns
declared as strings, when reading:
{code}
spark.read
  .schema(someSchemaWherePartitionColumnsAreStrings)
{code}
but partition inference would instead infer those columns as IntegerType (and,
I assume, LongType or DoubleType as well, i.e. fixed-size types), then the data
is corrupted once UnsafeRows are generated.
Reproduction:
{code}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import spark.implicits._

// `src` is a directory (e.g. a java.io.File temp dir) to write the Parquet data to.
val createArray = udf { (length: Long) =>
  for (i <- 1 to length.toInt) yield i.toString
}
spark.range(10)
  .select(createArray('id + 1) as 'ex, 'id, 'id % 4 as 'part)
  .coalesce(1)
  .write
  .partitionBy("part", "id")
  .mode("overwrite")
  .parquet(src.toString)

val schema = new StructType()
  .add("id", StringType)
  .add("part", IntegerType)
  .add("ex", ArrayType(StringType))
spark.read
  .schema(schema)
  .format("parquet")
  .load(src.toString)
  .show()
{code}
The UDF creates rows that are long enough to avoid other weird
NullPointerExceptions, which I believe are caused by the same underlying problem.
Output:
{code}
+---------+----+--------------------+
| id|part| ex|
+---------+----+--------------------+
|�| 1|[1, 2, 3, 4, 5, 6...|
| | 0|[1, 2, 3, 4, 5, 6...|
| | 3|[1, 2, 3, 4, 5, 6...|
| | 2|[1, 2, 3, 4, 5, 6...|
| | 1| [1, 2, 3, 4, 5, 6]|
| | 0| [1, 2, 3, 4, 5]|
| | 3| [1, 2, 3, 4]|
| | 2| [1, 2, 3]|
| | 1| [1, 2]|
| | 0| [1]|
+---------+----+--------------------+
{code}
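A possible workaround (a sketch only, not verified against this repro): disable
partition column type inference so that partition values always come back as
StringType, and declare the partition columns as strings in the user schema:
{code}
// Workaround sketch, assuming the corruption stems from the mismatch between
// the inferred and declared partition column types; not verified here.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

val stringPartitionSchema = new StructType()
  .add("id", StringType)
  .add("part", StringType) // also a string now, matching the disabled inference
  .add("ex", ArrayType(StringType))

spark.read
  .schema(stringPartitionSchema)
  .format("parquet")
  .load(src.toString)
  .show()
{code}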
I was hoping to fix the issue as part of SPARK-18407, but since it is not
specific to Structured Streaming, it deserves its own JIRA.
> Partition schema inference corrupts data
> ----------------------------------------
>
> Key: SPARK-18510
> URL: https://issues.apache.org/jira/browse/SPARK-18510
> Project: Spark
> Issue Type: Bug
> Components: SQL, Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Burak Yavuz
> Priority: Blocker
>