Nazarii Bardiuk created SPARK-23512:
---------------------------------------

             Summary: Complex operations on Dataframe corrupts data
                 Key: SPARK-23512
                 URL: https://issues.apache.org/jira/browse/SPARK-23512
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.2.1
            Reporter: Nazarii Bardiuk


The following code demonstrates a sequence of DataFrame transformations that
corrupts data:
{code}
from pyspark import SparkContext, SQLContext, Row
from pyspark.sql import Window
from pyspark.sql.functions import (explode, lit, count, row_number, col,
                                   countDistinct)

ss = SQLContext(SparkContext('local', 'pyspark'))
diffs = ss.createDataFrame([
    Row(id="1", a=["1"], b=["2"], t="2"),
    Row(id="2", a=["2"], b=["1"], t="1"),
    Row(id="3", a=["1"], b=["4", "3"], t="3"),
    Row(id="3", a=["1"], b=["4", "3"], t="4"),
    Row(id="4", a=["1"], b=["4", "3"], t="3"),
    Row(id="4", a=["1"], b=["4", "3"], t="4")
])

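# one row per element of array column "a", tagged with problem "a"
a = diffs.select("id", explode("a").alias("l"), "t") \
    .withColumn("problem", lit("a"))
# one row per element of array column "b" (excluding elements equal to t),
# tagged with problem "b"
b = diffs.select("id", explode("b").alias("l"), "t") \
    .withColumn("problem", lit("b")) \
    .filter(col("t") != col("l"))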

all = a.union(b)

grouped = all \
    .groupBy("l", "t", "problem").agg(count("id").alias("count")) \
    .withColumn("rn", row_number().over(
        Window.partitionBy("l", "problem").orderBy(col("count").desc()))) \
    .withColumn("f", (col("rn") < 2) & (col("count") > 1)) \
    .cache()  # adding this cache() is the change that broke our test

keep = grouped.filter("f").select("l", "t", "problem", "count")

agg = all.join(grouped.filter(~col("f")), ["l", "t", "problem"]) \
    .withColumn("t", lit(None)) \
    .groupBy("l", "t", "problem").agg(countDistinct("id").alias("count"))


keep.union(agg).show() # corrupts column "problem"
agg.union(keep).show() # as expected
{code}

Expected: the "problem" column contains the same data in both unions.
Actual: in keep.union(agg), the "problem" column loses data (rows that should have "b" show "a").
{code}
keep.union(agg).show() # corrupts column "problem"
+---+----+-------+-----+
|  l|   t|problem|count|
+---+----+-------+-----+
|  3|   4|      a|    2|
|  4|   3|      a|    2|
|  1|   4|      a|    2|
|  1|null|      a|    3|
|  2|null|      a|    1|
+---+----+-------+-----+

agg.union(keep).show() # as expected
+---+----+-------+-----+
|  l|   t|problem|count|
+---+----+-------+-----+
|  1|null|      a|    3|
|  2|null|      a|    1|
|  3|   4|      b|    2|
|  4|   3|      b|    2|
|  1|   4|      a|    2|
+---+----+-------+-----+

{code}
Note that adding the cache() call is what broke our code; without it, both
unions return the expected result.
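As a cross-check of what the correct output should be, here is a minimal plain-Python model of the same pipeline, written without Spark. It is a sketch, not Spark's implementation: the function name `expected_unions` and the tie-breaking rule are assumptions. `row_number()` breaks ties in `count` arbitrarily, so the sketch breaks them by `t` descending to reproduce the row `1 | 4 | a | 2` shown above; either way, the "problem" column of a correct union holds three "a" and two "b" values.

```python
from collections import Counter, defaultdict

# Input rows copied from the report: (id, a, b, t)
DIFFS = [
    ("1", ["1"], ["2"], "2"),
    ("2", ["2"], ["1"], "1"),
    ("3", ["1"], ["4", "3"], "3"),
    ("3", ["1"], ["4", "3"], "4"),
    ("4", ["1"], ["4", "3"], "3"),
    ("4", ["1"], ["4", "3"], "4"),
]


def expected_unions():
    """Hypothetical plain-Python model of the pipeline; returns (keep, agg)."""
    # a / b: explode the array column and tag each row with its problem label;
    # for b, drop elements equal to t (the .filter(col("t") != col("l")) step)
    rows = [(i, l, t, "a") for i, a, _, t in DIFFS for l in a]
    rows += [(i, l, t, "b") for i, _, b, t in DIFFS for l in b if l != t]

    # groupBy("l", "t", "problem").agg(count("id"))
    counts = Counter((l, t, p) for _, l, t, p in rows)

    # row_number() over (partition by l, problem order by count desc).
    # Assumption: ties are broken by t descending (Spark's choice is arbitrary).
    partitions = defaultdict(list)
    for (l, t, p), c in counts.items():
        partitions[(l, p)].append((t, c))
    flagged = {}
    for (l, p), members in partitions.items():
        members.sort(key=lambda tc: (tc[1], tc[0]), reverse=True)
        for rn, (t, c) in enumerate(members, start=1):
            # column f: (rn < 2) & (count > 1)
            flagged[(l, t, p)] = (c, rn < 2 and c > 1)

    # keep: groups where f is true
    keep = [(l, t, p, c) for (l, t, p), (c, f) in flagged.items() if f]

    # agg: rows of groups where f is false, t set to null, countDistinct("id")
    ids = defaultdict(set)
    for i, l, t, p in rows:
        if not flagged[(l, t, p)][1]:
            ids[(l, p)].add(i)
    agg = [(l, None, p, len(s)) for (l, p), s in ids.items()]
    return keep, agg


if __name__ == "__main__":
    keep, agg = expected_unions()
    # a correct union has the same rows regardless of operand order
    for row in keep + agg:
        print(row)
```

Comparing these rows with the keep.union(agg) output above makes the corruption easy to spot: the two rows with problem "b" come back labelled "a".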



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
