[ 
https://issues.apache.org/jira/browse/SPARK-21698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis updated SPARK-21698:
-------------------------
    Description: 
Spark partitionBy is causing some data corruption. I am doing three very simple 
writes. Below is the code to reproduce the problem.


h4. Program output:

{code:title=Program Output|borderStyle=solid}
17/08/10 15:30:41 WARN [SparkUtils]: [Creating/Writing Table] data
[TestSparkUtils]: DEBUG: [------------ Initial Create ----------]
+-----+---+----+
|count| id|name|
+-----+---+----+
|    1|  1|   1|
|    2|  2|   2|
|    3|  3|   3|
+-----+---+----+

[TestSparkUtils]: DEBUG: [------------ Insert No Duplicates ----------]
17/08/10 15:30:43 WARN [SparkUtils]: [Inserting Into Table] data
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
+---+----+-----+
| id|name|count|
+---+----+-----+
|  1|   1|    1|
|  2|   2|    2|
|  3|   3|    3|
|  4|   4|    4|
|  5|   5|    5|
|  6|   6|    6|
+---+----+-----+

+-----+---+-----+
|count| id| name|
+-----+---+-----+
|    7|  1|  one|
|    8|  2|  two|
|    9|  4|three|
|   10|  6|  six|
+-----+---+-----+

[TestSparkUtils]: DEBUG: [------------ Update ----------]
17/08/10 15:30:44 WARN [SparkUtils]: [Inserting Into Table] data
17/08/10 15:30:45 WARN log: Updating partition stats fast for: data
17/08/10 15:30:45 WARN log: Updated size to 1122
+---+----+-----+
| id|name|count|
+---+----+-----+
|  9|   4| null|
| 10|   6| null|
|  7|   1| null|
|  8|   2| null|
|  1|   1|    1|
|  2|   2|    2|
|  3|   3|    3|
|  4|   4|    4|
|  5|   5|    5|
|  6|   6|    6|
+---+----+-----+

..
----------------------------------------------------------------------
Ran 2 tests in 11.559s

OK
{code}

In the last show(), I see that the data is corrupted: the values were switched 
between columns, and I am getting null results. Below are the main parts of the 
code I am using to generate the problem:

{code:title=spark init|borderStyle=solid}
        self.spark = SparkSession \
                .builder \
                .master("spark://localhost:7077") \
                .enableHiveSupport() \
                .getOrCreate()

        self.driver = SparkUtils(self.spark)
{code}

{code:title=Code for the test case|borderStyle=solid}
    def test_insert_table(self):
        self.log.debug("[test_insert_table]")
        table_name = "data"
        self.driver.drop_table(table_name)
        data0 = [
            {"id": 1, "name":"1", "count": 1},
            {"id": 2, "name":"2", "count": 2},
            {"id": 3, "name":"3", "count": 3},
        ]
        df_data0 = self.spark.createDataFrame(data0)
        df_return = self.driver.insert_table(df_data0, "data", ["count"])
        self.log.debug("[------------ Initial Create ----------]")
        df_return.show()
        data1 = [
            {"id": 4, "name":"4", "count": 4},
            {"id": 5, "name":"5", "count": 5},
            {"id": 6, "name":"6", "count": 6},
        ]
        df_data1 = self.spark.createDataFrame(data1)
        self.log.debug("[------------ Insert No Duplicates ----------]")
        df_return = self.driver.insert_table(df_data1, "data", ["count"])
        df_return.show()

        data3 = [
            {"id": 1, "name":"one", "count":7},
            {"id": 2, "name":"two", "count": 8},
            {"id": 4, "name":"three", "count": 9},
            {"id": 6, "name":"six", "count":10}
        ]
        df_data3 = self.spark.createDataFrame(data3)
        df_data3.show()
        self.log.debug("[------------ Update ----------]")
        df_return = self.driver.insert_table(df_data3, "data", ["count"])
        df_return.show()
{code}

As you can see, I am doing three simple table writes. The first write uses 
saveAsTable() with partitionBy(). The second does just insertInto(), and on 
the third insertInto() the data is corrupted.


{code:title=This is method I am testing|borderStyle=solid}
    def insert_table(self, df_table, table_name, primary_key=None):
        """
            A simple insert, will just append rows to the existing table
        """
        if self.in_table(table_name):
            self.plogger.warn("[Inserting Into Table] " + table_name)
            df_table.write.mode("overwrite").insertInto(table_name)
            return self.read_table(table_name)
        else:
            self.plogger.warn("[Creating/Writing Table] " + table_name)
            df_table.write.partitionBy(primary_key).mode("overwrite").saveAsTable(self.database+"."+table_name)
            return df_table
{code}
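
One possible explanation for the swapped columns, offered here as an assumption and not confirmed anywhere in this report: insertInto() resolves columns by position and ignores their names, while saveAsTable() with partitionBy() stores the partition column last. The program output is consistent with that reading, since the DataFrame prints its columns as (count, id, name) and the table prints them as (id, name, count). The sketch below is a pure-Python simulation of that mapping and does not call Spark. The helper names and the simplified cast are hypothetical.

```python
# Pure-Python simulation (no Spark needed) of a position-based insert.
# Column orders are taken from the show() output in the report; the
# cast in to_bigint is a simplified stand-in, not Spark's own logic.

DF_COLUMNS = ["count", "id", "name"]     # order printed by df_data3.show()
TABLE_COLUMNS = ["id", "name", "count"]  # order printed when reading the table


def to_bigint(value):
    """Simplified cast: return an int, or None when the value is not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def insert_by_position(row):
    """Pair the i-th DataFrame value with the i-th table column, ignoring names."""
    values = [row[c] for c in DF_COLUMNS]
    stored = dict(zip(TABLE_COLUMNS, values))
    stored["name"] = str(stored["name"])          # table column is a string
    stored["count"] = to_bigint(stored["count"])  # table column is numeric
    return stored


def insert_by_name(row):
    """Reorder the values to the table's column order before inserting."""
    return {c: row[c] for c in TABLE_COLUMNS}


row = {"id": 1, "name": "one", "count": 7}
print(insert_by_position(row))  # {'id': 7, 'name': '1', 'count': None}
print(insert_by_name(row))      # {'id': 1, 'name': 'one', 'count': 7}
```

The first printed row matches the corrupted row `|  7|   1| null|` in the output above. The second insert in the test case would hide the problem under this assumption, because id, name, and count hold the same value in every row. If this is indeed the cause, selecting the columns in table order before the insert, for example `df_table.select(*self.spark.table(table_name).columns)`, would be one way to check it.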








> write.partitionBy() is giving me garbage data
> ---------------------------------------------
>
>                 Key: SPARK-21698
>                 URL: https://issues.apache.org/jira/browse/SPARK-21698
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.1, 2.2.0
>         Environment: Linux Ubuntu 17.04.  Python 3.5.
>            Reporter: Luis


