[
https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638843#comment-14638843
]
Steve Lindemann commented on SPARK-9278:
----------------------------------------
Here are the steps to reproduce the issue. First, create a Hive table with the
desired schema:
{noformat}
In [1]: hc = pyspark.sql.HiveContext(sqlContext)
In [2]: pdf = pd.DataFrame({'pk': ['a']*5+['b']*5+['c']*5, 'k': ['a', 'e', 'i', 'o', 'u']*3, 'v': range(15)})
In [3]: sdf = hc.createDataFrame(pdf)
In [4]: sdf.show()
+-+--+--+
|k|pk| v|
+-+--+--+
|a| a| 0|
|e| a| 1|
|i| a| 2|
|o| a| 3|
|u| a| 4|
|a| b| 5|
|e| b| 6|
|i| b| 7|
|o| b| 8|
|u| b| 9|
|a| c|10|
|e| c|11|
|i| c|12|
|o| c|13|
|u| c|14|
+-+--+--+
In [5]: sdf.filter('FALSE').write.partitionBy('pk').saveAsTable('foo', format='parquet', path='s3a://eglp-core-temp/tmp/foo')
{noformat}
A table has been created:
{noformat}
In [33]: print('\n'.join(r.result for r in hc.sql('SHOW CREATE TABLE foo').collect()))
CREATE EXTERNAL TABLE `foo`(
`col` array<string> COMMENT 'from deserializer')
PARTITIONED BY (
`pk` string COMMENT '')
ROW FORMAT DELIMITED
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.SequenceFileInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
LOCATION
's3a://eglp-core-data/hive/warehouse/foo'
TBLPROPERTIES (
'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[{\"name\":\"k\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"pk\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}',
'transient_lastDdlTime'='1437657391',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.provider'='parquet')
{noformat}
Now, write a new partition of data (note that this comes from the same DataFrame
used to create the table):
{noformat}
In [6]: sdf.filter(sdf.pk == 'a').write.partitionBy('pk').insertInto('foo')
{noformat}
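A note on why column order matters here: insertInto appears to match the DataFrame's columns to the table's columns by position rather than by name, so a frame laid out differently from the table silently writes values into the wrong slots. A minimal defensive reorder, sketched in plain pandas (align_columns is a hypothetical helper, not part of Spark):

```python
import pandas as pd

def align_columns(df, table_columns):
    """Reorder df's columns to match the target table's column order,
    failing fast if the column sets differ instead of letting a
    positional insert scramble the values."""
    if set(df.columns) != set(table_columns):
        raise ValueError("column sets differ: %s vs %s"
                         % (sorted(df.columns), sorted(table_columns)))
    return df[list(table_columns)]

# Same column layout as the pdf above: pk, k, v vs. table order k, v, pk.
pdf = pd.DataFrame({'pk': ['a', 'b'], 'k': ['x', 'y'], 'v': [0, 1]})
aligned = align_columns(pdf, ['k', 'v', 'pk'])
print(list(aligned.columns))
```

The same idea applies directly to a Spark DataFrame via select before writing.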
Then, select the data:
{noformat}
In [7]: foo = hc.table('foo')
In [8]: foo.show()
+-+----+--+
|k| v|pk|
+-+----+--+
|a|null| 0|
|o|null| 3|
|i|null| 2|
|e|null| 1|
|u|null| 4|
+-+----+--+
In [9]: sdf.filter(sdf.pk == 'a').show()
+-+--+-+
|k|pk|v|
+-+--+-+
|a| a|0|
|e| a|1|
|i| a|2|
|o| a|3|
|u| a|4|
+-+--+-+
{noformat}
So clearly the insert wrote incorrect data: insertInto appears to match columns
by position rather than by name, so the DataFrame's string pk values went into
the long v column (becoming null after a failed cast) while the v values ended
up in pk. By reordering the columns to match the table's schema (k, v, pk), we
can insert data properly:
{noformat}
In [10]: pdf2 = pdf[['k', 'v', 'pk']]
In [11]: sdf2 = hc.createDataFrame(pdf2)
In [12]: sdf2.filter(sdf2.pk == 'a').write.partitionBy('pk').insertInto('foo')
In [13]: hc.refreshTable('foo')
In [14]: foo = hc.table('foo')
In [15]: foo.show()
+-+----+--+
|k| v|pk|
+-+----+--+
|a|null| 0|
|o|null| 3|
|i|null| 2|
|e|null| 1|
|u|null| 4|
|o| 3| a|
|u| 4| a|
|a| 0| a|
|e| 1| a|
|i| 2| a|
+-+----+--+
{noformat}
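The manual reordering above can be generalized: select the DataFrame's columns in the target table's order before calling insertInto. The helper below only computes the select list (select_order is hypothetical; the commented lines sketch how it would plug into the session above):

```python
def select_order(df_columns, table_columns):
    """Return the table's column list for DataFrame.select, verifying the
    DataFrame actually carries every column the table expects."""
    missing = [c for c in table_columns if c not in df_columns]
    if missing:
        raise ValueError("DataFrame is missing table columns: %s" % missing)
    return list(table_columns)

# With Spark, this would run before the positional insert, e.g.:
#   cols = select_order(sdf.columns, hc.table('foo').columns)
#   sdf.filter(sdf.pk == 'a').select(*cols).write.partitionBy('pk').insertInto('foo')
print(select_order(['k', 'pk', 'v'], ['k', 'v', 'pk']))
```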
> DataFrameWriter.insertInto inserts incorrect data
> -------------------------------------------------
>
> Key: SPARK-9278
> URL: https://issues.apache.org/jira/browse/SPARK-9278
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.0
> Environment: Linux, S3, Hive Metastore
> Reporter: Steve Lindemann
>
> After creating a partitioned Hive table (stored as Parquet) via
> DataFrameWriter.saveAsTable, subsequent attempts to insert additional data
> into new partitions of this table result in incorrect data rows. Reordering
> the columns in the data to be written avoids the issue.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)