Re: ApplicationMaster + Fair Scheduler + Dynamic resource allocation

2016-08-30 Thread 西0247


1) Is that what you want?
    spark.yarn.am.memory   in yarn-client mode
    spark.driver.memory    in yarn-cluster mode
2) I think you need to set these configs in spark-defaults.conf:
    spark.dynamicAllocation.minExecutors
    spark.dynamicAllocation.maxExecutors
3) It's not about the Fair Scheduler. Instead of using a MapReduce conf, you need to set an environment variable, like this: export SPARK_EXECUTOR_CORES=6
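Put together, points 1) and 2) amount to a spark-defaults.conf fragment like the following (the numbers are illustrative examples only, not recommendations):

```properties
# spark-defaults.conf -- example values only
spark.yarn.am.memory                  512m   # ApplicationMaster size in yarn-client mode
spark.driver.memory                   512m   # driver (and AM) size in yarn-cluster mode
spark.dynamicAllocation.minExecutors  2      # lower bound on executor count
spark.dynamicAllocation.maxExecutors  8      # upper bound on executor count
```

For point 3), the cores per executor would instead be set in the submitting shell, e.g. `export SPARK_EXECUTOR_CORES=6`.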
----------------------------------------
From: Cleosson José Pirani de Souza
Sent: Tuesday, 30 August 2016 19:30
To: user
Subject: ApplicationMaster + Fair Scheduler + Dynamic resource allocation
Hi,
I am using Spark 1.6.2 and Hadoop 2.7.2 in a single-node cluster (pseudo-distributed operation settings, for testing purposes). For every Spark application that I submit I get:
 - an ApplicationMaster with 1024 MB of RAM and 1 vcore
 - one container with 1024 MB of RAM and 1 vcore
I have three questions about using dynamic allocation and the Fair Scheduler:
 1) How do I set the ApplicationMaster's max memory to 512m?
 2) How do I get more than one container running per application? (With dynamic allocation I cannot set spark.executor.instances.)
 3) I noticed that YARN ignores yarn.app.mapreduce.am.resource.mb, yarn.app.mapreduce.am.resource.cpu-vcores and yarn.app.mapreduce.am.command-opts when the scheduler is Fair. Am I right?

My settings:

Spark (spark-defaults.conf):
    spark.driver.memory                512m
    spark.yarn.am.memory               512m
    spark.executor.memory              512m
    spark.executor.cores               2
    spark.dynamicAllocation.enabled    true
    spark.shuffle.service.enabled      true

YARN (yarn-site.xml):
    yarn.scheduler.maximum-allocation-vcores    32
    yarn.scheduler.minimum-allocation-vcores    1
    yarn.scheduler.maximum-allocation-mb        16384
    yarn.scheduler.minimum-allocation-mb        64
    yarn.scheduler.fair.preemption              true
    yarn.resourcemanager.scheduler.class        org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
    yarn.nodemanager.aux-services               spark_shuffle

MapReduce (mapred-site.xml):
    yarn.app.mapreduce.am.resource.mb           512
    yarn.app.mapreduce.am.resource.cpu-vcores   1
    yarn.app.mapreduce.am.command-opts          -Xmx384
    mapreduce.map.memory.mb                     1024
    mapreduce.map.java.opts                     -Xmx768m
    mapreduce.reduce.memory.mb                  1024
    mapreduce.reduce.java.opts                  -Xmx768m
Thanks in advance,
Cleosson


Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread 西0247
Hi, I think it is related to this issue (adaptive execution in Spark):

https://issues.apache.org/jira/browse/SPARK-9850
I will learn more about it.
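The adaptive idea behind SPARK-9850 — choose the join strategy only after the actual size of the filtered side is known — can be sketched in plain Python (illustration only: the function names and the row-count threshold are made up here, and real Spark operates on partitioned, size-estimated data rather than in-memory lists):

```python
def hash_join(small, big, key):
    # Broadcast-hash style: build a hash table on the small side, probe with the big side.
    table = {}
    for row in small:
        table.setdefault(row[key], []).append(row)
    return [(l, r) for r in big for l in table.get(r[key], [])]

def sort_merge_join(left, right, key):
    # Sort-merge style: sort both sides, then merge matching key runs.
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    out, j = [], 0
    for l in left:
        while j < len(right) and right[j][key] < l[key]:
            j += 1
        k = j
        while k < len(right) and right[k][key] == l[key]:
            out.append((l, right[k]))
            k += 1
    return out

def adaptive_join(left, right, key, broadcast_threshold=1000):
    # Decide *after* upstream filters have run, using the real size of the smaller side.
    small, big = (left, right) if len(left) <= len(right) else (right, left)
    if len(small) <= broadcast_threshold:
        return hash_join(small, big, key)
    return sort_merge_join(left, right, key)
```

Both strategies produce the same matches; the runtime decision is simply which one is cheaper given the observed size.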


----------------------------------------
From: 梅西0247 <zhen...@dtdream.com>
Sent: Tuesday, 21 June 2016 10:31
To: Mich Talebzadeh <mich.talebza...@gmail.com>; Takeshi Yamamuro <linguin@gmail.com>; Yong Zhang <java8...@hotmail.com>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To Yong Zhang: Yes, a broadcast join hint works, but it is not what I want. Sometimes the result is really too big to broadcast. What I want is a more adaptive implementation.


----------------------------------------
From: Yong Zhang <java8...@hotmail.com>
Sent: Monday, 20 June 2016 22:42
To: Mich Talebzadeh <mich.talebza...@gmail.com>; Takeshi Yamamuro <linguin@gmail.com>
Cc: 梅西0247 <zhen...@dtdream.com>; user@spark.apache.org <user@spark.apache.org>
Subject: RE: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
If you are using Spark > 1.5, the best way is to use the DataFrame API directly instead of SQL. In the DataFrame API you can specify a broadcast join hint, which will force a broadcast join.
Yong

From: mich.talebza...@gmail.com
Date: Mon, 20 Jun 2016 13:09:17 +0100
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To: linguin@gmail.com
CC: zhen...@dtdream.com; user@spark.apache.org

What sort of tables are these?
Can you register the result set as a temp table and do a join on that, assuming the result set is going to be small:
s.filter(($"c2" < 1000)).registerTempTable("tmp")
and then do a join between tmp and Table2
HTH

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 20 June 2016 at 12:38, Takeshi Yamamuro <linguin@gmail.com> wrote:
It seems hard to predict the output size of filters because the current Spark has only limited statistics about the input data. A few hours ago, Reynold created a ticket for a cost-based optimizer framework: https://issues.apache.org/jira/browse/SPARK-16026. If you have ideas, questions, or suggestions, feel free to join the discussion.
// maropu

On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247 <zhen...@dtdream.com> wrote:


Thanks for your reply; in fact, that is what I just did. But my question is: can we make Spark's join behavior smarter, so that it automatically turns a SortMergeJoin into a BroadcastHashJoin when it finds that an output RDD is small enough?

----------------------------------------
From: Takeshi Yamamuro <linguin@gmail.com>
Sent: Monday, 20 June 2016 19:16
To: 梅西0247 <zhen...@dtdream.com>
Cc: user <user@spark.apache.org>
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
Hi,
How about caching the result of `select * from a where a.c2 < 1000`, then joining on it? You probably need to tune `spark.sql.autoBroadcastJoinThreshold` to enable broadcast joins for the result table.
// maropu
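Takeshi's suggestion amounts to a config fragment like the following (the 50 MB figure is an example value chosen here, not a recommendation; in Spark 1.6 the default is 10 MB, and -1 disables broadcast joins entirely):

```properties
# spark-defaults.conf -- raise the size limit under which a table is broadcast
spark.sql.autoBroadcastJoinThreshold  52428800   # 50 MB, example value
```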

On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247 <zhen...@dtdream.com> wrote:
Hi everyone, 
I ran a SQL join statement on Spark 1.6.1 like this:
    select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
and it took quite a long time because it is a SortMergeJoin and the two tables are big.


In fact, the size of the filter result (select * from a where a.c2 < 1000) is very small, and I think a better solution would be to use a BroadcastJoin with the filter result, but I know the physical plan is static and it won't be changed.
So, can we make the physical plan more adaptive? (In this example, I mean using a BroadcastHashJoin instead of a SortMergeJoin automatically.)







-- 
---
Takeshi Yamamuro







Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread 西0247
To Yong Zhang: Yes, a broadcast join hint works, but it is not what I want. Sometimes the result is really too big to broadcast. What I want is a more adaptive implementation.


--发件人:Yong 
Zhang <java8...@hotmail.com>发送时间:2016年6月20日(星期一) 22:42收件人:Mich Talebzadeh 
<mich.talebza...@gmail.com>; Takeshi Yamamuro <linguin@gmail.com>抄 送:梅西0247 
<zhen...@dtdream.com>; user@spark.apache.org <user@spark.apache.org>主 题:RE: Is 
it possible to turn a SortMergeJoin into BroadcastHashJoin?
If  you are using Spark > 1.5, the best way is to use DataFrame API directly, 
instead of SQL. In dataframe, you can specify the boardcast join hint in the 
dataframe API, which will force the boardcast join.
Yong

From: mich.talebza...@gmail.com
Date: Mon, 20 Jun 2016 13:09:17 +0100
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To: linguin@gmail.com
CC: zhen...@dtdream.com; user@spark.apache.org

what sort of the tables are these?
Can you register the result set as temp table and do a join on that assuming 
the RS is going to be small
s.filter(($"c2" < 1000)).registerTempTable("tmp")
and then do a join between tmp and Table2
HTH

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 20 June 2016 at 12:38, Takeshi Yamamuro <linguin@gmail.com> wrote:
Seems it is hard to predict the output size of filters because the current 
spark has limited statistics of input data. A few hours ago, Reynold created a 
ticket for cost-based optimizer framework in 
https://issues.apache.org/jira/browse/SPARK-16026.If you have ideas, questions, 
and suggestions, feel free to join the discussion.
// maropu

On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247 <zhen...@dtdream.com> wrote:


Thanks for your reply, In fact, that is what i just did
But my question is: Can we change the spark join behavior more clever, to turn 
a sortmergejoin into broadcasthashjoin automatically when if "found" that a 
output RDD is small enough?

------发件人:Takeshi 
Yamamuro <linguin@gmail.com>发送时间:2016年6月20日(星期一) 19:16收件人:梅西0247 
<zhen...@dtdream.com>抄 送:user <user@spark.apache.org>主 题:Re: Is it possible to 
turn a SortMergeJoin into BroadcastHashJoin?
Hi,
How about caching the result of `select * from a where a.c2 < 1000`, then 
joining them?You probably need to tune `spark.sql.autoBroadcastJoinThreshold` 
to enable broadcast joins for the result table.
// maropu

On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247 <zhen...@dtdream.com> wrote:
Hi everyone, 
I ran a SQL join statement on Spark 1.6.1 like this:
select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;and it 
took quite a long time because It is a SortMergeJoin and the two tables are big.


In fact,  the size of filter result(select * from a where a.c2 < 1000) is very 
small, and I think a better solution is to use a BroadcastJoin with the filter 
result, but  I know  the physical plan is static and it won't be changed.
So, can we make the physical plan more adaptive? (In this example, I mean using 
a  BroadcastHashJoin instead of SortMergeJoin automatically. )







-- 
---
Takeshi Yamamuro




-- 
---
Takeshi Yamamuro

  


Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread 西0247
Hi everyone, 
I ran a SQL join statement on Spark 1.6.1 like this:
    select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
and it took quite a long time because it is a SortMergeJoin and the two tables are big.


In fact, the size of the filter result (select * from a where a.c2 < 1000) is very small, and I think a better solution would be to use a BroadcastJoin with the filter result, but I know the physical plan is static and it won't be changed.
So, can we make the physical plan more adaptive? (In this example, I mean using a BroadcastHashJoin instead of a SortMergeJoin automatically.)