[ https://issues.apache.org/jira/browse/SPARK-18510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Burak Yavuz updated SPARK-18510:
--------------------------------

Description:

Not sure if this is a regression from 2.0 to 2.1. I was investigating this for Structured Streaming, but it seems it affects batch data as well.

Here's the issue: if I specify my schema when doing

{code}
spark.read
  .schema(someSchemaWherePartitionColumnsAreStrings)
{code}

but partition inference can infer those columns as IntegerType (or, I assume, LongType or DoubleType; basically the fixed-size types), then once UnsafeRows are generated, your data will be corrupted.

Reproduction:

{code}
val createArray = udf { (length: Long) =>
  for (i <- 1 to length.toInt) yield i.toString
}

spark.range(10)
  .select(createArray('id + 1) as 'ex, 'id, 'id % 4 as 'part)
  .coalesce(1)
  .write
  .partitionBy("part", "id")
  .mode("overwrite")
  .parquet(src.toString)

val schema = new StructType()
  .add("id", StringType)
  .add("part", IntegerType)
  .add("ex", ArrayType(StringType))

spark.read
  .schema(schema)
  .format("parquet")
  .load(src.toString)
  .show()
{code}

The UDF is useful for creating rows long enough that you don't hit other weird NullPointerExceptions, which I believe are caused for the same reason.

Output:

{code}
+---------+----+--------------------+
|       id|part|                  ex|
+---------+----+--------------------+
|        �|   1|[1, 2, 3, 4, 5, 6...|
|         |   0|[1, 2, 3, 4, 5, 6...|
|         |   3|[1, 2, 3, 4, 5, 6...|
|         |   2|[1, 2, 3, 4, 5, 6...|
|         |   1|  [1, 2, 3, 4, 5, 6]|
|         |   0|     [1, 2, 3, 4, 5]|
|         |   3|        [1, 2, 3, 4]|
|         |   2|           [1, 2, 3]|
|         |   1|              [1, 2]|
|         |   0|                 [1]|
+---------+----+--------------------+
{code}

I was hoping to fix the issue as part of SPARK-18407, but it seems it's not only applicable to Structured Streaming and deserves its own JIRA.
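Presumably the corruption comes from the UnsafeRow layout: fixed-size values such as ints are stored inline in the row's fixed-length region, while strings are stored as an offset/size word pointing into a variable-length region, so a slot written as an int but later read as a string reference yields garbage bytes like the � above. As a sanity check (a sketch, assuming the same src path as the reproduction), reading the path without a user-supplied schema shows the types partition inference actually picks:

{code}
// Sketch: inspect what partition inference produces when no schema is given.
// Assumes the same `src` path as the reproduction above.
spark.read.format("parquet").load(src.toString).printSchema()
// Expected (inference types both partition columns as integers):
// root
//  |-- ex: array (nullable = true)
//  |    |-- element: string (containsNull = true)
//  |-- part: integer (nullable = true)
//  |-- id: integer (nullable = true)
{code}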
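A possible workaround until this is fixed (a sketch, not from the report itself, and untested against this exact bug): either avoid handing Spark a schema that disagrees with the inferred partition types and cast afterwards, or disable partition-column type inference so the partition values stay strings:

{code}
import org.apache.spark.sql.functions.col

// Option 1: let partition inference win, then cast the partition column,
// so no user-supplied schema ever conflicts with the inferred types.
val casted = spark.read.parquet(src.toString)
  .withColumn("id", col("id").cast("string"))

// Option 2: turn off partition-column type inference before reading, so
// partition values are kept as strings in the first place.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
{code}

Option 1 only changes the column's type after the read, not the underlying partition values, so it sidesteps the schema mismatch entirely.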
> Partition schema inference corrupts data
> ----------------------------------------
>
>                 Key: SPARK-18510
>                 URL: https://issues.apache.org/jira/browse/SPARK-18510
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Burak Yavuz
>            Priority: Blocker

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)