[ https://issues.apache.org/jira/browse/SPARK-21698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luis updated SPARK-21698:
-------------------------

Description:
Spark partitionBy() is causing some data corruption. I am doing three super simple writes. Below is the code to reproduce the problem.

h4. Program output

{code:title=Program Output|borderStyle=solid}
17/08/10 15:30:41 WARN [SparkUtils]: [Creating/Writing Table] data
[TestSparkUtils]: DEBUG: [------------ Initial Create ----------]
+-----+---+----+
|count| id|name|
+-----+---+----+
|    1|  1|   1|
|    2|  2|   2|
|    3|  3|   3|
+-----+---+----+

[TestSparkUtils]: DEBUG: [------------ Insert No Duplicates ----------]
17/08/10 15:30:43 WARN [SparkUtils]: [Inserting Into Table] data
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
+---+----+-----+
| id|name|count|
+---+----+-----+
|  1|   1|    1|
|  2|   2|    2|
|  3|   3|    3|
|  4|   4|    4|
|  5|   5|    5|
|  6|   6|    6|
+---+----+-----+

+-----+---+-----+
|count| id| name|
+-----+---+-----+
|    7|  1|  one|
|    8|  2|  two|
|    9|  4|three|
|   10|  6|  six|
+-----+---+-----+

[TestSparkUtils]: DEBUG: [------------ Update ----------]
17/08/10 15:30:44 WARN [SparkUtils]: [Inserting Into Table] data
17/08/10 15:30:45 WARN log: Updating partition stats fast for: data
17/08/10 15:30:45 WARN log: Updated size to 1122
+---+----+-----+
| id|name|count|
+---+----+-----+
|  9|   4| null|
| 10|   6| null|
|  7|   1| null|
|  8|   2| null|
|  1|   1|    1|
|  2|   2|    2|
|  3|   3|    3|
|  4|   4|    4|
|  5|   5|    5|
|  6|   6|    6|
+---+----+-----+
..
----------------------------------------------------------------------
Ran 2 tests in 11.559s

OK
{code}

In the last show() I see the data is corrupted: the values have been shifted across columns, and I am getting null results.

Below are the main snippets of the code I am using to generate the problem:

{code:title=spark init|borderStyle=solid}
self.spark = SparkSession \
    .builder \
    .master("spark://localhost:7077") \
    .enableHiveSupport() \
    .getOrCreate()

self.driver = SparkUtils(self.spark)
{code}

{code:title=Code for the test case|borderStyle=solid}
def test_insert_table(self):
    self.log.debug("[test_insert_table]")
    table_name = "data"
    self.driver.drop_table(table_name)

    data0 = [
        {"id": 1, "name": "1", "count": 1},
        {"id": 2, "name": "2", "count": 2},
        {"id": 3, "name": "3", "count": 3},
    ]
    df_data0 = self.spark.createDataFrame(data0)
    df_return = self.driver.insert_table(df_data0, "data", ["count"])
    self.log.debug("[------------ Initial Create ----------]")
    df_return.show()

    data1 = [
        {"id": 4, "name": "4", "count": 4},
        {"id": 5, "name": "5", "count": 5},
        {"id": 6, "name": "6", "count": 6},
    ]
    df_data1 = self.spark.createDataFrame(data1)
    self.log.debug("[------------ Insert No Duplicates ----------]")
    df_return = self.driver.insert_table(df_data1, "data", ["count"])
    df_return.show()

    data3 = [
        {"id": 1, "name": "one", "count": 7},
        {"id": 2, "name": "two", "count": 8},
        {"id": 4, "name": "three", "count": 9},
        {"id": 6, "name": "six", "count": 10},
    ]
    df_data3 = self.spark.createDataFrame(data3)
    df_data3.show()
    self.log.debug("[------------ Update ----------]")
    df_return = self.driver.insert_table(df_data3, "data", ["count"])
    df_return.show()
{code}

As you can see, I am doing three simple table writes. The first call uses saveAsTable() with partitionBy(), the second is just insertInto(), and on the third insertInto() the data is corrupted.
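While narrowing this down, the column orders involved seem worth calling out. The sketch below is not part of my test; it is a minimal standalone session (setup assumed) showing the two orders I believe are in play: createDataFrame() from a list of Python dicts sorts the columns alphabetically (visible in the first show() above), while a partitioned saveAsTable() moves the partition column to the end of the table schema (visible once the table is read back).

{code:title=Column-order sketch (standalone session, assumed setup)|borderStyle=solid}
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Columns inferred from a list of dicts come out alphabetically sorted.
df = spark.createDataFrame([{"id": 1, "name": "1", "count": 1}])
print(df.columns)  # ['count', 'id', 'name']

# After a partitioned saveAsTable(), the partition column sits last
# in the table schema.
df.write.partitionBy("count").mode("overwrite").saveAsTable("data")
print(spark.table("data").columns)  # ['id', 'name', 'count']
{code}

If insertInto() matches columns by position rather than by name, rows ordered as (count, id, name) would land in (id, name, count) slots, which lines up with the shifted values and the null counts in the final show() above.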
{code:title=This is the method I am testing|borderStyle=solid}
def insert_table(self, df_table, table_name, primary_key=None):
    """
    A simple insert; it will just append rows to the existing table.
    """
    if self.in_table(table_name):
        self.plogger.warn("[Inserting Into Table] " + table_name)
        df_table.write.mode("overwrite").insertInto(table_name)
        return self.read_table(table_name)
    else:
        self.plogger.warn("[Creating/Writing Table] " + table_name)
        df_table.write.partitionBy(primary_key).mode("overwrite").saveAsTable(self.database + "." + table_name)
        return df_table
{code}
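Assuming the cause is indeed insertInto()'s position-based column resolution, a possible workaround is to reorder the incoming DataFrame into the table's own column order before inserting. This is only a sketch, not verified code; it assumes the helper keeps the session as self.spark, which is not shown above.

{code:title=Possible workaround (sketch, self.spark assumed)|borderStyle=solid}
def insert_table(self, df_table, table_name, primary_key=None):
    """
    Insert rows after aligning column order with the existing table,
    since insertInto() resolves columns by position, not by name.
    """
    if self.in_table(table_name):
        self.plogger.warn("[Inserting Into Table] " + table_name)
        # The partition column sits at the end of the table schema, so
        # select the incoming columns in the table's own order first.
        table_columns = self.spark.table(table_name).columns
        df_table.select(*table_columns).write.mode("overwrite").insertInto(table_name)
        return self.read_table(table_name)
    else:
        self.plogger.warn("[Creating/Writing Table] " + table_name)
        df_table.write.partitionBy(primary_key).mode("overwrite") \
            .saveAsTable(self.database + "." + table_name)
        return df_table
{code}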
> write.partitionBy() is giving me garbage data
> ---------------------------------------------
>
>                 Key: SPARK-21698
>                 URL: https://issues.apache.org/jira/browse/SPARK-21698
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.1, 2.2.0
>        Environment: Linux Ubuntu 17.04. Python 3.5.
>            Reporter: Luis
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org