[
https://issues.apache.org/jira/browse/SPARK-17211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458239#comment-15458239
]
Miguel Tormo edited comment on SPARK-17211 at 9/2/16 11:19 AM:
---------------------------------------------------------------
Hi,
I have been testing this and it seems to be related to compressed object
pointers (oops). It happens when the executor JVMs use a different pointer
layout than the driver JVM. By default, the JVM enables UseCompressedOops for
heaps under 32 GB. So if the driver and executor heaps are both over 32 GB, or
both under 32 GB, there is no problem.
As a workaround, disabling UseCompressedOops on the JVM with the smaller heap
seems to work.
For example:
spark-shell --driver-memory 30G --executor-memory 40G --driver-java-options "-XX:-UseCompressedOops" ...
Or the other case:
spark-shell --driver-memory 45G --executor-memory 30G --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops" ...
I also tested JVM 1.7 and 1.8, with the same results. It doesn't matter whether
the job runs on EMR or outside it, on YARN or standalone. All tests used
Spark 2.0.0 with Scala 2.11.8.
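To make the rule above concrete, here is a minimal Python sketch of it. It only
encodes the 32 GB cutoff as stated in this comment (the real JVM threshold may
differ slightly by JVM version and settings), and the names
COMPRESSED_OOPS_LIMIT_GB, uses_compressed_oops and workaround_args are
hypothetical helpers, not part of Spark. It builds the extra spark-shell
arguments and does not launch anything.

```python
# Sketch only: decide which JVM needs -XX:-UseCompressedOops.
# Assumption: the 32 GB cutoff quoted in the comment; the actual JVM
# threshold can be slightly lower and depends on JVM version and flags.
COMPRESSED_OOPS_LIMIT_GB = 32


def uses_compressed_oops(heap_gb):
    """Return True if a JVM with this heap size would default to compressed oops."""
    return heap_gb < COMPRESSED_OOPS_LIMIT_GB


def workaround_args(driver_gb, executor_gb):
    """Return extra spark-shell arguments for the workaround, or [] if none apply.

    Hypothetical helper: it only builds an argument list.
    """
    driver_compressed = uses_compressed_oops(driver_gb)
    executor_compressed = uses_compressed_oops(executor_gb)
    if driver_compressed == executor_compressed:
        # Both sides use the same pointer layout, so no workaround is needed.
        return []
    flag = "-XX:-UseCompressedOops"
    if driver_compressed:
        # The driver has the smaller heap: disable compressed oops there.
        return ["--driver-java-options", flag]
    # The executors have the smaller heap.
    return ["--conf", "spark.executor.extraJavaOptions=" + flag]


print(workaround_args(30, 40))
print(workaround_args(45, 30))
```

The two calls correspond to the two example commands in this comment; equal
layouts on both sides (for instance 16G and 16G) yield an empty list.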
was (Author: migtor):
Hi,
I have been testing this and it seems to be related to memory pointers
compression. It happens when the JVMs of the executors use different memory
pointers than the driver. The JVM by default enables UseCompressedOops for
heaps under 32 GB. So if both the driver and executors memory heaps are over 32
GB, or if both are under 32 GB, there is no problem.
I found that disabling UseCompressedOops in the smaller heap fixes the issue.
For example:
spark-shell --driver-memory 30G --executor-memory 40G --driver-java-options
"-XX:-UseCompressedOops" ...
Or the other case:
spark-shell --driver-memory 45G --executor-memory 30G --conf
"spark.executor.extraJavaOptions=-XX:-UseCompressedOops" ...
I also tested JVM 1.7 and 1.8, with the same results. On EMR or outside, on
YARN or standalone, it doesn't matter. Always tested with Spark 2.0.0 using
Scala 2.11.8.
> Broadcast join produces incorrect results on EMR with large driver memory
> -------------------------------------------------------------------------
>
> Key: SPARK-17211
> URL: https://issues.apache.org/jira/browse/SPARK-17211
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.0.0
> Reporter: Jarno Seppanen
> Assignee: Davies Liu
>
> Broadcast join produces incorrect columns in join result, see below for an
> example. The same join but without using broadcast gives the correct columns.
> Running PySpark on YARN on Amazon EMR 5.0.0.
> {noformat}
> import pyspark.sql.functions as func
> keys = [
>     (54000000, 0),
>     (54000001, 1),
>     (54000002, 2),
> ]
> keys_df = spark.createDataFrame(keys, ['key_id', 'value']).coalesce(1)
> keys_df.show()
> # +--------+-----+
> # |  key_id|value|
> # +--------+-----+
> # |54000000|    0|
> # |54000001|    1|
> # |54000002|    2|
> # +--------+-----+
> data = [
>     (54000002, 1),
>     (54000000, 2),
>     (54000001, 3),
> ]
> data_df = spark.createDataFrame(data, ['key_id', 'foo'])
> data_df.show()
> # +--------+---+
> # |  key_id|foo|
> # +--------+---+
> # |54000002|  1|
> # |54000000|  2|
> # |54000001|  3|
> # +--------+---+
> ### INCORRECT ###
> data_df.join(func.broadcast(keys_df), 'key_id').show()
> # +--------+---+--------+
> # |  key_id|foo|   value|
> # +--------+---+--------+
> # |54000002|  1|54000002|
> # |54000000|  2|54000000|
> # |54000001|  3|54000001|
> # +--------+---+--------+
> ### CORRECT ###
> data_df.join(keys_df, 'key_id').show()
> # +--------+---+-----+
> # |  key_id|foo|value|
> # +--------+---+-----+
> # |54000000|  2|    0|
> # |54000001|  3|    1|
> # |54000002|  1|    2|
> # +--------+---+-----+
> {noformat}
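A small order-insensitive comparison can help when checking whether a given
configuration is affected. The sketch below is plain Python and uses the rows
shown in the issue description as literals; same_rows is a hypothetical helper,
not a Spark API. With a live session it could presumably be fed the collect()
results of the broadcast and non-broadcast joins instead.

```python
def same_rows(left, right):
    """Compare two collections of rows, ignoring row order."""
    return sorted(tuple(r) for r in left) == sorted(tuple(r) for r in right)


# Rows copied from the INCORRECT (broadcast) output in the issue description.
broadcast_rows = [
    (54000002, 1, 54000002),
    (54000000, 2, 54000000),
    (54000001, 3, 54000001),
]

# Rows copied from the CORRECT (non-broadcast) output in the issue description.
plain_rows = [
    (54000000, 2, 0),
    (54000001, 3, 1),
    (54000002, 1, 2),
]

print(same_rows(broadcast_rows, plain_rows))  # False: the 'value' column differs
```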