[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638843#comment-14638843 ]
Steve Lindemann commented on SPARK-9278:
----------------------------------------

Here are the steps to reproduce the issue. First, create a Hive table with the desired schema:
{noformat}
In [1]: hc = pyspark.sql.HiveContext(sqlContext)

In [2]: pdf = pd.DataFrame({'pk': ['a']*5+['b']*5+['c']*5, 'k': ['a', 'e', 'i', 'o', 'u']*3, 'v': range(15)})

In [3]: sdf = hc.createDataFrame(pdf)

In [4]: sdf.show()
+-+--+--+
|k|pk| v|
+-+--+--+
|a| a| 0|
|e| a| 1|
|i| a| 2|
|o| a| 3|
|u| a| 4|
|a| b| 5|
|e| b| 6|
|i| b| 7|
|o| b| 8|
|u| b| 9|
|a| c|10|
|e| c|11|
|i| c|12|
|o| c|13|
|u| c|14|
+-+--+--+

In [5]: sdf.filter('FALSE').write.partitionBy('pk').saveAsTable('foo', format='parquet', path='s3a://eglp-core-temp/tmp/foo')
{noformat}
A table has been created:
{noformat}
In [33]: print('\n'.join(r.result for r in hc.sql('SHOW CREATE TABLE foo').collect()))
CREATE EXTERNAL TABLE `foo`(
  `col` array<string> COMMENT 'from deserializer')
PARTITIONED BY (
  `pk` string COMMENT '')
ROW FORMAT DELIMITED
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.SequenceFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
LOCATION
  's3a://eglp-core-data/hive/warehouse/foo'
TBLPROPERTIES (
  'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[{\"name\":\"k\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"pk\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}',
  'transient_lastDdlTime'='1437657391',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.provider'='parquet')
{noformat}
Now, write a new partition of data (note that this is from the same DataFrame from which the table was created):
{noformat}
sdf.filter(sdf.pk == 'a').write.partitionBy('pk').insertInto('foo')
{noformat}
Then, select the data:
{noformat}
In [7]: foo = hc.table('foo')

In [8]: foo.show()
+-+----+--+
|k|   v|pk|
+-+----+--+
|a|null| 0|
|o|null| 3|
|i|null| 2|
|e|null| 1|
|u|null| 4|
+-+----+--+

In [9]: sdf.filter(sdf.pk == 'a').show()
+-+--+-+
|k|pk|v|
+-+--+-+
|a| a|0|
|e| a|1|
|i| a|2|
|o| a|3|
|u| a|4|
+-+--+-+
{noformat}
So clearly it inserted incorrect data. By reordering the columns, we can insert data properly:
{noformat}
In [10]: pdf2 = pdf[['k', 'v', 'pk']]

In [11]: sdf2 = hc.createDataFrame(pdf2)

In [12]: sdf2.filter(sdf2.pk == 'a').write.partitionBy('pk').insertInto('foo')

In [13]: hc.refreshTable('foo')

In [14]: foo = hc.table('foo')

In [15]: foo.show()
+-+----+--+
|k|   v|pk|
+-+----+--+
|a|null| 0|
|o|null| 3|
|i|null| 2|
|e|null| 1|
|u|null| 4|
|o|   3| a|
|u|   4| a|
|a|   0| a|
|e|   1| a|
|i|   2| a|
+-+----+--+
{noformat}

> DataFrameWriter.insertInto inserts incorrect data
> -------------------------------------------------
>
>                 Key: SPARK-9278
>                 URL: https://issues.apache.org/jira/browse/SPARK-9278
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>        Environment: Linux, S3, Hive Metastore
>           Reporter: Steve Lindemann
>
> After creating a partitioned Hive table (stored as Parquet) via the
> DataFrameWriter.createTable command, subsequent attempts to insert additional
> data into new partitions of this table result in inserting incorrect data
> rows. Reordering the columns in the data to be written seems to avoid this
> issue.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
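The symptom above (values shifted one column over) is consistent with insertInto matching columns by position rather than by name, which is why reordering the source columns avoids the bug. Until that is fixed, a defensive workaround is to select the DataFrame's columns in the target table's schema order before inserting. A minimal sketch in plain Python, assuming only column-name lists; the helper name is illustrative and not part of the Spark API:

```python
def columns_in_table_order(df_columns, table_columns):
    """Return the DataFrame's column names reordered to match the table schema.

    Since insertInto appears to align columns by position, selecting
    columns in this order first avoids the silent misalignment shown
    above. Raises instead of guessing when the column sets differ.
    """
    if set(df_columns) != set(table_columns):
        raise ValueError(
            "DataFrame columns %s do not match table columns %s"
            % (sorted(df_columns), sorted(table_columns)))
    return list(table_columns)

# Hypothetical usage with the `hc` and `sdf` from the session above
# (not run here): sdf.columns is ['k', 'pk', 'v'] while the table
# expects ['k', 'v', 'pk'], so reorder before writing:
#   ordered = columns_in_table_order(sdf.columns, hc.table('foo').columns)
#   sdf.select(*ordered).write.partitionBy('pk').insertInto('foo')
```

The helper deliberately fails loudly on a column-set mismatch; the whole point is to replace a silent positional shuffle with an explicit check.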