[ https://issues.apache.org/jira/browse/SPARK-17962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephen Hankinson updated SPARK-17962:
--------------------------------------
    Description: 
The environment can be reproduced from this Git repo using its Deploy to Azure 
button: https://github.com/shankinson/spark

We have a cluster running Apache Spark 2.0 on Hadoop 2.7.2, CentOS 7.2. We 
wrote some new code using the Spark DataFrame/Dataset APIs and are seeing 
incorrect results from a join after writing data to Windows Azure Storage 
Blobs (the default HDFS location) and reading it back. I've been able to 
reproduce the issue with the following snippet of code running on the cluster.

case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(
  CentroidClusterScore(0, 1, 1.0),
  CentroidClusterScore(1, 0, 1.0),
  CentroidClusterScore(2, 2, 1.0))).toDS

dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension")).show

outputs

|| user||dimension||score||
|12345|        0|  1.0|

||dimension||cluster||score||
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|

|| user||dimension||score||dimension||cluster||score||
|12345|        0|  1.0|        0|      1|  1.0|

which is correct. However, after writing the data out and reading it back, we 
see this:

dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")

val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]

dims2.show
cent2.show

dims2.join(cent2, dims2("dimension") === cent2("dimension")).show

outputs

|| user||dimension||score||
|12345|        0|  1.0|

||dimension||cluster||score||
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|

|| user||dimension||score||dimension||cluster||score||
|12345|        0|  1.0|     null|   null| null|

However, using the RDD API produces the correct result:

dims2.rdd.map(row => (row.dimension, row)).join(
  cent2.rdd.map(row => (row.dimension, row))).take(5)

res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = 
Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))
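
The comparison above was done by eye. A minimal sketch of automating it 
follows, assuming the two paths from the snippet above and Spark 2.0 on the 
classpath. JoinCrossCheck and joinsAgree are hypothetical names introduced 
for illustration; they are not part of Spark.

```scala
import org.apache.spark.sql.SparkSession

// Same case classes as in the snippet above.
case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

// Hypothetical helper, not part of Spark.
object JoinCrossCheck {

  /** True when the DataFrame join and the RDD join return the same
    * (user, dimension, cluster) rows. */
  def joinsAgree(spark: SparkSession, dimsPath: String, centPath: String): Boolean = {
    import spark.implicits._

    val dims2 = spark.read.load(dimsPath).as[UserDimensions]
    val cent2 = spark.read.load(centPath).as[CentroidClusterScore]

    // DataFrame/Dataset join. Rows stay untyped so that unexpected nulls
    // show up in the comparison instead of failing during decoding.
    val viaDataset: Set[List[Any]] = dims2
      .join(cent2, dims2("dimension") === cent2("dimension"))
      .select(dims2("user"), dims2("dimension"), cent2("cluster"))
      .collect()
      .map(_.toSeq.toList)
      .toSet

    // The same join expressed through the RDD API.
    val viaRdd: Set[List[Any]] = dims2.rdd.map(d => (d.dimension, d))
      .join(cent2.rdd.map(c => (c.dimension, c)))
      .map { case (dim, (d, c)) => List[Any](d.user, dim, c.cluster) }
      .collect()
      .toSet

    viaDataset == viaRdd
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("join-cross-check").getOrCreate()
    // Paths assumed to be the ones written by the snippet above.
    val ok = joinsAgree(spark, "/tmp/dims2.parquet", "/tmp/cent2.parquet")
    println(if (ok) "Dataset and RDD joins agree" else "Dataset and RDD joins differ")
    spark.stop()
  }
}
```

Submitting the same jar once with a local master and once on YARN would give 
a direct yes/no comparison between the two environments.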

We've tried changing the output format from Parquet to ORC, but we see the 
same results. Running Spark 2.0 locally, not on a cluster, does not show this 
issue. Running Spark in local mode on the master node of the Hadoop cluster 
also works. We see the issue only when running on top of YARN.

This also seems very similar to this issue: 
https://issues.apache.org/jira/browse/SPARK-10896

We have also determined that this appears to be related to the memory 
settings of the cluster. The worker machines have 56000MB available; when we 
see the issue, the node manager memory is set to 54784M and the executor 
memory to 48407M. Lowering the executor memory to something like 28407M makes 
the issue go away.
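
For reference, a rough sketch of the container arithmetic behind these two 
settings. It assumes that the node manager memory quoted above is 
yarn.nodemanager.resource.memory-mb and that 
spark.yarn.executor.memoryOverhead is left at its Spark 2.0 default of 
max(384 MB, 10% of executor memory). It ignores YARN rounding requests up to 
its allocation increment. It also flags which heap sizes are at or above 
32 GB, where HotSpot JVMs stop using compressed object pointers by default. 
Whether that threshold has anything to do with this bug is a guess and has 
not been confirmed here. ExecutorSizing and its members are hypothetical 
names.

```scala
// Sketch only: container sizing for the two executor memory settings above.
object ExecutorSizing {
  // Assumed to be yarn.nodemanager.resource.memory-mb (value from the report).
  val NodeManagerMb = 54784L

  // Assumed Spark 2.0 defaults for spark.yarn.executor.memoryOverhead.
  val MinOverheadMb = 384L
  val OverheadFactor = 0.10

  // Approximate heap size above which HotSpot disables compressed oops by default.
  val CompressedOopsLimitMb = 32L * 1024

  def overheadMb(executorMb: Long): Long =
    math.max((OverheadFactor * executorMb).toLong, MinOverheadMb)

  // Heap plus off-heap overhead requested from YARN for one executor.
  def containerMb(executorMb: Long): Long = executorMb + overheadMb(executorMb)

  // How many such containers fit into one node manager's memory.
  def executorsPerNode(executorMb: Long): Long = NodeManagerMb / containerMb(executorMb)

  def main(args: Array[String]): Unit = {
    for (mb <- Seq(48407L, 28407L)) {
      println(s"executor=${mb}M container=${containerMb(mb)}M " +
        s"perNode=${executorsPerNode(mb)} " +
        s"atOrAbove32G=${mb >= CompressedOopsLimitMb}")
    }
  }
}
```

Under these assumptions both settings fit exactly one executor container per 
node (53247M and 31247M against 54784M), so the two configurations do not 
differ in how many executors a node can host.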

  was:
Environment can be reproduced via this git repo using the Deploy to Azure 
button: https://github.com/shankinson/spark

We have a cluster running Apache Spark 2.0 on Hadoop 2.7.2, Centos 7.2. We had 
written some new code using the Spark DataFrame/DataSet APIs but are noticing 
incorrect results on a join after writing and then reading data to Windows 
Azure Storage Blobs (The default HDFS location). I've been able to duplicate 
the issue with the following snippet of code running on the cluster.

case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(CentroidClusterScore(0, 1, 
1.0),CentroidClusterScore(1, 0, 1.0),CentroidClusterScore(2, 2, 1.0))).toDS

dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension") ).show
outputs

+-----+---------+-----+                                                         
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|        0|      1|  1.0|
+-----+---------+-----+---------+-------+-----+
which is correct. However after writing and reading the data, we see this

dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")

val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]

dims2.show
cent2.show

dims2.join(cent2, dims2("dimension") === cent2("dimension") ).show
outputs

+-----+---------+-----+                                                         
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|     null|   null| null|
+-----+---------+-----+---------+-------+-----+

However, using the RDD API produces the correct result

dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row => 
(row.dimension, row) ) ).take(5)

res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = 
Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))

We've tried changing the output format to ORC instead of parquet, but we see 
the same results. Running Spark 2.0 locally, not on a cluster, does not have 
this issue. Also running spark in local mode on the master node of the Hadoop 
cluster also works. Only when running on top of YARN do we see this issue.

This also seems very similar to this issue: 
https://issues.apache.org/jira/browse/SPARK-10896

We have also determined this appears to be related to the memory settings of 
the cluster. The worker machines have 56000MB available, the node manager 
memory is set to 54784M and executor memory set to 48407M when we see this 
issue happen. Lowering the executor memory to something like 28407M removes the 
issue from happening.


> DataFrame/Dataset join not producing correct results in Spark 2.0/Yarn
> ----------------------------------------------------------------------
>
>                 Key: SPARK-17962
>                 URL: https://issues.apache.org/jira/browse/SPARK-17962
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, YARN
>    Affects Versions: 2.0.0
>         Environment: Centos 7.2, Hadoop 2.7.2, Spark 2.0.0
>            Reporter: Stephen Hankinson



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

