[
https://issues.apache.org/jira/browse/SPARK-10914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945358#comment-14945358
]
Reynold Xin edited comment on SPARK-10914 at 10/7/15 10:22 PM:
---------------------------------------------------------------
Thanks for looking into it. I have narrowed it down a lot now: it depends on
the --executor-memory setting!
Running bin/spark-shell locally I don't see the problem, but I do see it when I
use a standalone cluster. It reproduces reliably whenever I specify
"--executor-memory 32g" or greater; if I leave executor-memory unset, or
specify 31g or less, I get the correct result.
Here's a correct run:
{code}
spark@spark-master:~/spark-1.5.1-bin-hadoop2.6$ bin/spark-shell --master spark://spark-master:7077 --executor-memory 31g
scala> val x = sql("select 1 xx union all select 2")
x: org.apache.spark.sql.DataFrame = [xx: int]
scala> val y = sql("select 1 yy union all select 2")
y: org.apache.spark.sql.DataFrame = [yy: int]
scala> x.join(y, $"xx" === $"yy").count() /* expect 2, get 2 */
res0: Long = 2
{code}
Here's an incorrect run, with an explain plan (the plan is identical in both
cases):
{code}
spark@spark-master:~/spark-1.5.1-bin-hadoop2.6$ bin/spark-shell --master spark://spark-master:7077 --executor-memory 32g
scala> val x = sql("select 1 xx union all select 2")
x: org.apache.spark.sql.DataFrame = [xx: int]
scala> val y = sql("select 1 yy union all select 2")
y: org.apache.spark.sql.DataFrame = [yy: int]
scala> x.join(y, $"xx" === $"yy").explain()
== Physical Plan ==
BroadcastHashJoin [xx#0], [yy#2], BuildRight
 Union
  TungstenProject [1 AS xx#0]
   Scan OneRowRelation[]
  TungstenProject [2 AS _c0#1]
   Scan OneRowRelation[]
 Union
  TungstenProject [1 AS yy#2]
   Scan OneRowRelation[]
  TungstenProject [2 AS _c0#3]
   Scan OneRowRelation[]
scala> x.join(y, $"xx" === $"yy").count() /* expect 2, get 0 */
res1: Long = 0
{code}
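Since the explain plan shows a BroadcastHashJoin, one diagnostic I'd suggest (an assumption on my part, not something I've verified on this cluster) is to disable broadcast joins via the standard spark.sql.autoBroadcastJoinThreshold setting and see whether the shuffle-based join path returns the correct count:
{code}
# conf/spark-defaults.conf (diagnostic only): setting the broadcast threshold
# to -1 disables broadcast joins, so the planner falls back to a shuffle join
spark.sql.autoBroadcastJoinThreshold  -1
{code}
The same setting can also be passed per session with --conf on the spark-shell command line.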
I have two machines in my cluster:
- one is Ubuntu 12.04, running the spark-master node
- one is Ubuntu 14.04, running the spark-slave node.
Both have 256 GB of RAM.
The JVM on both machines is oracle-java7-installer from the PPA; the shell reports:
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
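One thing that may or may not be relevant (purely a guess on my part): the 32g cutoff matches the heap size at which HotSpot stops using compressed oops. A quick way to check what the JVM decides at each heap size:
{code}
# Print the JVM's final flag values at each heap size and inspect
# UseCompressedOops: at -Xmx32g HotSpot typically disables it, while at
# -Xmx31g it stays enabled (the exact cutoff can vary with JVM version/options)
java -Xmx31g -XX:+PrintFlagsFinal -version 2>/dev/null | grep UseCompressedOops
java -Xmx32g -XX:+PrintFlagsFinal -version 2>/dev/null | grep UseCompressedOops
{code}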
> Incorrect empty join sets when executor-memory >= 32g
> -----------------------------------------------------
>
> Key: SPARK-10914
> URL: https://issues.apache.org/jira/browse/SPARK-10914
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.5.0, 1.5.1
> Environment: Ubuntu 14.04 (spark-slave), 12.04 (master)
> Reporter: Ben Moran
>
> Using an inner join, to match together two integer columns, I generally get
> no results when there should be matches. But the results vary and depend on
> whether the dataframes are coming from SQL, JSON, or cached, as well as the
> order in which I cache things and query them.
> This minimal example reproduces it consistently for me in the spark-shell, on
> fresh installs of both 1.5.0 and 1.5.1 (pre-built against Hadoop 2.6 from
> http://spark.apache.org/downloads.html).
> {code}
> /* x is {"xx":1}{"xx":2} and y is just {"yy":1}{"yy":2} */
> val x = sql("select 1 xx union all select 2")
> val y = sql("select 1 yy union all select 2")
> x.join(y, $"xx" === $"yy").count() /* expect 2, get 0 */
> /* If I cache both tables it works: */
> x.cache()
> y.cache()
> x.join(y, $"xx" === $"yy").count() /* expect 2, get 2 */
> /* but this still doesn't work: */
> x.join(y, $"xx" === $"yy").filter("yy=1").count() /* expect 1, get 0 */
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]