[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638843#comment-14638843 ]

Steve Lindemann commented on SPARK-9278:
----------------------------------------

Here are the steps to reproduce the issue. First, create a Hive table with the 
desired schema:

{noformat}
In [1]: hc = pyspark.sql.HiveContext(sqlContext)
In [2]: pdf = pd.DataFrame({'pk': ['a']*5+['b']*5+['c']*5, 'k': ['a', 'e', 'i', 'o', 'u']*3, 'v': range(15)})
In [3]: sdf = hc.createDataFrame(pdf)
In [4]: sdf.show()
+-+--+--+
|k|pk| v|
+-+--+--+
|a| a| 0|
|e| a| 1|
|i| a| 2|
|o| a| 3|
|u| a| 4|
|a| b| 5|
|e| b| 6|
|i| b| 7|
|o| b| 8|
|u| b| 9|
|a| c|10|
|e| c|11|
|i| c|12|
|o| c|13|
|u| c|14|
+-+--+--+
In [5]: sdf.filter('FALSE').write.partitionBy('pk').saveAsTable('foo', format='parquet', path='s3a://eglp-core-temp/tmp/foo')
{noformat}
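
The FALSE filter writes zero rows, so this saveAsTable call only registers the 
table's schema and metadata. As a sanity check (a sketch, assuming the same 
session):

{noformat}
# The table should exist but hold no rows yet.
hc.table('foo').count()   # expected: 0
{noformat}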

A table has been created:

{noformat}
In [33]: print('\n'.join(r.result for r in hc.sql('SHOW CREATE TABLE foo').collect()))
CREATE EXTERNAL TABLE `foo`(
  `col` array<string> COMMENT 'from deserializer')
PARTITIONED BY (
  `pk` string COMMENT '')
ROW FORMAT DELIMITED
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.SequenceFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
LOCATION
  's3a://eglp-core-data/hive/warehouse/foo'
TBLPROPERTIES (
  'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[{\"name\":\"k\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"pk\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}',
  'transient_lastDdlTime'='1437657391',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.provider'='parquet')
{noformat}
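
Note the column-order mismatch already visible here: the stored schema lists 
the columns as (k, v, pk), since saveAsTable moves the partition column to the 
end, while the DataFrame's order is (k, pk, v), as the show() output above 
confirms. A quick way to see the mismatch (a sketch, same session):

{noformat}
# Compare the DataFrame's column order with the table's stored order.
sdf.columns              # ['k', 'pk', 'v']  (see the show() output above)
hc.table('foo').columns  # ['k', 'v', 'pk']  (partition column moved last)
{noformat}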

Now, write a new partition of data (note that this is from the same DataFrame 
from which the table was created):

{noformat}
sdf.filter(sdf.pk == 'a').write.partitionBy('pk').insertInto('foo')
{noformat}
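
If insertInto matches columns by position rather than by name (an assumption 
on my part, but consistent with the results below), the pairing would be:

{noformat}
# Sketch: positional pairing of DataFrame columns against table columns.
list(zip(sdf.columns, hc.table('foo').columns))
# [('k', 'k'), ('pk', 'v'), ('v', 'pk')]
# i.e. table column v gets pk's strings (null when read as a long) and the
# partition column pk gets v's integers.
{noformat}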

Then, select the data:

{noformat}
In [7]: foo = hc.table('foo')
In [8]: foo.show()
+-+----+--+
|k|   v|pk|
+-+----+--+
|a|null| 0|
|o|null| 3|
|i|null| 2|
|e|null| 1|
|u|null| 4|
+-+----+--+
In [9]: sdf.filter(sdf.pk == 'a').show()
+-+--+-+
|k|pk|v|
+-+--+-+
|a| a|0|
|e| a|1|
|i| a|2|
|o| a|3|
|u| a|4|
+-+--+-+
{noformat}

So clearly it inserted incorrect data: the table's v column received the 
DataFrame's pk values (the string 'a', which reads back as null for a long 
column), and the partition column pk received the v values 0 through 4. By 
reordering the columns to match the table schema, we can insert the data 
properly:

{noformat}
In [10]: pdf2 = pdf[['k', 'v', 'pk']]
In [11]: sdf2 = hc.createDataFrame(pdf2)
In [12]: sdf2.filter(sdf2.pk == 'a').write.partitionBy('pk').insertInto('foo')
In [13]: hc.refreshTable('foo')
In [14]: foo = hc.table('foo')
In [15]: foo.show()
+-+----+--+
|k|   v|pk|
+-+----+--+
|a|null| 0|
|o|null| 3|
|i|null| 2|
|e|null| 1|
|u|null| 4|
|o|   3| a|
|u|   4| a|
|a|   0| a|
|e|   1| a|
|i|   2| a|
+-+----+--+
{noformat}
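
The same fix works without the round trip through pandas; a sketch, assuming 
positional matching is the cause, that reorders the existing DataFrame with 
select() to match the table schema (here inserting the hypothetical next 
partition, pk == 'b'):

{noformat}
# Workaround sketch: align column order with the table schema before insert.
cols = hc.table('foo').columns                      # ['k', 'v', 'pk']
sdf.filter(sdf.pk == 'b').select(*cols) \
   .write.partitionBy('pk').insertInto('foo')
{noformat}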

> DataFrameWriter.insertInto inserts incorrect data
> -------------------------------------------------
>
>                 Key: SPARK-9278
>                 URL: https://issues.apache.org/jira/browse/SPARK-9278
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>         Environment: Linux, S3, Hive Metastore
>            Reporter: Steve Lindemann
>
> After creating a partitioned Hive table (stored as Parquet) via 
> DataFrameWriter.saveAsTable, subsequent attempts to insert additional data 
> into new partitions of this table result in incorrect data rows being 
> inserted. Reordering the columns in the data to be written seems to avoid 
> this issue.


