Luis created SPARK-21698:
----------------------------
Summary: write.partitionBy() is giving me garbage data
Key: SPARK-21698
URL: https://issues.apache.org/jira/browse/SPARK-21698
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.0, 2.1.1
Environment: Linux Ubuntu 17.04. Python 3.5.
Reporter: Luis
Spark partitionBy is causing some data corruption. I am doing three super simple
writes. Below is the code to reproduce the problem.
h4. I'll start by showing the program output:
{code:title=Program Output|borderStyle=solid}
17/08/10 15:30:41 WARN [SparkUtils]: [Creating/Writing Table] data
[TestSparkUtils]: DEBUG: [------------ Initial Create ----------]
+-----+---+----+
|count| id|name|
+-----+---+----+
| 1| 1| 1|
| 2| 2| 2|
| 3| 3| 3|
+-----+---+----+
[TestSparkUtils]: DEBUG: [------------ Insert No Duplicates ----------]
17/08/10 15:30:43 WARN [SparkUtils]: [Inserting Into Table] data
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
17/08/10 15:30:44 WARN log: Updating partition stats fast for: data
17/08/10 15:30:44 WARN log: Updated size to 545
+---+----+-----+
| id|name|count|
+---+----+-----+
| 1| 1| 1|
| 2| 2| 2|
| 3| 3| 3|
| 4| 4| 4|
| 5| 5| 5|
| 6| 6| 6|
+---+----+-----+
+-----+---+-----+
|count| id| name|
+-----+---+-----+
| 7| 1| one|
| 8| 2| two|
| 9| 4|three|
| 10| 6| six|
+-----+---+-----+
[TestSparkUtils]: DEBUG: [------------ Update ----------]
17/08/10 15:30:44 WARN [SparkUtils]: [Inserting Into Table] data
17/08/10 15:30:45 WARN log: Updating partition stats fast for: data
17/08/10 15:30:45 WARN log: Updated size to 1122
+---+----+-----+
| id|name|count|
+---+----+-----+
| 9| 4| null|
| 10| 6| null|
| 7| 1| null|
| 8| 2| null|
| 1| 1| 1|
| 2| 2| 2|
| 3| 3| 3|
| 4| 4| 4|
| 5| 5| 5|
| 6| 6| 6|
+---+----+-----+
..
----------------------------------------------------------------------
Ran 2 tests in 11.559s
OK
{code}
In the last show() I can see the data is corrupted: the values have been switched
between columns, and I am getting null results. Below are the main snippets of the
code I am using to generate the problem:
{code:title=spark init|borderStyle=solid}
self.spark = SparkSession \
    .builder \
    .master("spark://localhost:7077") \
    .enableHiveSupport() \
    .getOrCreate()
self.driver = SparkUtils(self.spark)
{code}
{code:title=Code for the test case|borderStyle=solid}
def test_insert_table(self):
    self.log.debug("[test_insert_table]")
    table_name = "data"
    self.driver.drop_table(table_name)
    data0 = [
        {"id": 1, "name": "1", "count": 1},
        {"id": 2, "name": "2", "count": 2},
        {"id": 3, "name": "3", "count": 3},
    ]
    df_data0 = self.spark.createDataFrame(data0)
    df_return = self.driver.insert_table(df_data0, "data", ["count"])
    self.log.debug("[------------ Initial Create ----------]")
    df_return.show()

    data1 = [
        {"id": 4, "name": "4", "count": 4},
        {"id": 5, "name": "5", "count": 5},
        {"id": 6, "name": "6", "count": 6},
    ]
    df_data1 = self.spark.createDataFrame(data1)
    self.log.debug("[------------ Insert No Duplicates ----------]")
    df_return = self.driver.insert_table(df_data1, "data", ["count"])
    df_return.show()

    data3 = [
        {"id": 1, "name": "one", "count": 7},
        {"id": 2, "name": "two", "count": 8},
        {"id": 4, "name": "three", "count": 9},
        {"id": 6, "name": "six", "count": 10},
    ]
    df_data3 = self.spark.createDataFrame(data3)
    df_data3.show()
    self.log.debug("[------------ Update ----------]")
    df_return = self.driver.insert_table(df_data3, "data", ["count"])
    df_return.show()
{code}
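One detail about the DataFrames above that may be relevant: when createDataFrame()
is given a list of plain Python dicts, PySpark infers the schema by sorting the field
names alphabetically, so all three frames come out with columns (count, id, name)
regardless of the order the dicts were written in. You can see this in the show()
output above. A minimal check of that behavior (self.spark is the session created
earlier):
{code:title=Column order when creating from dicts|borderStyle=solid}
# PySpark sorts dict keys alphabetically when inferring a schema from
# a list of dicts, so the resulting column order is count, id, name.
df = self.spark.createDataFrame([{"id": 1, "name": "1", "count": 1}])
print(df.columns)  # ['count', 'id', 'name']
{code}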
As you can see, I am doing three simple table writes. The first write uses
saveAsTable() with partitionBy(). The second is just an insertInto(), and on the
third insertInto() the data is corrupted.
{code:title=This is method I am testing|borderStyle=solid}
def insert_table(self, df_table, table_name, primary_key=None):
    """
    A simple insert; will just append rows to the existing table.
    """
    if self.in_table(table_name):
        self.plogger.warn("[Inserting Into Table] " + table_name)
        df_table.write.mode("overwrite").insertInto(table_name)
        return self.read_table(table_name)
    else:
        self.plogger.warn("[Creating/Writing Table] " + table_name)
        df_table.write.partitionBy(primary_key) \
            .mode("overwrite") \
            .saveAsTable(self.database + "." + table_name)
        return df_table
{code}
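If it helps with triage: as far as I understand, DataFrameWriter.insertInto()
resolves columns by position rather than by name, and saveAsTable() with
partitionBy() moves the partition column to the end of the table schema (here:
id, name, count). If that is what is happening, every insertInto() above writes
count into id, id into name, and name into count; the second insert only looks
correct because its three columns happen to hold the same values, and the nulls
in the third appear because strings like "one" cannot be cast to count's long
type. A workaround sketch under that assumption, reordering the DataFrame to the
table's column order before inserting:
{code:title=Workaround sketch: align columns to the table schema|borderStyle=solid}
# insertInto() matches columns by position, so reorder the DataFrame
# to the target table's column order before writing. "data" is the
# table from the test; df_data3 is the third DataFrame above.
table_columns = self.spark.table("data").columns   # ['id', 'name', 'count']
df_aligned = df_data3.select(*table_columns)
df_aligned.write.mode("overwrite").insertInto("data")
{code}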