回复:Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread 梅西0247
Hi ,I think it is related to this issue [Adaptive execution in Spark]

https://issues.apache.org/jira/browse/SPARK-9850
I will learn more about it.


--发件人:梅西0247 
发送时间:2016年6月21日(星期二) 10:31收件人:Mich Talebzadeh 
; Takeshi Yamamuro ; Yong 
Zhang 抄 送:user@spark.apache.org 主 
题:回复:Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To Yong Zhang:Yes, a broadcast join hint works. But it is not what I 
want.Sometimes the result is really too big to cast a broadcast on it.  What I 
want is a more adaptive implementation.


--发件人:Yong 
Zhang 发送时间:2016年6月20日(星期一) 22:42收件人:Mich Talebzadeh 
; Takeshi Yamamuro 抄 送:梅西0247 
; user@spark.apache.org 主 题:RE: Is 
it possible to turn a SortMergeJoin into BroadcastHashJoin?
If  you are using Spark > 1.5, the best way is to use DataFrame API directly, 
instead of SQL. In dataframe, you can specify the boardcast join hint in the 
dataframe API, which will force the boardcast join.
Yong

From: mich.talebza...@gmail.com
Date: Mon, 20 Jun 2016 13:09:17 +0100
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To: linguin@gmail.com
CC: zhen...@dtdream.com; user@spark.apache.org

what sort of the tables are these?
Can you register the result set as temp table and do a join on that assuming 
the RS is going to be small
s.filter(($"c2" < 1000)).registerTempTable("tmp")
and then do a join between tmp and Table2
HTH

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 20 June 2016 at 12:38, Takeshi Yamamuro  wrote:
Seems it is hard to predict the output size of filters because the current 
spark has limited statistics of input data. A few hours ago, Reynold created a 
ticket for cost-based optimizer framework in 
https://issues.apache.org/jira/browse/SPARK-16026.If you have ideas, questions, 
and suggestions, feel free to join the discussion.
// maropu

On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247  wrote:


Thanks for your reply, In fact, that is what i just did
But my question is: Can we change the spark join behavior more clever, to turn 
a sortmergejoin into broadcasthashjoin automatically when if "found" that a 
output RDD is small enough?

--发件人:Takeshi 
Yamamuro 发送时间:2016年6月20日(星期一) 19:16收件人:梅西0247 
抄 送:user 主 题:Re: Is it possible to 
turn a SortMergeJoin into BroadcastHashJoin?
Hi,
How about caching the result of `select * from a where a.c2 < 1000`, then 
joining them?You probably need to tune `spark.sql.autoBroadcastJoinThreshold` 
to enable broadcast joins for the result table.
// maropu

On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247  wrote:
Hi everyone, 
I ran a SQL join statement on Spark 1.6.1 like this:
select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;and it 
took quite a long time because It is a SortMergeJoin and the two tables are big.


In fact,  the size of filter result(select * from a where a.c2 < 1000) is very 
small, and I think a better solution is to use a BroadcastJoin with the filter 
result, but  I know  the physical plan is static and it won't be changed.
So, can we make the physical plan more adaptive? (In this example, I mean using 
a  BroadcastHashJoin instead of SortMergeJoin automatically. )







-- 
---
Takeshi Yamamuro




-- 
---
Takeshi Yamamuro

  



回复:Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread 梅西0247
To Yong Zhang:Yes, a broadcast join hint works. But it is not what I 
want.Sometimes the result is really too big to cast a broadcast on it.  What I 
want is a more adaptive implementation.


--发件人:Yong 
Zhang 发送时间:2016年6月20日(星期一) 22:42收件人:Mich Talebzadeh 
; Takeshi Yamamuro 抄 送:梅西0247 
; user@spark.apache.org 主 题:RE: Is 
it possible to turn a SortMergeJoin into BroadcastHashJoin?
If  you are using Spark > 1.5, the best way is to use DataFrame API directly, 
instead of SQL. In dataframe, you can specify the boardcast join hint in the 
dataframe API, which will force the boardcast join.
Yong

From: mich.talebza...@gmail.com
Date: Mon, 20 Jun 2016 13:09:17 +0100
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To: linguin@gmail.com
CC: zhen...@dtdream.com; user@spark.apache.org

what sort of the tables are these?
Can you register the result set as temp table and do a join on that assuming 
the RS is going to be small
s.filter(($"c2" < 1000)).registerTempTable("tmp")
and then do a join between tmp and Table2
HTH

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 20 June 2016 at 12:38, Takeshi Yamamuro  wrote:
Seems it is hard to predict the output size of filters because the current 
spark has limited statistics of input data. A few hours ago, Reynold created a 
ticket for cost-based optimizer framework in 
https://issues.apache.org/jira/browse/SPARK-16026.If you have ideas, questions, 
and suggestions, feel free to join the discussion.
// maropu

On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247  wrote:


Thanks for your reply, In fact, that is what i just did
But my question is: Can we change the spark join behavior more clever, to turn 
a sortmergejoin into broadcasthashjoin automatically when if "found" that a 
output RDD is small enough?

--发件人:Takeshi 
Yamamuro 发送时间:2016年6月20日(星期一) 19:16收件人:梅西0247 
抄 送:user 主 题:Re: Is it possible to 
turn a SortMergeJoin into BroadcastHashJoin?
Hi,
How about caching the result of `select * from a where a.c2 < 1000`, then 
joining them?You probably need to tune `spark.sql.autoBroadcastJoinThreshold` 
to enable broadcast joins for the result table.
// maropu

On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247  wrote:
Hi everyone, 
I ran a SQL join statement on Spark 1.6.1 like this:
select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;and it 
took quite a long time because It is a SortMergeJoin and the two tables are big.


In fact,  the size of filter result(select * from a where a.c2 < 1000) is very 
small, and I think a better solution is to use a BroadcastJoin with the filter 
result, but  I know  the physical plan is static and it won't be changed.
So, can we make the physical plan more adaptive? (In this example, I mean using 
a  BroadcastHashJoin instead of SortMergeJoin automatically. )







-- 
---
Takeshi Yamamuro




-- 
---
Takeshi Yamamuro

  


Re: Build Spark 2.0 succeeded but could not run it on YARN

2016-06-20 Thread Ted Yu
What operations did you run in the Spark shell ?

It would be easier for other people to reproduce using your code snippet.

Thanks

On Mon, Jun 20, 2016 at 6:20 PM, Jeff Zhang  wrote:

> Could you check the yarn app logs for details ? run command "yarn logs
> -applicationId " to get the yarn log
>
> On Tue, Jun 21, 2016 at 9:18 AM, wgtmac  wrote:
>
>> I ran into problems in building Spark 2.0. The build process actually
>> succeeded but when I uploaded to cluster and launched the Spark shell on
>> YARN, it reported following exceptions again and again:
>>
>> 16/06/17 03:32:00 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
>> Container marked as failed: container_e437_1464601161543_1582846_01_13
>> on host: hadoopworker575-sjc1.. Exit status: 1.
>> Diagnostics:
>> Exception from container-launch.
>> Container id: container_e437_1464601161543_1582846_01_13
>> Exit code: 1
>> Stack trace: ExitCodeException exitCode=1:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
>> at org.apache.hadoop.util.Shell.run(Shell.java:455)
>> at
>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
>> at
>>
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
>> at
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>> at
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Container exited with a non-zero exit code 1
>>
>> =
>> Build command:
>>
>> export JAVA_HOME=   // tried both java7 and java8
>> ./dev/change-scala-version.sh 2.11   // tried both 2.10 and 2.11
>> ./build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive
>> -Phive-thriftserver -DskipTests clean package
>>
>> The 2.0.0-preview version downloaded from Spark website works well so it
>> is
>> not the problem of my cluster. Also I can make it to build Spark 1.5 and
>> 1.6
>> and run them on the cluster. But in Spark 2.0, I failed both 2.0.0-preview
>> tag and 2.0.0-SNAPSHOT. Anyone has any idea? Thanks!
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Build-Spark-2-0-succeeded-but-could-not-run-it-on-YARN-tp27199.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Notebook(s) for Spark 2.0 ?

2016-06-20 Thread Stephen Boesch
Having looked closely at Jupyter, Zeppelin, and Spark-Notebook : only the
latter seems to be close to having support for Spark 2.X.

While I am interested in using Spark Notebook as soon as that support were
available are there alternatives that work *now*?  For example some
unmerged -yet -working fork(s) ?

thanks


Re: Build Spark 2.0 succeeded but could not run it on YARN

2016-06-20 Thread Jeff Zhang
Could you check the yarn app logs for details ? run command "yarn logs
-applicationId " to get the yarn log

On Tue, Jun 21, 2016 at 9:18 AM, wgtmac  wrote:

> I ran into problems in building Spark 2.0. The build process actually
> succeeded but when I uploaded to cluster and launched the Spark shell on
> YARN, it reported following exceptions again and again:
>
> 16/06/17 03:32:00 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container marked as failed: container_e437_1464601161543_1582846_01_13
> on host: hadoopworker575-sjc1.. Exit status: 1.
> Diagnostics:
> Exception from container-launch.
> Container id: container_e437_1464601161543_1582846_01_13
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at
>
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
> at
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Container exited with a non-zero exit code 1
>
> =
> Build command:
>
> export JAVA_HOME=   // tried both java7 and java8
> ./dev/change-scala-version.sh 2.11   // tried both 2.10 and 2.11
> ./build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive
> -Phive-thriftserver -DskipTests clean package
>
> The 2.0.0-preview version downloaded from Spark website works well so it is
> not the problem of my cluster. Also I can make it to build Spark 1.5 and
> 1.6
> and run them on the cluster. But in Spark 2.0, I failed both 2.0.0-preview
> tag and 2.0.0-SNAPSHOT. Anyone has any idea? Thanks!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Build-Spark-2-0-succeeded-but-could-not-run-it-on-YARN-tp27199.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Build Spark 2.0 succeeded but could not run it on YARN

2016-06-20 Thread wgtmac
I ran into problems in building Spark 2.0. The build process actually
succeeded but when I uploaded to cluster and launched the Spark shell on
YARN, it reported following exceptions again and again:

16/06/17 03:32:00 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
Container marked as failed: container_e437_1464601161543_1582846_01_13
on host: hadoopworker575-sjc1.. Exit status: 1. Diagnostics:
Exception from container-launch.
Container id: container_e437_1464601161543_1582846_01_13
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1

=
Build command: 

export JAVA_HOME=   // tried both java7 and java8
./dev/change-scala-version.sh 2.11   // tried both 2.10 and 2.11 
./build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive
-Phive-thriftserver -DskipTests clean package

The 2.0.0-preview version downloaded from Spark website works well so it is
not the problem of my cluster. Also I can make it to build Spark 1.5 and 1.6
and run them on the cluster. But in Spark 2.0, I failed both 2.0.0-preview
tag and 2.0.0-SNAPSHOT. Anyone has any idea? Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Build-Spark-2-0-succeeded-but-could-not-run-it-on-YARN-tp27199.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Verifying if DStream is empty

2016-06-20 Thread nguyen duc tuan
You have to create input1Pair inside foreachRDD function. As your original
code, one solution will look like as following:

val input2Pair = input2.map(x => (x._1, x))

input2Pair.cache()

streamData.foreachRDD{rdd =>
 if(!rdd.isEmpty()){
   val input1Pair = rdd.map(x => (x._1, x))
   val joinData = input1Pair.leftOuterJoin(input2Pair)
   val result = joinData.mapValues{
case(v, Some(a)) => 1L
case(v, None) => 0

 ​  ​
}.reduceByKey(_ + _).filter(_._2 > 1)

​   //process result dstream

 }
}

2016-06-21 0:03 GMT+07:00 Praseetha :

> Thanks a lot for the response.
> input1Pair is a DStream. I tried with the code snippet below,
>
> result.foreachRDD{externalRDD =>
>if(!externalRDD.isEmpty()){
>  val ss = input1Pair.transform{ rdd =>
> input2Pair.leftOuterJoin(rdd)}
>}else{
>  val ss = input1Pair.transform{ rdd =>
> input2Pair.leftOuterJoin(rdd)}
>}
>  }
>
> I'm getting the following exception:
> *java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after starting a context is not supported*
> * at
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)*
> * at org.apache.spark.streaming.dstream.DStream.(DStream.scala:64)*
> * at
> org.apache.spark.streaming.dstream.TransformedDStream.(TransformedDStream.scala:25)*
> * at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:670)*
> * at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:661)*
>
> I don't think we can perform transformation on RDDs,that are outside for
> foreachRDD.
> My requirement is to figure out if the DStream 'result' is empty or not
> and based on the result, perform some operation on input1Pair DStream and
> input2Pair RDD.
>
>
> On Mon, Jun 20, 2016 at 7:05 PM, nguyen duc tuan 
> wrote:
>
>> Hi Praseetha,
>> In order to check if DStream is empty or not, using isEmpty method is
>> correct. I think the problem here is calling  
>> input1Pair.lefOuterJoin(input2Pair).
>> I guess input1Pair rdd comes from above transformation. You should do it
>> on DStream instead. In this case, do any transformation with x variable
>> instead.
>> If you use input2Pair rdd a lot, you can consider caching it for better
>> performance.
>>
>> 2016-06-20 19:30 GMT+07:00 Praseetha :
>>
>>>
>>> Hi Experts,
>>>
>>> I have 2 inputs, where first input is stream (say input1) and the second
>>> one is batch (say input2). I want to figure out if the keys in first input
>>> matches single row or more than one row in the second input. The further
>>> transformations/logic depends on the number of rows matching, whether
>>> single row matches or multiple rows match (for atleast one key in the first
>>> input)
>>>
>>> if(single row matches){
>>>  // do some tranformation
>>> }else{
>>>  // do some transformation
>>> }
>>>
>>> Code that i tried so far
>>>
>>> val input1Pair = streamData.map(x => (x._1, x))
>>> val input2Pair = input2.map(x => (x._1, x))
>>> val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
>>> val result = joinData.mapValues{
>>> case(v, Some(a)) => 1L
>>> case(v, None) => 0
>>>  }.reduceByKey(_ + _).filter(_._2 > 1)
>>>
>>> I have done the above coding. When I do result.print, it prints nothing
>>> if all the keys matches only one row in the input2. With the fact that the
>>> DStream may have multiple RDDs, not sure how to figure out if the DStream
>>> is empty or not.
>>>
>>> I tried using foreachRDD, but the streaming app stops abruptly.
>>>
>>> Inside foreachRDD i was performing transformations with other RDDs. like,
>>>
>>> result.foreachRDD{ x=>
>>>
>>> if(x.isEmpty){
>>>
>>> val out = input1Pair.lefOuterJoin(input2Pair)
>>>
>>> }else{
>>>
>>> val out = input1Pair.rightOuterJoin(input2Pair)
>>>
>>> }
>>>
>>> Can you please suggest.
>>>
>>>
>>> Regds,
>>> --Praseetha
>>>
>>
>>
>


javax.net.ssl.SSLHandshakeException: unable to find valid certification path to requested target

2016-06-20 Thread Utkarsh Sengar
We are intermittently getting this error when spark tried to load data from
S3:Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
unable to find valid certification path to requested target.

https://gist.githubusercontent.com/utkarsh2012/1c4cd2dc82c20c6f389b783927371bd7/raw/a1be6617d23b1744631427fe90aaa1cce4313f36/stacktrace

Running spark 1.5.1 on java8 and mesos-0.23.0
jets3t is 0.9.4

What can be the possible issue here - network, mesos or s3? It was working
fine earlier.

-- 
Thanks,
-Utkarsh


Re: SparkSQL issue: Spark 1.3.1 + hadoop 2.6 on CDH5.3 with parquet

2016-06-20 Thread Satya
Hello,
We are also experiencing the same error.  Can you please provide the steps
that resolved the issue.
Thanks
Satya



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-issue-Spark-1-3-1-hadoop-2-6-on-CDH5-3-with-parquet-tp22808p27197.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 2.0 on YARN - Files in config archive not ending up on executor classpath

2016-06-20 Thread Jonathan Kelly
OK, JIRA created: https://issues.apache.org/jira/browse/SPARK-16080

Also, after looking at the code a bit I think I see the reason. If I'm
correct, it may actually be a very easy fix.

On Mon, Jun 20, 2016 at 1:21 PM Marcelo Vanzin  wrote:

> It doesn't hurt to have a bug tracking it, in case anyone else has
> time to look at it before I do.
>
> On Mon, Jun 20, 2016 at 1:20 PM, Jonathan Kelly 
> wrote:
> > Thanks for the confirmation! Shall I cut a JIRA issue?
> >
> > On Mon, Jun 20, 2016 at 10:42 AM Marcelo Vanzin 
> wrote:
> >>
> >> I just tried this locally and can see the wrong behavior you mention.
> >> I'm running a somewhat old build of 2.0, but I'll take a look.
> >>
> >> On Mon, Jun 20, 2016 at 7:04 AM, Jonathan Kelly  >
> >> wrote:
> >> > Does anybody have any thoughts on this?
> >> >
> >> > On Fri, Jun 17, 2016 at 6:36 PM Jonathan Kelly <
> jonathaka...@gmail.com>
> >> > wrote:
> >> >>
> >> >> I'm trying to debug a problem in Spark 2.0.0-SNAPSHOT (commit
> >> >> bdf5fe4143e5a1a393d97d0030e76d35791ee248) where Spark's
> >> >> log4j.properties is
> >> >> not getting picked up in the executor classpath (and driver classpath
> >> >> for
> >> >> yarn-cluster mode), so Hadoop's log4j.properties file is taking
> >> >> precedence
> >> >> in the YARN containers.
> >> >>
> >> >> Spark's log4j.properties file is correctly being bundled into the
> >> >> __spark_conf__.zip file and getting added to the DistributedCache,
> but
> >> >> it is
> >> >> not in the classpath of the executor, as evidenced by the following
> >> >> command,
> >> >> which I ran in spark-shell:
> >> >>
> >> >> scala> sc.parallelize(Seq(1)).map(_ =>
> >> >> getClass().getResource("/log4j.properties")).first
> >> >> res3: java.net.URL = file:/etc/hadoop/conf.empty/log4j.properties
> >> >>
> >> >> I then ran the following in spark-shell to verify the classpath of
> the
> >> >> executors:
> >> >>
> >> >> scala> sc.parallelize(Seq(1)).map(_ =>
> >> >> System.getProperty("java.class.path")).flatMap(_.split(':')).filter(e
> >> >> =>
> >> >> !e.endsWith(".jar") && !e.endsWith("*")).collect.foreach(println)
> >> >> ...
> >> >>
> >> >>
> >> >>
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
> >> >>
> >> >>
> >> >>
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03/__spark_conf__
> >> >> /etc/hadoop/conf
> >> >> ...
> >> >>
> >> >> So the JVM has this nonexistent __spark_conf__ directory in the
> >> >> classpath
> >> >> when it should really be __spark_conf__.zip (which is actually a
> >> >> symlink to
> >> >> a directory, despite the .zip filename).
> >> >>
> >> >> % sudo ls -l
> >> >>
> >> >>
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
> >> >> total 20
> >> >> -rw-r--r-- 1 yarn yarn   88 Jun 18 01:26 container_tokens
> >> >> -rwx-- 1 yarn yarn  594 Jun 18 01:26
> >> >> default_container_executor_session.sh
> >> >> -rwx-- 1 yarn yarn  648 Jun 18 01:26
> default_container_executor.sh
> >> >> -rwx-- 1 yarn yarn 4419 Jun 18 01:26 launch_container.sh
> >> >> lrwxrwxrwx 1 yarn yarn   59 Jun 18 01:26 __spark_conf__.zip ->
> >> >> /mnt1/yarn/usercache/hadoop/filecache/17/__spark_conf__.zip
> >> >> lrwxrwxrwx 1 yarn yarn   77 Jun 18 01:26 __spark_libs__ ->
> >> >>
> >> >>
> /mnt/yarn/usercache/hadoop/filecache/16/__spark_libs__4490748779530764463.zip
> >> >> drwx--x--- 2 yarn yarn   46 Jun 18 01:26 tmp
> >> >>
> >> >> Does anybody know why this is happening? Is this a bug in Spark, or
> is
> >> >> it
> >> >> the JVM doing this (possibly because the extension is .zip)?
> >> >>
> >> >> Thanks,
> >> >> Jonathan
> >>
> >>
> >>
> >> --
> >> Marcelo
>
>
>
> --
> Marcelo
>


Saving data using tempTable versus save() method

2016-06-20 Thread Mich Talebzadeh
Hi,

I have a DF based on a table and sorted and shown below

This is fine and when I register as tempTable I can populate the underlying
table sales 2 in Hive. That sales2 is an ORC table

 val s = HiveContext.table("sales_staging")
  val sorted = s.sort("prod_id","cust_id","time_id","channel_id","promo_id")
  sorted.registerTempTable("tmp")
  sqltext = """
  INSERT INTO TABLE oraclehadoop.sales2
  SELECT
  PROD_ID
, CUST_ID
, TIME_ID
, CHANNEL_ID
, PROMO_ID
, QUANTITY_SOLD
, AMOUNT_SOLD
  FROM tmp
  """
  HiveContext.sql(sqltext)
  HiveContext.sql("select count(1) from oraclehadoop.sales2").show
  HiveContext.sql("truncate table oraclehadoop.sales2")

  sorted.save("oraclehadoop.sales2")
  HiveContext.sql("select count(1) from oraclehadoop.sales2").show

When I truncate the Hive table and use sorted.save("oraclehadoop.sales2")

It does not save any data

Started at
[20/06/2016 21:21:57.57]
+--+
|   _c0|


*+--+|918843|// This works+--+*
[Stage 7:>  (3 + 1)
/ 4]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
+---+
|_c0|


*+---+|  0|  // This does not+---+*
Finished at
[20/06/2016 21:22:30.30]

Any ideas if anyone has seen this before?


The issue is saving data. Saving through tempTable works but the other one
does not work.


Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: Spark 2.0 on YARN - Files in config archive not ending up on executor classpath

2016-06-20 Thread Marcelo Vanzin
It doesn't hurt to have a bug tracking it, in case anyone else has
time to look at it before I do.

On Mon, Jun 20, 2016 at 1:20 PM, Jonathan Kelly  wrote:
> Thanks for the confirmation! Shall I cut a JIRA issue?
>
> On Mon, Jun 20, 2016 at 10:42 AM Marcelo Vanzin  wrote:
>>
>> I just tried this locally and can see the wrong behavior you mention.
>> I'm running a somewhat old build of 2.0, but I'll take a look.
>>
>> On Mon, Jun 20, 2016 at 7:04 AM, Jonathan Kelly 
>> wrote:
>> > Does anybody have any thoughts on this?
>> >
>> > On Fri, Jun 17, 2016 at 6:36 PM Jonathan Kelly 
>> > wrote:
>> >>
>> >> I'm trying to debug a problem in Spark 2.0.0-SNAPSHOT (commit
>> >> bdf5fe4143e5a1a393d97d0030e76d35791ee248) where Spark's
>> >> log4j.properties is
>> >> not getting picked up in the executor classpath (and driver classpath
>> >> for
>> >> yarn-cluster mode), so Hadoop's log4j.properties file is taking
>> >> precedence
>> >> in the YARN containers.
>> >>
>> >> Spark's log4j.properties file is correctly being bundled into the
>> >> __spark_conf__.zip file and getting added to the DistributedCache, but
>> >> it is
>> >> not in the classpath of the executor, as evidenced by the following
>> >> command,
>> >> which I ran in spark-shell:
>> >>
>> >> scala> sc.parallelize(Seq(1)).map(_ =>
>> >> getClass().getResource("/log4j.properties")).first
>> >> res3: java.net.URL = file:/etc/hadoop/conf.empty/log4j.properties
>> >>
>> >> I then ran the following in spark-shell to verify the classpath of the
>> >> executors:
>> >>
>> >> scala> sc.parallelize(Seq(1)).map(_ =>
>> >> System.getProperty("java.class.path")).flatMap(_.split(':')).filter(e
>> >> =>
>> >> !e.endsWith(".jar") && !e.endsWith("*")).collect.foreach(println)
>> >> ...
>> >>
>> >>
>> >> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
>> >>
>> >>
>> >> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03/__spark_conf__
>> >> /etc/hadoop/conf
>> >> ...
>> >>
>> >> So the JVM has this nonexistent __spark_conf__ directory in the
>> >> classpath
>> >> when it should really be __spark_conf__.zip (which is actually a
>> >> symlink to
>> >> a directory, despite the .zip filename).
>> >>
>> >> % sudo ls -l
>> >>
>> >> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
>> >> total 20
>> >> -rw-r--r-- 1 yarn yarn   88 Jun 18 01:26 container_tokens
>> >> -rwx-- 1 yarn yarn  594 Jun 18 01:26
>> >> default_container_executor_session.sh
>> >> -rwx-- 1 yarn yarn  648 Jun 18 01:26 default_container_executor.sh
>> >> -rwx-- 1 yarn yarn 4419 Jun 18 01:26 launch_container.sh
>> >> lrwxrwxrwx 1 yarn yarn   59 Jun 18 01:26 __spark_conf__.zip ->
>> >> /mnt1/yarn/usercache/hadoop/filecache/17/__spark_conf__.zip
>> >> lrwxrwxrwx 1 yarn yarn   77 Jun 18 01:26 __spark_libs__ ->
>> >>
>> >> /mnt/yarn/usercache/hadoop/filecache/16/__spark_libs__4490748779530764463.zip
>> >> drwx--x--- 2 yarn yarn   46 Jun 18 01:26 tmp
>> >>
>> >> Does anybody know why this is happening? Is this a bug in Spark, or is
>> >> it
>> >> the JVM doing this (possibly because the extension is .zip)?
>> >>
>> >> Thanks,
>> >> Jonathan
>>
>>
>>
>> --
>> Marcelo



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 2.0 on YARN - Files in config archive not ending up on executor classpath

2016-06-20 Thread Jonathan Kelly
Thanks for the confirmation! Shall I cut a JIRA issue?

On Mon, Jun 20, 2016 at 10:42 AM Marcelo Vanzin  wrote:

> I just tried this locally and can see the wrong behavior you mention.
> I'm running a somewhat old build of 2.0, but I'll take a look.
>
> On Mon, Jun 20, 2016 at 7:04 AM, Jonathan Kelly 
> wrote:
> > Does anybody have any thoughts on this?
> >
> > On Fri, Jun 17, 2016 at 6:36 PM Jonathan Kelly 
> > wrote:
> >>
> >> I'm trying to debug a problem in Spark 2.0.0-SNAPSHOT (commit
> >> bdf5fe4143e5a1a393d97d0030e76d35791ee248) where Spark's
> log4j.properties is
> >> not getting picked up in the executor classpath (and driver classpath
> for
> >> yarn-cluster mode), so Hadoop's log4j.properties file is taking
> precedence
> >> in the YARN containers.
> >>
> >> Spark's log4j.properties file is correctly being bundled into the
> >> __spark_conf__.zip file and getting added to the DistributedCache, but
> it is
> >> not in the classpath of the executor, as evidenced by the following
> command,
> >> which I ran in spark-shell:
> >>
> >> scala> sc.parallelize(Seq(1)).map(_ =>
> >> getClass().getResource("/log4j.properties")).first
> >> res3: java.net.URL = file:/etc/hadoop/conf.empty/log4j.properties
> >>
> >> I then ran the following in spark-shell to verify the classpath of the
> >> executors:
> >>
> >> scala> sc.parallelize(Seq(1)).map(_ =>
> >> System.getProperty("java.class.path")).flatMap(_.split(':')).filter(e =>
> >> !e.endsWith(".jar") && !e.endsWith("*")).collect.foreach(println)
> >> ...
> >>
> >>
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
> >>
> >>
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03/__spark_conf__
> >> /etc/hadoop/conf
> >> ...
> >>
> >> So the JVM has this nonexistent __spark_conf__ directory in the
> classpath
> >> when it should really be __spark_conf__.zip (which is actually a
> symlink to
> >> a directory, despite the .zip filename).
> >>
> >> % sudo ls -l
> >>
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
> >> total 20
> >> -rw-r--r-- 1 yarn yarn   88 Jun 18 01:26 container_tokens
> >> -rwx-- 1 yarn yarn  594 Jun 18 01:26
> >> default_container_executor_session.sh
> >> -rwx-- 1 yarn yarn  648 Jun 18 01:26 default_container_executor.sh
> >> -rwx-- 1 yarn yarn 4419 Jun 18 01:26 launch_container.sh
> >> lrwxrwxrwx 1 yarn yarn   59 Jun 18 01:26 __spark_conf__.zip ->
> >> /mnt1/yarn/usercache/hadoop/filecache/17/__spark_conf__.zip
> >> lrwxrwxrwx 1 yarn yarn   77 Jun 18 01:26 __spark_libs__ ->
> >>
> /mnt/yarn/usercache/hadoop/filecache/16/__spark_libs__4490748779530764463.zip
> >> drwx--x--- 2 yarn yarn   46 Jun 18 01:26 tmp
> >>
> >> Does anybody know why this is happening? Is this a bug in Spark, or is
> it
> >> the JVM doing this (possibly because the extension is .zip)?
> >>
> >> Thanks,
> >> Jonathan
>
>
>
> --
> Marcelo
>


Re: Improving performance of a kafka spark streaming app

2016-06-20 Thread Colin Kincaid Williams
I'll try dropping the maxRatePerPartition=400, or maybe even lower.
However even at application starts up I have this large scheduling
delay. I will report my progress later on.

On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger  wrote:
> If your batch time is 1 second and your average processing time is
> 1.16 seconds, you're always going to be falling behind.  That would
> explain why you've built up an hour of scheduling delay after eight
> hours of running.
>
> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams  
> wrote:
>> Hi Mich again,
>>
>> Regarding batch window, etc. I have provided the sources, but I'm not
>> currently calling the window function. Did you see the program source?
>> It's only 100 lines.
>>
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>> Then I would expect I'm using defaults, other than what has been shown
>> in the configuration.
>>
>> For example:
>>
>> In the launcher configuration I set --conf
>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>> are 500 messages for the duration set in the application:
>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>> Duration(1000));
>>
>>
>> Then with the --num-executors 6 \ submit flag, and the
>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>> arrive at the 3000 events per batch in the UI, pasted above.
>>
>> Feel free to correct me if I'm wrong.
>>
>> Then are you suggesting that I set the window?
>>
>> Maybe following this as reference:
>>
>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>
>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>  wrote:
>>> Ok
>>>
>>> What is the set up for these please?
>>>
>>> batch window
>>> window length
>>> sliding interval
>>>
>>> And also in each batch window how much data do you get in (no of messages in
>>> the topic whatever)?
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>> On 18 June 2016 at 21:01, Mich Talebzadeh  wrote:

 I believe you have an issue with performance?

 have you checked spark GUI (default 4040) for details including shuffles
 etc?

 HTH

 Dr Mich Talebzadeh



 LinkedIn
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



 http://talebzadehmich.wordpress.com




 On 18 June 2016 at 20:59, Colin Kincaid Williams  wrote:
>
> There are 25 nodes in the spark cluster.
>
> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>  wrote:
> > how many nodes are in your cluster?
> >
> > --num-executors 6 \
> >  --driver-memory 4G \
> >  --executor-memory 2G \
> >  --total-executor-cores 12 \
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> >
> >
> > On 18 June 2016 at 20:40, Colin Kincaid Williams 
> > wrote:
> >>
> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
> >> Kafka using the direct api and inserts content into an hbase cluster,
> >> as described in this thread. I was away from this project for awhile
> >> due to events in my family.
> >>
> >> Currently my scheduling delay is high, but the processing time is
> >> stable around a second. I changed my setup to use 6 kafka partitions
> >> on a set of smaller kafka brokers, with fewer disks. I've included
> >> some details below, including the script I use to launch the
> >> application. I'm using a Spark on Hbase library, whose version is
> >> relevant to my Hbase cluster. Is it apparent there is something wrong
> >> with my launch method that could be causing the delay, related to the
> >> included jars?
> >>
> >> Or is there something wrong with the very simple approach I'm taking
> >> for the application?
> >>
> >> Any advice is appriciated.
> >>
> >>
> >> The application:
> >>
> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >>
> >>
> >> From the streaming UI I get something like:
> >>
> >> table Completed Batches (last 1000 out of 27136)
> >>
> >>
> >> Batch Time Input Size Scheduling Delay (?) Processing Time (?) Total
> >> Delay (?) Output Ops: Succeeded/Total
> >>
> >> 2016/06/18 11:21:32 3000 events 1.2 h 1 s 1.2 h 1/1
> >>
> >> 2016/06/18 11:21:31 3000 events 1.2 h 1 

Re: Spark 2.0 on YARN - Files in config archive not ending up on executor classpath

2016-06-20 Thread Marcelo Vanzin
I just tried this locally and can see the wrong behavior you mention.
I'm running a somewhat old build of 2.0, but I'll take a look.

On Mon, Jun 20, 2016 at 7:04 AM, Jonathan Kelly  wrote:
> Does anybody have any thoughts on this?
>
> On Fri, Jun 17, 2016 at 6:36 PM Jonathan Kelly 
> wrote:
>>
>> I'm trying to debug a problem in Spark 2.0.0-SNAPSHOT (commit
>> bdf5fe4143e5a1a393d97d0030e76d35791ee248) where Spark's log4j.properties is
>> not getting picked up in the executor classpath (and driver classpath for
>> yarn-cluster mode), so Hadoop's log4j.properties file is taking precedence
>> in the YARN containers.
>>
>> Spark's log4j.properties file is correctly being bundled into the
>> __spark_conf__.zip file and getting added to the DistributedCache, but it is
>> not in the classpath of the executor, as evidenced by the following command,
>> which I ran in spark-shell:
>>
>> scala> sc.parallelize(Seq(1)).map(_ =>
>> getClass().getResource("/log4j.properties")).first
>> res3: java.net.URL = file:/etc/hadoop/conf.empty/log4j.properties
>>
>> I then ran the following in spark-shell to verify the classpath of the
>> executors:
>>
>> scala> sc.parallelize(Seq(1)).map(_ =>
>> System.getProperty("java.class.path")).flatMap(_.split(':')).filter(e =>
>> !e.endsWith(".jar") && !e.endsWith("*")).collect.foreach(println)
>> ...
>>
>> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
>>
>> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03/__spark_conf__
>> /etc/hadoop/conf
>> ...
>>
>> So the JVM has this nonexistent __spark_conf__ directory in the classpath
>> when it should really be __spark_conf__.zip (which is actually a symlink to
>> a directory, despite the .zip filename).
>>
>> % sudo ls -l
>> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
>> total 20
>> -rw-r--r-- 1 yarn yarn   88 Jun 18 01:26 container_tokens
>> -rwx-- 1 yarn yarn  594 Jun 18 01:26
>> default_container_executor_session.sh
>> -rwx-- 1 yarn yarn  648 Jun 18 01:26 default_container_executor.sh
>> -rwx-- 1 yarn yarn 4419 Jun 18 01:26 launch_container.sh
>> lrwxrwxrwx 1 yarn yarn   59 Jun 18 01:26 __spark_conf__.zip ->
>> /mnt1/yarn/usercache/hadoop/filecache/17/__spark_conf__.zip
>> lrwxrwxrwx 1 yarn yarn   77 Jun 18 01:26 __spark_libs__ ->
>> /mnt/yarn/usercache/hadoop/filecache/16/__spark_libs__4490748779530764463.zip
>> drwx--x--- 2 yarn yarn   46 Jun 18 01:26 tmp
>>
>> Does anybody know why this is happening? Is this a bug in Spark, or is it
>> the JVM doing this (possibly because the extension is .zip)?
>>
>> Thanks,
>> Jonathan



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Verifying if DStream is empty

2016-06-20 Thread Praseetha
Thanks a lot for the response.
input1Pair is a DStream. I tried with the code snippet below,

result.foreachRDD{externalRDD =>
   if(!externalRDD.isEmpty()){
 val ss = input1Pair.transform{ rdd =>
input2Pair.leftOuterJoin(rdd)}
   }else{
 val ss = input1Pair.transform{ rdd =>
input2Pair.leftOuterJoin(rdd)}
   }
 }

I'm getting the following exception:
*java.lang.IllegalStateException: Adding new inputs, transformations, and
output operations after starting a context is not supported*
* at
org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)*
* at org.apache.spark.streaming.dstream.DStream.(DStream.scala:64)*
* at
org.apache.spark.streaming.dstream.TransformedDStream.(TransformedDStream.scala:25)*
* at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:670)*
* at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2.apply(DStream.scala:661)*

I don't think we can perform transformation on RDDs,that are outside for
foreachRDD.
My requirement is to figure out if the DStream 'result' is empty or not and
based on the result, perform some operation on input1Pair DStream and
input2Pair RDD.


On Mon, Jun 20, 2016 at 7:05 PM, nguyen duc tuan 
wrote:

> Hi Praseetha,
> In order to check if DStream is empty or not, using isEmpty method is
> correct. I think the problem here is calling  
> input1Pair.lefOuterJoin(input2Pair).
> I guess input1Pair rdd comes from above transformation. You should do it
> on DStream instead. In this case, do any transformation with x variable
> instead.
> If you use input2Pair rdd a lot, you can consider caching it for better
> performance.
>
> 2016-06-20 19:30 GMT+07:00 Praseetha :
>
>>
>> Hi Experts,
>>
>> I have 2 inputs, where first input is stream (say input1) and the second
>> one is batch (say input2). I want to figure out if the keys in first input
>> matches single row or more than one row in the second input. The further
>> transformations/logic depends on the number of rows matching, whether
>> single row matches or multiple rows match (for atleast one key in the first
>> input)
>>
>> if(single row matches){
>>  // do some tranformation
>> }else{
>>  // do some transformation
>> }
>>
>> Code that i tried so far
>>
>> val input1Pair = streamData.map(x => (x._1, x))
>> val input2Pair = input2.map(x => (x._1, x))
>> val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
>> val result = joinData.mapValues{
>> case(v, Some(a)) => 1L
>> case(v, None) => 0
>>  }.reduceByKey(_ + _).filter(_._2 > 1)
>>
>> I have done the above coding. When I do result.print, it prints nothing
>> if all the keys matches only one row in the input2. With the fact that the
>> DStream may have multiple RDDs, not sure how to figure out if the DStream
>> is empty or not.
>>
>> I tried using foreachRDD, but the streaming app stops abruptly.
>>
>> Inside foreachRDD i was performing transformations with other RDDs. like,
>>
>> result.foreachRDD{ x=>
>>
>> if(x.isEmpty){
>>
>> val out = input1Pair.lefOuterJoin(input2Pair)
>>
>> }else{
>>
>> val out = input1Pair.rightOuterJoin(input2Pair)
>>
>> }
>>
>> Can you please suggest.
>>
>>
>> Regds,
>> --Praseetha
>>
>
>


Data Generators mllib -> ml

2016-06-20 Thread Stephen Boesch
There are around twenty data generators in mllib -none of which are
presently migrated to ml.

Here is an example

/**
 * :: DeveloperApi ::
 * Generate sample data used for SVM. This class generates uniform random values
 * for the features and adds Gaussian noise with weight 0.1 to generate labels.
 */
@DeveloperApi
@Since("0.8.0")
object SVMDataGenerator {


Will these be migrated - and is there any indication on a timeframe?
My intention would be to publicize these as non-deprecated for the 2.0
and 2.1 timeframes?


Re: Update Batch DF with Streaming

2016-06-20 Thread Jacek Laskowski
Hi,

How would you do that without/outside streaming?

Jacek
On 17 Jun 2016 12:12 a.m., "Amit Assudani"  wrote:

> Hi All,
>
>
> Can I update batch data frames loaded in memory with Streaming data,
>
>
> For eg,
>
>
> I have employee DF is registered as temporary table, it has EmployeeID,
> Name, Address, etc. fields,  and assuming it is very big and takes time to
> load in memory,
>
>
> I've two types of employee events (both having empID bundled in
> payload) coming in streams,
>
>
> 1) which looks up  for a particular empID in batch data and does some
> calculation and persist the results,
>
> 2) which has updated values of some of the fields for an empID,
>
>
> Now I want to keep the employee DF up to date with the updates coming in
> type 2 events for future type 1 events to use,
>
>
> Now the question is can I update the employee DF with type 2 events in
> memory ? Do I need the whole DF refresh ?
>
>
> p.s. I can join the stream with batch and get the joined table, but i am
> not sure how to get and use the handle of joined data for subsequent
> events,
>
>
> Regards,
>
> Amit
>
> --
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>


Underutilized Cluster

2016-06-20 Thread Chadha Pooja

Hi,

I am using Amazon EMR for Spark and we have a 12 node cluster with 1 master and 
11 core nodes.

However my cluster isn’t scaling up to capacity.

Would you have suggestions to alter Spark Cluster Settings to utilize my 
cluster to its maximum capacity?

Resource Manager:
[cid:image001.png@01D1CAE0.7B79A5F0]

Thanks!
Pooja

__
The Boston Consulting Group, Inc.
 
This e-mail message may contain confidential and/or privileged information.
If you are not an addressee or otherwise authorized to receive this message,
you should not use, copy, disclose or take any action based on this e-mail or
any information contained in the message. If you have received this material
in error, please advise the sender immediately by reply e-mail and delete this
message. Thank you.


RE: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread Yong Zhang
If  you are using Spark > 1.5, the best way is to use DataFrame API directly, 
instead of SQL. In dataframe, you can specify the boardcast join hint in the 
dataframe API, which will force the boardcast join.
Yong

From: mich.talebza...@gmail.com
Date: Mon, 20 Jun 2016 13:09:17 +0100
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To: linguin@gmail.com
CC: zhen...@dtdream.com; user@spark.apache.org

what sort of the tables are these?
Can you register the result set as temp table and do a join on that assuming 
the RS is going to be small
s.filter(($"c2" < 1000)).registerTempTable("tmp")
and then do a join between tmp and Table2
HTH


Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 20 June 2016 at 12:38, Takeshi Yamamuro  wrote:
Seems it is hard to predict the output size of filters because the current 
spark has limited statistics of input data. A few hours ago, Reynold created a 
ticket for cost-based optimizer framework in 
https://issues.apache.org/jira/browse/SPARK-16026.If you have ideas, questions, 
and suggestions, feel free to join the discussion.
// maropu

On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247  wrote:


Thanks for your reply, In fact, that is what i just did
But my question is: Can we change the spark join behavior more clever, to turn 
a sortmergejoin into broadcasthashjoin automatically when if "found" that a 
output RDD is small enough?

--发件人:Takeshi 
Yamamuro 发送时间:2016年6月20日(星期一) 19:16收件人:梅西0247 
抄 送:user 主 题:Re: Is it possible to 
turn a SortMergeJoin into BroadcastHashJoin?
Hi,
How about caching the result of `select * from a where a.c2 < 1000`, then 
joining them?You probably need to tune `spark.sql.autoBroadcastJoinThreshold` 
to enable broadcast joins for the result table.
// maropu

On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247  wrote:
Hi everyone, 
I ran a SQL join statement on Spark 1.6.1 like this:
select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;and it 
took quite a long time because It is a SortMergeJoin and the two tables are big.


In fact,  the size of filter result(select * from a where a.c2 < 1000) is very 
small, and I think a better solution is to use a BroadcastJoin with the filter 
result, but  I know  the physical plan is static and it won't be changed.
So, can we make the physical plan more adaptive? (In this example, I mean using 
a  BroadcastHashJoin instead of SortMergeJoin automatically. )






-- 
---
Takeshi Yamamuro



-- 
---
Takeshi Yamamuro



  

Re: Improving performance of a kafka spark streaming app

2016-06-20 Thread Cody Koeninger
If your batch time is 1 second and your average processing time is
1.16 seconds, you're always going to be falling behind.  That would
explain why you've built up an hour of scheduling delay after eight
hours of running.

On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams  wrote:
> Hi Mich again,
>
> Regarding batch window, etc. I have provided the sources, but I'm not
> currently calling the window function. Did you see the program source?
> It's only 100 lines.
>
> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>
> Then I would expect I'm using defaults, other than what has been shown
> in the configuration.
>
> For example:
>
> In the launcher configuration I set --conf
> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
> are 500 messages for the duration set in the application:
> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
> Duration(1000));
>
>
> Then with the --num-executors 6 \ submit flag, and the
> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
> arrive at the 3000 events per batch in the UI, pasted above.
>
> Feel free to correct me if I'm wrong.
>
> Then are you suggesting that I set the window?
>
> Maybe following this as reference:
>
> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>
> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>  wrote:
>> Ok
>>
>> What is the set up for these please?
>>
>> batch window
>> window length
>> sliding interval
>>
>> And also in each batch window how much data do you get in (no of messages in
>> the topic whatever)?
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>> On 18 June 2016 at 21:01, Mich Talebzadeh  wrote:
>>>
>>> I believe you have an issue with performance?
>>>
>>> have you checked spark GUI (default 4040) for details including shuffles
>>> etc?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>> On 18 June 2016 at 20:59, Colin Kincaid Williams  wrote:

 There are 25 nodes in the spark cluster.

 On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
  wrote:
 > how many nodes are in your cluster?
 >
 > --num-executors 6 \
 >  --driver-memory 4G \
 >  --executor-memory 2G \
 >  --total-executor-cores 12 \
 >
 >
 > Dr Mich Talebzadeh
 >
 >
 >
 > LinkedIn
 >
 > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 >
 >
 >
 > http://talebzadehmich.wordpress.com
 >
 >
 >
 >
 > On 18 June 2016 at 20:40, Colin Kincaid Williams 
 > wrote:
 >>
 >> I updated my app to Spark 1.5.2 streaming so that it consumes from
 >> Kafka using the direct api and inserts content into an hbase cluster,
 >> as described in this thread. I was away from this project for awhile
 >> due to events in my family.
 >>
 >> Currently my scheduling delay is high, but the processing time is
 >> stable around a second. I changed my setup to use 6 kafka partitions
 >> on a set of smaller kafka brokers, with fewer disks. I've included
 >> some details below, including the script I use to launch the
 >> application. I'm using a Spark on Hbase library, whose version is
 >> relevant to my Hbase cluster. Is it apparent there is something wrong
 >> with my launch method that could be causing the delay, related to the
 >> included jars?
 >>
 >> Or is there something wrong with the very simple approach I'm taking
 >> for the application?
 >>
 >> Any advice is appriciated.
 >>
 >>
 >> The application:
 >>
 >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
 >>
 >>
 >> From the streaming UI I get something like:
 >>
 >> table Completed Batches (last 1000 out of 27136)
 >>
 >>
 >> Batch Time Input Size Scheduling Delay (?) Processing Time (?) Total
 >> Delay (?) Output Ops: Succeeded/Total
 >>
 >> 2016/06/18 11:21:32 3000 events 1.2 h 1 s 1.2 h 1/1
 >>
 >> 2016/06/18 11:21:31 3000 events 1.2 h 1 s 1.2 h 1/1
 >>
 >> 2016/06/18 11:21:30 3000 events 1.2 h 1 s 1.2 h 1/1
 >>
 >>
 >> Here's how I'm launching the spark application.
 >>
 >>
 >> #!/usr/bin/env bash
 >>
 >> export SPARK_CONF_DIR=/home/colin.williams/spark
 >>
 >> export HADOOP_CONF_DIR=/etc/hadoop/conf
 >>
 >> export
 >>
 >> 

Re: Spark 2.0 on YARN - Files in config archive not ending up on executor classpath

2016-06-20 Thread Jonathan Kelly
Does anybody have any thoughts on this?
On Fri, Jun 17, 2016 at 6:36 PM Jonathan Kelly 
wrote:

> I'm trying to debug a problem in Spark 2.0.0-SNAPSHOT
> (commit bdf5fe4143e5a1a393d97d0030e76d35791ee248) where Spark's
> log4j.properties is not getting picked up in the executor classpath (and
> driver classpath for yarn-cluster mode), so Hadoop's log4j.properties file
> is taking precedence in the YARN containers.
>
> Spark's log4j.properties file is correctly being bundled into the
> __spark_conf__.zip file and getting added to the DistributedCache, but it
> is not in the classpath of the executor, as evidenced by the following
> command, which I ran in spark-shell:
>
> scala> sc.parallelize(Seq(1)).map(_ =>
> getClass().getResource("/log4j.properties")).first
> res3: java.net.URL = file:/etc/hadoop/conf.empty/log4j.properties
>
> I then ran the following in spark-shell to verify the classpath of the
> executors:
>
> scala> sc.parallelize(Seq(1)).map(_ =>
> System.getProperty("java.class.path")).flatMap(_.split(':')).filter(e =>
> !e.endsWith(".jar") && !e.endsWith("*")).collect.foreach(println)
> ...
>
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
>
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03/__spark_conf__
> /etc/hadoop/conf
> ...
>
> So the JVM has this nonexistent __spark_conf__ directory in the classpath
> when it should really be __spark_conf__.zip (which is actually a symlink
> to a directory, despite the .zip filename).
>
> % sudo ls -l
> /mnt/yarn/usercache/hadoop/appcache/application_1466208403287_0003/container_1466208403287_0003_01_03
> total 20
> -rw-r--r-- 1 yarn yarn   88 Jun 18 01:26 container_tokens
> -rwx-- 1 yarn yarn  594 Jun 18 01:26
> default_container_executor_session.sh
> -rwx-- 1 yarn yarn  648 Jun 18 01:26 default_container_executor.sh
> -rwx-- 1 yarn yarn 4419 Jun 18 01:26 launch_container.sh
> lrwxrwxrwx 1 yarn yarn   59 Jun 18 01:26 __spark_conf__.zip ->
> /mnt1/yarn/usercache/hadoop/filecache/17/__spark_conf__.zip
> lrwxrwxrwx 1 yarn yarn   77 Jun 18 01:26 __spark_libs__ ->
> /mnt/yarn/usercache/hadoop/filecache/16/__spark_libs__4490748779530764463.zip
> drwx--x--- 2 yarn yarn   46 Jun 18 01:26 tmp
>
> Does anybody know why this is happening? Is this a bug in Spark, or is it
> the JVM doing this (possibly because the extension is .zip)?
>
> Thanks,
> Jonathan
>


dense_rank skips ranks on cube

2016-06-20 Thread talgr
I have a dataframe with 7 dimensions,
I built a cube on them

val cube = df.cube('d1,'d2,'d3,'d4,'d5,'d6,'d7)
val cc = cube.agg(sum('p1).as("p1"),sum('p2).as("p2")).cache

and then defined a rank function on a window:

 val rankSpec =
Window.partitionBy('d1,'d2,'d3,'d4,'d5,'d6).orderBy('p1.desc)
 val grank = dense_rank().over(rankSpec)
 val cubed = cc.withColumn("rank",grank)

when I do: 
cubed.filter('d1.isNull && 'd2.isNull && 'd3.isNull && 'd4.isNull &&
'd5.isNull && 'd6.isNull && 'd7.isNotNull).sort('rank).show

i see that the first ranks are 3,5,9,10,11,12,13,15...

it seems that they becomes more dense on higher ranks.
Any idea?

Thanks
Tal



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/dense-rank-skips-ranks-on-cube-tp27196.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Verifying if DStream is empty

2016-06-20 Thread nguyen duc tuan
Hi Praseetha,
In order to check if DStream is empty or not, using isEmpty method is
correct. I think the problem here is calling
input1Pair.lefOuterJoin(input2Pair).
I guess input1Pair rdd comes from above transformation. You should do it on
DStream instead. In this case, do any transformation with x variable
instead.
If you use input2Pair rdd a lot, you can consider caching it for better
performance.

2016-06-20 19:30 GMT+07:00 Praseetha :

>
> Hi Experts,
>
> I have 2 inputs, where first input is stream (say input1) and the second
> one is batch (say input2). I want to figure out if the keys in first input
> matches single row or more than one row in the second input. The further
> transformations/logic depends on the number of rows matching, whether
> single row matches or multiple rows match (for atleast one key in the first
> input)
>
> if(single row matches){
>  // do some tranformation
> }else{
>  // do some transformation
> }
>
> Code that i tried so far
>
> val input1Pair = streamData.map(x => (x._1, x))
> val input2Pair = input2.map(x => (x._1, x))
> val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
> val result = joinData.mapValues{
> case(v, Some(a)) => 1L
> case(v, None) => 0
>  }.reduceByKey(_ + _).filter(_._2 > 1)
>
> I have done the above coding. When I do result.print, it prints nothing if
> all the keys matches only one row in the input2. With the fact that the
> DStream may have multiple RDDs, not sure how to figure out if the DStream
> is empty or not.
>
> I tried using foreachRDD, but the streaming app stops abruptly.
>
> Inside foreachRDD i was performing transformations with other RDDs. like,
>
> result.foreachRDD{ x=>
>
> if(x.isEmpty){
>
> val out = input1Pair.lefOuterJoin(input2Pair)
>
> }else{
>
> val out = input1Pair.rightOuterJoin(input2Pair)
>
> }
>
> Can you please suggest.
>
>
> Regds,
> --Praseetha
>


Verifying if DStream is empty

2016-06-20 Thread Praseetha
Hi Experts,

I have 2 inputs, where first input is stream (say input1) and the second
one is batch (say input2). I want to figure out if the keys in first input
matches single row or more than one row in the second input. The further
transformations/logic depends on the number of rows matching, whether
single row matches or multiple rows match (for atleast one key in the first
input)

if(single row matches){
 // do some tranformation
}else{
 // do some transformation
}

Code that i tried so far

val input1Pair = streamData.map(x => (x._1, x))
val input2Pair = input2.map(x => (x._1, x))
val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
val result = joinData.mapValues{
case(v, Some(a)) => 1L
case(v, None) => 0
 }.reduceByKey(_ + _).filter(_._2 > 1)

I have done the above coding. When I do result.print, it prints nothing if
all the keys matches only one row in the input2. With the fact that the
DStream may have multiple RDDs, not sure how to figure out if the DStream
is empty or not.

I tried using foreachRDD, but the streaming app stops abruptly.

Inside foreachRDD i was performing transformations with other RDDs. like,

result.foreachRDD{ x=>

if(x.isEmpty){

val out = input1Pair.lefOuterJoin(input2Pair)

}else{

val out = input1Pair.rightOuterJoin(input2Pair)

}

Can you please suggest.


Regds,
--Praseetha


Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread Mich Talebzadeh
what sort of the tables are these?

Can you register the result set as temp table and do a join on that
assuming the RS is going to be small

s.filter(($"c2" < 1000)).registerTempTable("tmp")

and then do a join between tmp and Table2

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 20 June 2016 at 12:38, Takeshi Yamamuro  wrote:

> Seems it is hard to predict the output size of filters because the current
> spark has limited statistics of input data. A few hours ago, Reynold
> created a ticket for cost-based optimizer framework in
> https://issues.apache.org/jira/browse/SPARK-16026.
> If you have ideas, questions, and suggestions, feel free to join the
> discussion.
>
> // maropu
>
>
> On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247  wrote:
>
>>
>>
>> Thanks for your reply, In fact, that is what i just did
>>
>> But my question is:
>> Can we change the spark join behavior more clever, to turn a
>> sortmergejoin into broadcasthashjoin automatically when if "found" that a
>> output RDD is small enough?
>>
>>
>> --
>> 发件人:Takeshi Yamamuro 
>> 发送时间:2016年6月20日(星期一) 19:16
>> 收件人:梅西0247 
>> 抄 送:user 
>> 主 题:Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
>>
>> Hi,
>>
>> How about caching the result of `select * from a where a.c2 < 1000`, then
>> joining them?
>> You probably need to tune `spark.sql.autoBroadcastJoinThreshold` to
>> enable broadcast joins for the result table.
>>
>> // maropu
>>
>>
>> On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247  wrote:
>> Hi everyone,
>>
>> I ran a SQL join statement on Spark 1.6.1 like this:
>> select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
>> and it took quite a long time because It is a SortMergeJoin and the two
>> tables are big.
>>
>>
>> In fact,  the size of filter result(select * from a where a.c2 < 1000) is
>> very small, and I think a better solution is to use a BroadcastJoin with
>> the filter result, but  I know  the physical plan is static and it won't be
>> changed.
>>
>> So, can we make the physical plan more adaptive? (In this example, I mean
>> using a  BroadcastHashJoin instead of SortMergeJoin automatically. )
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread Takeshi Yamamuro
Seems it is hard to predict the output size of filters because the current
spark has limited statistics of input data. A few hours ago, Reynold
created a ticket for cost-based optimizer framework in
https://issues.apache.org/jira/browse/SPARK-16026.
If you have ideas, questions, and suggestions, feel free to join the
discussion.

// maropu


On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247  wrote:

>
>
> Thanks for your reply, In fact, that is what i just did
>
> But my question is:
> Can we change the spark join behavior more clever, to turn a sortmergejoin
> into broadcasthashjoin automatically when if "found" that a output RDD is
> small enough?
>
>
> --
> 发件人:Takeshi Yamamuro 
> 发送时间:2016年6月20日(星期一) 19:16
> 收件人:梅西0247 
> 抄 送:user 
> 主 题:Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
>
> Hi,
>
> How about caching the result of `select * from a where a.c2 < 1000`, then
> joining them?
> You probably need to tune `spark.sql.autoBroadcastJoinThreshold` to
> enable broadcast joins for the result table.
>
> // maropu
>
>
> On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247  wrote:
> Hi everyone,
>
> I ran a SQL join statement on Spark 1.6.1 like this:
> select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
> and it took quite a long time because It is a SortMergeJoin and the two
> tables are big.
>
>
> In fact,  the size of filter result(select * from a where a.c2 < 1000) is
> very small, and I think a better solution is to use a BroadcastJoin with
> the filter result, but  I know  the physical plan is static and it won't be
> changed.
>
> So, can we make the physical plan more adaptive? (In this example, I mean
> using a  BroadcastHashJoin instead of SortMergeJoin automatically. )
>
>
>
>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
>
>


-- 
---
Takeshi Yamamuro


Re: JDBC load into tempTable

2016-06-20 Thread Mich Talebzadeh
Try this

This is for Oracle but should work for MSSQL. If you want ordering then do
it on DF

 val d = HiveContext.load("jdbc",
  Map("url" -> _ORACLEserver,
  "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS CLUSTERED,
to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS RANDOMISED,
RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
  "user" -> _username,
  "password" -> _password))
  *d.sort(asc("ID")).registerTempTable("tmp")*

I believe that will work.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 20 June 2016 at 12:10, Takeshi Yamamuro  wrote:

> Hi,
>
> Currently, no.
> spark cannot preserve the order of input data from jdbc.
> If you want to have the ordered ids, you need to sort them explicitly.
>
> // maropu
>
> On Mon, Jun 20, 2016 at 7:41 PM, Ashok Kumar  > wrote:
>
>> Hi,
>>
>> I have a SQL server table with 500,000,000 rows with primary key (unique
>> clustered index) on ID column
>>
>> If I load it through JDBC into a DataFrame and register it
>> via registerTempTable will the data will be in the order of ID in tempTable?
>>
>> Thanks
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread Takeshi Yamamuro
Hi,

How about caching the result of `select * from a where a.c2 < 1000`, then
joining them?
You probably need to tune `spark.sql.autoBroadcastJoinThreshold` to enable
broadcast joins for the result table.

// maropu


On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247  wrote:

> Hi everyone,
>
> I ran a SQL join statement on Spark 1.6.1 like this:
> select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
> and it took quite a long time because It is a SortMergeJoin and the two
> tables are big.
>
>
> In fact,  the size of filter result(select * from a where a.c2 < 1000) is
> very small, and I think a better solution is to use a BroadcastJoin with
> the filter result, but  I know  the physical plan is static and it won't be
> changed.
>
> So, can we make the physical plan more adaptive? (In this example, I mean
> using a  BroadcastHashJoin instead of SortMergeJoin automatically. )
>
>
>
>
>
>


-- 
---
Takeshi Yamamuro


Re: JDBC load into tempTable

2016-06-20 Thread Takeshi Yamamuro
Hi,

Currently, no.
spark cannot preserve the order of input data from jdbc.
If you want to have the ordered ids, you need to sort them explicitly.

// maropu

On Mon, Jun 20, 2016 at 7:41 PM, Ashok Kumar 
wrote:

> Hi,
>
> I have a SQL server table with 500,000,000 rows with primary key (unique
> clustered index) on ID column
>
> If I load it through JDBC into a DataFrame and register it
> via registerTempTable will the data will be in the order of ID in tempTable?
>
> Thanks
>



-- 
---
Takeshi Yamamuro


Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread 梅西0247
Hi everyone, 
I ran a SQL join statement on Spark 1.6.1 like this:
select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;and it 
took quite a long time because It is a SortMergeJoin and the two tables are big.


In fact,  the size of filter result(select * from a where a.c2 < 1000) is very 
small, and I think a better solution is to use a BroadcastJoin with the filter 
result, but  I know  the physical plan is static and it won't be changed.
So, can we make the physical plan more adaptive? (In this example, I mean using 
a  BroadcastHashJoin instead of SortMergeJoin automatically. )






Beeline exception when connecting to Spark 2.0 ThriftServer running on yarn

2016-06-20 Thread Lei Lei2 Gu
Hi,
   I am trying Spark 2.0. I downloaded a prebuilt version 
spark-2.0.0-preview-bin-hadoop2.7.tgz for trial and installed it on my testing 
cluster. I had HDFS, YARN and Hive metastore service in position. When I 
started the thrift server, it started as expected. When I tried to connect 
thrift server through beeline, I got excetion from both the beline side and the 
thrift server side. By the way, I also tried Spark 1.6.1 and there was no 
exception with the same configuration. Can anybody help me solve this problem?
   For the beelinde side, I got the following exception:
   Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at 
org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:379)
at 
org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:230)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at 
org.apache.hive.service.cli.thrift.TCLIService$Client.recv_OpenSession(TCLIService.java:156)
at 
org.apache.hive.service.cli.thrift.TCLIService$Client.OpenSession(TCLIService.java:143)
at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:583)
at org.apache.hive.jdbc.HiveConnection.(HiveConnection.java:192)
at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:187)
at 
org.apache.hive.beeline.DatabaseConnection.connect(DatabaseConnection.java:142)
at 
org.apache.hive.beeline.DatabaseConnection.getConnection(DatabaseConnection.java:207)
at org.apache.hive.beeline.Commands.close(Commands.java:987)
at org.apache.hive.beeline.Commands.closeall(Commands.java:969)
at org.apache.hive.beeline.BeeLine.close(BeeLine.java:826)
   For the thrift server side, I got the following exception:
   WARN netty.NettyRpcEndpointRef: Error sending message [message = 
RequestExecutors(0,0,Map())] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at 
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receiveAndReply$1$$anonfun$applyOrElse$1.apply$mcV$sp(YarnSchedulerBackend.scala:271)
at 
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receiveAndReply$1$$anonfun$applyOrElse$1.apply(YarnSchedulerBackend.scala:271)
at 
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receiveAndReply$1$$anonfun$applyOrElse$1.apply(YarnSchedulerBackend.scala:271)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
   Caused by: java.io.IOException: Failed to send RPC 8568677726416939006 
to ludp02.lenovo.com/10.100.6.16:36017: java.nio.channels.ClosedChannelException
at 
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
at 
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
at 
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
at 

tensor factorization FR

2016-06-20 Thread Roberto Pagliari
There are a number of research papers about tensor factorization and its use in 
machine learning.

Is tensor factorization in the roadmap?


Unable to acquire bytes of memory

2016-06-20 Thread pseudo oduesp
Hi ,
i don t have no idea why i get this error



Py4JJavaError: An error occurred while calling o69143.parquet.
: org.apache.spark.SparkException: Job aborted.
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
at sun.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 8 in stage 10645.0 failed 4 times, most recent failure: Lost
task 8.3 in stage 10645.0 (TID 536592,
prssnbd1s006.bigplay.bigdata.intraxa): java.io.IOException: Unable to
acquire 67108864 bytes of memory
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:138)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:68)
at org.apache.spark.sql.execution.TungstenSort.org
$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
at
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at

Re: Spark - “min key = null, max key = null” while reading ORC file

2016-06-20 Thread Mohanraj Ragupathiraj
Thank you very much.

On Mon, Jun 20, 2016 at 3:38 PM, Jörn Franke  wrote:

> If you insert the data sorted then there is not need to bucket the data.
> You can even create an index in Spark. Simply set the outputformat
> configuration orc.create.index = true
>
>
> On 20 Jun 2016, at 09:10, Mich Talebzadeh 
> wrote:
>
> Right, you concern is that you expect storeindex in ORC file to help the
> optimizer.
>
> Frankly I do not know what
> write().mode(SaveMode.Overwrite).orc("orcFileToRead" does actually under
> the bonnet. From my experience in order for ORC index to be used you need
> to bucket the table. I have explained these before in here
> 
>
> Now it is possible that you have not updated statistics on the table
>
> Even with Spark I tend to create my ORC table explicitly through Spark SQL.
>
> You stated the join scans all the underlying ORC table. Your "id" column I
> assume is unique. So I would bucket it using id column.
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 20 June 2016 at 07:07, Mohanraj Ragupathiraj 
> wrote:
>
>> Hi Mich,
>>
>> Thank you for your reply.
>>
>> Let me explain more clearly.
>>
>> File with 100 records needs to joined with a Big lookup File created in
>> ORC format (500 million records). The Spark process i wrote is returing
>> back the matching records and is working fine. My concern is that it loads
>> the entire file (500 million) and matches with the 100 records instead of
>> loading only the stripes with matching keys. I read that ORC file provides
>> indexes (https://orc.apache.org/docs/indexes.html) and i assumned that
>> when i join using Dataframes, the indexes will be used, resulting in
>> loading of only matching records/stripes for processing instead of the
>> whole table.
>>
>> On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> To start when you store the data in ORC file can you verify that the
>>> data is there?
>>>
>>> For example register it as tempTable
>>>
>>> processDF.register("tmp")
>>> sql("select count(1) from tmp).show
>>>
>>> Also what do you mean by index file in ORC?
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 20 June 2016 at 05:01, Mohanraj Ragupathiraj 
>>> wrote:
>>>
 I am trying to join a Dataframe(say 100 records) with an ORC file with
 500 million records through Spark(can increase to 4-5 billion, 25 bytes
 each record).

 I used Spark hiveContext API.

 *ORC File Creation Code*

 //fsdtRdd is JavaRDD, fsdtSchema is StructType schema
 DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
 fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");

 *ORC File Reading Code*

 HiveContext hiveContext = new HiveContext(sparkContext);
 DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
 // allRecords is dataframe
 DataFrame processDf = 
 allRecords.join(orcFileData,allRecords.col("id").equalTo(orcFileData.col("id").as("ID")),"left_outer_join");
 processDf.show();

 When I read the ORC file, the get following in my Spark Logs:

 Input split: 
 file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348*min
  key = null, max key = null*
 Reading ORC rows from 
 file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc
  with {include: [true, true, true], offset: 0, length: 9223372036854775807}
 Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
 Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 
 56,PROCESS_LOCAL, 2220 bytes)
 Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
 Running task 56.0 in stage 2.0 (TID 60)

 Although the Spark job completes successfully, I think, its not able to
 utilize ORC index file capability and thus checks through entire block of
 ORC data before moving on.

 *Question*

 -- Is it a normal behaviour, or I have to set any configuration before
 saving the data in ORC format?

 -- If it is *NORMAL*, what is the best way to join so that we discrad
 non-matching records on the 

Re: Spark - “min key = null, max key = null” while reading ORC file

2016-06-20 Thread Mohanraj Ragupathiraj
Thank you very much.

On Mon, Jun 20, 2016 at 3:10 PM, Mich Talebzadeh 
wrote:

> Right, you concern is that you expect storeindex in ORC file to help the
> optimizer.
>
> Frankly I do not know what
> write().mode(SaveMode.Overwrite).orc("orcFileToRead" does actually under
> the bonnet. From my experience in order for ORC index to be used you need
> to bucket the table. I have explained these before in here
> 
>
> Now it is possible that you have not updated statistics on the table
>
> Even with Spark I tend to create my ORC table explicitly through Spark SQL.
>
> You stated the join scans all the underlying ORC table. Your "id" column I
> assume is unique. So I would bucket it using id column.
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 20 June 2016 at 07:07, Mohanraj Ragupathiraj 
> wrote:
>
>> Hi Mich,
>>
>> Thank you for your reply.
>>
>> Let me explain more clearly.
>>
>> File with 100 records needs to joined with a Big lookup File created in
>> ORC format (500 million records). The Spark process i wrote is returing
>> back the matching records and is working fine. My concern is that it loads
>> the entire file (500 million) and matches with the 100 records instead of
>> loading only the stripes with matching keys. I read that ORC file provides
>> indexes (https://orc.apache.org/docs/indexes.html) and i assumned that
>> when i join using Dataframes, the indexes will be used, resulting in
>> loading of only matching records/stripes for processing instead of the
>> whole table.
>>
>> On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> To start when you store the data in ORC file can you verify that the
>>> data is there?
>>>
>>> For example register it as tempTable
>>>
>>> processDF.register("tmp")
>>> sql("select count(1) from tmp).show
>>>
>>> Also what do you mean by index file in ORC?
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 20 June 2016 at 05:01, Mohanraj Ragupathiraj 
>>> wrote:
>>>
 I am trying to join a Dataframe(say 100 records) with an ORC file with
 500 million records through Spark(can increase to 4-5 billion, 25 bytes
 each record).

 I used Spark hiveContext API.

 *ORC File Creation Code*

 //fsdtRdd is JavaRDD, fsdtSchema is StructType schema
 DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
 fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");

 *ORC File Reading Code*

 HiveContext hiveContext = new HiveContext(sparkContext);
 DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
 // allRecords is dataframe
 DataFrame processDf = 
 allRecords.join(orcFileData,allRecords.col("id").equalTo(orcFileData.col("id").as("ID")),"left_outer_join");
 processDf.show();

 When I read the ORC file, the get following in my Spark Logs:

 Input split: 
 file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348*min
  key = null, max key = null*
 Reading ORC rows from 
 file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc
  with {include: [true, true, true], offset: 0, length: 9223372036854775807}
 Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
 Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 
 56,PROCESS_LOCAL, 2220 bytes)
 Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
 Running task 56.0 in stage 2.0 (TID 60)

 Although the Spark job completes successfully, I think, its not able to
 utilize ORC index file capability and thus checks through entire block of
 ORC data before moving on.

 *Question*

 -- Is it a normal behaviour, or I have to set any configuration before
 saving the data in ORC format?

 -- If it is *NORMAL*, what is the best way to join so that we discrad
 non-matching records on the disk level(maybe only the index file for ORC
 data is loaded)?

>>>
>>>
>>
>>
>> --
>> Thanks and Regards
>> Mohan
>> VISA Pte Limited, Singapore.
>>
>
>


-- 
Thanks and Regards
Mohan
VISA Pte Limited, Singapore.


Re: spark job automatically killed without rhyme or reason

2016-06-20 Thread Sean Owen
I'm not sure that's the conclusion. It's not trivial to tune and
configure YARN and Spark to match your app's memory needs and profile,
but, it's also just a matter of setting them properly. I'm not clear
you've set the executor memory for example, in particular
spark.yarn.executor.memoryOverhead

Everything else you mention is a symptom of YARN shutting down your
jobs because your memory settings don't match what your app does.
They're not problems per se, based on what you have provided.


On Mon, Jun 20, 2016 at 9:17 AM, Zhiliang Zhu
 wrote:
> Hi Alexander ,
>
> Thanks a lot for your comments.
>
> Spark seems not that stable when it comes to run big job, too much data or
> too much time, yes, the problem is gone when reducing the scale.
> Sometimes reset some job running parameter (such as --drive-memory may help
> in GC issue) , sometimes may rewrite the codes by applying other algorithm.
>
> As you commented the shuffle operation, it sounds some as the reason ...
>
> Best Wishes !
>
>
>
> On Friday, June 17, 2016 8:45 PM, Alexander Kapustin 
> wrote:
>
>
> Hi Zhiliang,
>
> Yes, find the exact reason of failure is very difficult. We have issue with
> similar behavior, due to limited time for investigation, we reduce the
> number of processed data, and problem has gone.
>
> Some points which may help you in investigations:
> · If you start spark-history-server (or monitoring running
> application on 4040 port), look into failed stages (if any). By default
> Spark try to retry stage execution 2 times, after that job fails
> · Some useful information may contains in yarn logs on Hadoop nodes
> (yarn--nodemanager-.log), but this is only information about
> killed container, not about the reasons why this stage took so much memory
>
> As I can see in your logs, failed step relates to shuffle operation, could
> you change your job to avoid massive shuffle operation?
>
> --
> WBR, Alexander
>
> From: Zhiliang Zhu
> Sent: 17 июня 2016 г. 14:10
> To: User; kp...@hotmail.com
> Subject: Re: spark job automatically killed without rhyme or reason
>
>
> Show original message
>
>
> Hi Alexander,
>
> is your yarn userlog   just for the executor log ?
>
> as for those logs seem a little difficult to exactly decide the wrong point,
> due to sometimes successful job may also have some kinds of the error  ...
> but will repair itself.
> spark seems not that stable currently ...
>
> Thank you in advance~
>
>
>
> On Friday, June 17, 2016 6:53 PM, Zhiliang Zhu  wrote:
>
>
> Hi Alexander,
>
> Thanks a lot for your reply.
>
> Yes, submitted by yarn.
> Do you just mean in the executor log file by way of yarn logs -applicationId
> id,
>
> in this file, both in some containers' stdout  and stderr :
>
> 16/06/17 14:05:40 INFO client.TransportClientFactory: Found inactive
> connection to ip-172-31-20-104/172.31.20.104:49991, creating a new one.
> 16/06/17 14:05:40 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks
> java.io.IOException: Failed to connect to
> ip-172-31-20-104/172.31.20.104:49991  <-- may it be due to
> that spark is not stable, and spark may repair itself for these kinds of
> error ? (saw some in successful run )
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> 
> Caused by: java.net.ConnectException: Connection refused:
> ip-172-31-20-104/172.31.20.104:49991
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>
>
> 16/06/17 11:54:38 ERROR executor.Executor: Managed memory leak detected;
> size = 16777216 bytes, TID = 100323   <-   would it be
> memory leak issue? though no GC exception threw for other normal kinds of
> out of memory
> 16/06/17 11:54:38 ERROR executor.Executor: Exception in task 145.0 in stage
> 112.0 (TID 100323)
> java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:837)
> at
> 

Re: spark job automatically killed without rhyme or reason

2016-06-20 Thread Zhiliang Zhu
Hi Alexander ,
Thanks a lot for your comments.
Spark seems not that stable when it comes to run big job, too much data or too 
much time, yes, the problem is gone when reducing the scale.Sometimes reset 
some job running parameter (such as --drive-memory may help in GC issue) , 
sometimes may rewrite the codes by applying other algorithm.
As you commented the shuffle operation, it sounds some as the reason ...
Best Wishes !  
 

On Friday, June 17, 2016 8:45 PM, Alexander Kapustin  
wrote:
 

 #yiv4291334619 #yiv4291334619 -- _filtered #yiv4291334619 
{font-family:Wingdings;panose-1:5 0 0 0 0 0 0 0 0 0;} _filtered #yiv4291334619 
{panose-1:2 4 5 3 5 4 6 3 2 4;} _filtered #yiv4291334619 
{font-family:Calibri;panose-1:2 15 5 2 2 2 4 3 2 4;}#yiv4291334619 
#yiv4291334619 p.yiv4291334619MsoNormal, #yiv4291334619 
li.yiv4291334619MsoNormal, #yiv4291334619 div.yiv4291334619MsoNormal 
{margin:0cm;margin-bottom:.0001pt;font-size:11.0pt;}#yiv4291334619 a:link, 
#yiv4291334619 span.yiv4291334619MsoHyperlink 
{color:blue;text-decoration:underline;}#yiv4291334619 a:visited, #yiv4291334619 
span.yiv4291334619MsoHyperlinkFollowed 
{color:#954F72;text-decoration:underline;}#yiv4291334619 
p.yiv4291334619MsoListParagraph, #yiv4291334619 
li.yiv4291334619MsoListParagraph, #yiv4291334619 
div.yiv4291334619MsoListParagraph 
{margin-top:0cm;margin-right:0cm;margin-bottom:0cm;margin-left:36.0pt;margin-bottom:.0001pt;font-size:11.0pt;}#yiv4291334619
 span.yiv4291334619qtd-expansion-text {}#yiv4291334619 
.yiv4291334619MsoChpDefault {} _filtered #yiv4291334619 {margin:2.0cm 42.5pt 
2.0cm 3.0cm;}#yiv4291334619 div.yiv4291334619WordSection1 {}#yiv4291334619 
_filtered #yiv4291334619 {} _filtered #yiv4291334619 {font-family:Symbol;} 
_filtered #yiv4291334619 {} _filtered #yiv4291334619 {font-family:Wingdings;} 
_filtered #yiv4291334619 {font-family:Symbol;} _filtered #yiv4291334619 {} 
_filtered #yiv4291334619 {font-family:Wingdings;} _filtered #yiv4291334619 
{font-family:Symbol;} _filtered #yiv4291334619 {} _filtered #yiv4291334619 
{font-family:Wingdings;}#yiv4291334619 ol {margin-bottom:0cm;}#yiv4291334619 ul 
{margin-bottom:0cm;}#yiv4291334619 Hi Zhiliang,    Yes, find the exact reason 
of failure is very difficult. We have issue with similar behavior, due to 
limited time for investigation, we reduce the number of processed data, and 
problem has gone.    Some points which may help you in investigations: ·
If you start spark-history-server (or monitoring running application on 4040 
port), look into failed stages (if any). By default Spark try to retry stage 
execution 2 times, after that job fails·Some useful information may 
contains in yarn logs on Hadoop nodes (yarn--nodemanager-.log), but 
this is only information about killed container, not about the reasons why this 
stage took so much memory   As I can see in your logs, failed step relates to 
shuffle operation, could you change your job to avoid massive shuffle 
operation?    --WBR, Alexander   From: Zhiliang Zhu
Sent: 17 июня 2016 г. 14:10
To: User; kp...@hotmail.com
Subject: Re: spark job automatically killed without rhyme or reason   
Show original message

Hi Alexander,
is your yarn userlog   just for the executor log ?
as for those logs seem a little difficult to exactly decide the wrong point, 
due to sometimes successful job may also have some kinds of the error  ... but 
will repair itself.spark seems not that stable currently     ...
Thank you in advance~  

On Friday, June 17, 2016 6:53 PM, Zhiliang Zhu  wrote:


Hi Alexander,
Thanks a lot for your reply.
Yes, submitted by yarn.Do you just mean in the executor log file by way of yarn 
logs -applicationId id, 
in this file, both in some containers' stdout  and stderr :
16/06/17 14:05:40 INFO client.TransportClientFactory: Found inactive connection 
to ip-172-31-20-104/172.31.20.104:49991, creating a new one.
16/06/17 14:05:40 ERROR shuffle.RetryingBlockFetcher: Exception while beginning 
fetch of 1 outstanding blocksjava.io.IOException:Failed to connect to 
ip-172-31-20-104/172.31.20.104:49991              <--may it be due to that 
spark is not stable, and spark may repair itself for these kinds of error ? 
(saw some in successful run )
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)Caused
 by: java.net.ConnectException: Connection refused: 
ip-172-31-20-104/172.31.20.104:49991        at 
sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)        at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)        
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
        at 

Re: Switching broadcast mechanism from torrrent

2016-06-20 Thread Daniel Haviv
I agree, it was by mistake.
I just updated so that the next person looking for torrent broadcast issues
will have a hint :)

Thank you.
Daniel

On Sun, Jun 19, 2016 at 5:26 PM, Ted Yu  wrote:

> I think good practice is not to hold on to SparkContext in mapFunction.
>
> On Sun, Jun 19, 2016 at 7:10 AM, Takeshi Yamamuro 
> wrote:
>
>> How about using `transient` annotations?
>>
>> // maropu
>>
>> On Sun, Jun 19, 2016 at 10:51 PM, Daniel Haviv <
>> daniel.ha...@veracity-group.com> wrote:
>>
>>> Hi,
>>> Just updating on my findings for future reference.
>>> The problem was that after refactoring my code I ended up with a scala
>>> object which held SparkContext as a member, eg:
>>> object A  {
>>>  sc: SparkContext = new SparkContext
>>>  def mapFunction  {}
>>> }
>>>
>>> and when I called rdd.map(A.mapFunction) it failed as A.sc is not
>>> serializable.
>>>
>>> Thanks,
>>> Daniel
>>>
>>> On Tue, Jun 7, 2016 at 10:13 AM, Takeshi Yamamuro >> > wrote:
>>>
 Hi,

 Since `HttpBroadcastFactory` has already been removed in master, so
 you cannot use the broadcast mechanism in future releases.

 Anyway, I couldn't find a root cause only from the stacktraces...

 // maropu




 On Mon, Jun 6, 2016 at 2:14 AM, Daniel Haviv <
 daniel.ha...@veracity-group.com> wrote:

> Hi,
> I've set  spark.broadcast.factory to
> org.apache.spark.broadcast.HttpBroadcastFactory and it indeed resolve my
> issue.
>
> I'm creating a dataframe which creates a broadcast variable internally
> and then fails due to the torrent broadcast with the following stacktrace:
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_3_piece0 of broadcast_3
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.broadcast.TorrentBroadcast.org
> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
> at
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1220)
>
> I'm using spark 1.6.0 on CDH 5.7
>
> Thanks,
> Daniel
>
>
> On Wed, Jun 1, 2016 at 5:52 PM, Ted Yu  wrote:
>
>> I found spark.broadcast.blockSize but no parameter to switch
>> broadcast method.
>>
>> Can you describe the issues with torrent broadcast in more detail ?
>>
>> Which version of Spark are you using ?
>>
>> Thanks
>>
>> On Wed, Jun 1, 2016 at 7:48 AM, Daniel Haviv <
>> daniel.ha...@veracity-group.com> wrote:
>>
>>> Hi,
>>> Our application is failing due to issues with the torrent broadcast,
>>> is there a way to switch to another broadcast method ?
>>>
>>> Thank you.
>>> Daniel
>>>
>>
>>
>


 --
 ---
 Takeshi Yamamuro

>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


Re: Spark - “min key = null, max key = null” while reading ORC file

2016-06-20 Thread Jörn Franke
If you insert the data sorted then there is not need to bucket the data. 
You can even create an index in Spark. Simply set the outputformat 
configuration orc.create.index = true 


> On 20 Jun 2016, at 09:10, Mich Talebzadeh  wrote:
> 
> Right, you concern is that you expect storeindex in ORC file to help the 
> optimizer.
> 
> Frankly I do not know what 
> write().mode(SaveMode.Overwrite).orc("orcFileToRead" does actually under the 
> bonnet. From my experience in order for ORC index to be used you need to 
> bucket the table. I have explained these before in here
> 
> Now it is possible that you have not updated statistics on the table
> 
> Even with Spark I tend to create my ORC table explicitly through Spark SQL.
> 
> You stated the join scans all the underlying ORC table. Your "id" column I 
> assume is unique. So I would bucket it using id column.
> 
> 
> HTH
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> 
>> On 20 June 2016 at 07:07, Mohanraj Ragupathiraj  
>> wrote:
>> Hi Mich,
>> 
>> Thank you for your reply. 
>> 
>> Let me explain more clearly.
>> 
>> File with 100 records needs to joined with a Big lookup File created in ORC 
>> format (500 million records). The Spark process i wrote is returing back the 
>> matching records and is working fine. My concern is that it loads the entire 
>> file (500 million) and matches with the 100 records instead of loading only 
>> the stripes with matching keys. I read that ORC file provides indexes 
>> (https://orc.apache.org/docs/indexes.html) and i assumned that when i join 
>> using Dataframes, the indexes will be used, resulting in loading of only 
>> matching records/stripes for processing instead of the whole table.
>> 
>>> On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh 
>>>  wrote:
>>> Hi,
>>> 
>>> To start when you store the data in ORC file can you verify that the data 
>>> is there?
>>> 
>>> For example register it as tempTable
>>> 
>>> processDF.register("tmp")
>>> sql("select count(1) from tmp).show
>>> 
>>> Also what do you mean by index file in ORC?
>>> 
>>> HTH
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> http://talebzadehmich.wordpress.com
>>>  
>>> 
 On 20 June 2016 at 05:01, Mohanraj Ragupathiraj  
 wrote:
 I am trying to join a Dataframe(say 100 records) with an ORC file with 500 
 million records through Spark(can increase to 4-5 billion, 25 bytes each 
 record).
 
 I used Spark hiveContext API.
 
 ORC File Creation Code
 
 //fsdtRdd is JavaRDD, fsdtSchema is StructType schema
 DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
 fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");
 ORC File Reading Code
 
 HiveContext hiveContext = new HiveContext(sparkContext);
 DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
 // allRecords is dataframe
 DataFrame processDf = 
 allRecords.join(orcFileData,allRecords.col("id").equalTo(orcFileData.col("id").as("ID")),"left_outer_join");
 processDf.show();
 When I read the ORC file, the get following in my Spark Logs:
 
 Input split: 
 file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348
 min key = null, max key = null
 Reading ORC rows from 
 file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc
  with {include: [true, true, true], offset: 0, length: 9223372036854775807}
 Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
 Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 
 56,PROCESS_LOCAL, 2220 bytes)
 Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
 Running task 56.0 in stage 2.0 (TID 60)
 Although the Spark job completes successfully, I think, its not able to 
 utilize ORC index file capability and thus checks through entire block of 
 ORC data before moving on.
 
 Question
 
 -- Is it a normal behaviour, or I have to set any configuration before 
 saving the data in ORC format?
 
 -- If it is NORMAL, what is the best way to join so that we discrad 
 non-matching records on the disk level(maybe only the index file for ORC 
 data is loaded)?
 
>> 
>> 
>> 
>> -- 
>> Thanks and Regards
>> Mohan
>> VISA Pte Limited, Singapore.
> 


Re: Spark Kafka stream processing time increasing gradually

2016-06-20 Thread N B
Its actually necessary to retire keys that become "Zero" or "Empty" so to
speak. In your case, the key is "imageURL" and values are a dictionary, one
of whose fields is "count" that you are maintaining. For simplicity and
illustration's sake I will assume imageURL to be a strings like "abc". Your
slide duration is 60 and window duration is 1800 seconds.

Now consider the following chain of events in your stream.

batch 1 : "abc"
batch 2 : "xyz"
batch 3 : "abc"

and now for the rest of the stream, the keys "abc" or "xyz" never occur.

At the end of the third batch, the generated window rdd has
{ "abc" -> count = 2, "xyz" -> count = 1 }.
When the first batch falls off after 1800 seconds, it will become
{ "abc -> count = 1, "xyz" -> count = 1 }.
60 seconds later, it will become
{ "abc" -> count = 1, "xyz" -> count = 0 }
and a further 60 seconds later, the 3rd batch is removed from the window
and the new window rdd becomes
{ "abc" -> count = 0, "xyz" -> count = 0 }.

I hope you can see what is wrong with this. These keys will be perpetually
held in memory even though there is no need for them to be there. This
growth in the size of the generated window rdd is what's giving rise to the
deteriorating processing time in your case.

A filter function that's equivalent of "count != 0" will suffice to
remember only those keys that have not become "Zero".

HTH,
NB



On Thu, Jun 16, 2016 at 8:12 PM, Roshan Singh 
wrote:

> Hi,
> According to the docs (
> https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
> filerFunc can be used to retain expiring keys. I do not want to retain any
> expiring key, so I do not understand how can this help me stabilize it.
> Please correct me if this is not the case.
>
> I am also specifying both reduceFunc and invReduceFunc. Can you can a
> sample code of what you are using.
>
> Thanks.
>
> On Fri, Jun 17, 2016 at 3:43 AM, N B  wrote:
>
>> We had this same issue with the reduceByKeyAndWindow API that you are
>> using. For fixing this issue, you have to use  different flavor of that
>> API, specifically the 2 versions that allow you to give a 'Filter function'
>> to them. Putting in the filter functions helped stabilize our application
>> too.
>>
>> HTH
>> NB
>>
>>
>> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh 
>> wrote:
>>
>>> Hi all,
>>> I have a python streaming job which is supposed to run 24x7. I am unable
>>> to stabilize it. The job just counts no of links shared in a 30 minute
>>> sliding window. I am using reduceByKeyAndWindow operation with a batch of
>>> 30 seconds, slide interval of 60 seconds.
>>>
>>> The kafka queue has a rate of nearly 2200 messages/second which can
>>> increase to 3000 but the mean is 2200.
>>>
>>> I have played around with batch size, slide interval, and by increasing
>>> parallelism with no fruitful result. These just delay the destabilization.
>>>
>>> GC time is usually between 60-100 ms.
>>>
>>> I also noticed that the jobs were not distributed to other nodes in the
>>> spark UI, for which I have used configured spark.locality.wait as 100ms.
>>> After which I have noticed that the job is getting distributed properly.
>>>
>>> I have a cluster of 6 slaves and one master each with 16 cores and 15gb
>>> of ram.
>>>
>>> Code and configuration: http://pastebin.com/93DMSiji
>>>
>>> Streaming screenshot: http://imgur.com/psNfjwJ
>>>
>>> I need help in debugging the issue. Any help will be appreciated.
>>>
>>> --
>>> Roshan Singh
>>>
>>>
>>
>
>
> --
> Roshan Singh
> http://roshansingh.in
>


Unsubscribe

2016-06-20 Thread Ram Krishna
Hi Sir,

Please unsubscribe me

-- 
Regards,
Ram Krishna KT


Re: Spark - “min key = null, max key = null” while reading ORC file

2016-06-20 Thread Mohanraj Ragupathiraj
Hi Mich,

Thank you for your reply.

Let me explain more clearly.

File with 100 records needs to joined with a Big lookup File created in ORC
format (500 million records). The Spark process i wrote is returing back
the matching records and is working fine. My concern is that it loads the
entire file (500 million) and matches with the 100 records instead of
loading only the stripes with matching keys. I read that ORC file provides
indexes (https://orc.apache.org/docs/indexes.html) and i assumned that when
i join using Dataframes, the indexes will be used, resulting in loading of
only matching records/stripes for processing instead of the whole table.

On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> To start when you store the data in ORC file can you verify that the data
> is there?
>
> For example register it as tempTable
>
> processDF.register("tmp")
> sql("select count(1) from tmp).show
>
> Also what do you mean by index file in ORC?
>
> HTH
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 20 June 2016 at 05:01, Mohanraj Ragupathiraj 
> wrote:
>
>> I am trying to join a Dataframe(say 100 records) with an ORC file with
>> 500 million records through Spark(can increase to 4-5 billion, 25 bytes
>> each record).
>>
>> I used Spark hiveContext API.
>>
>> *ORC File Creation Code*
>>
>> //fsdtRdd is JavaRDD, fsdtSchema is StructType schema
>> DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
>> fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");
>>
>> *ORC File Reading Code*
>>
>> HiveContext hiveContext = new HiveContext(sparkContext);
>> DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
>> // allRecords is dataframe
>> DataFrame processDf = 
>> allRecords.join(orcFileData,allRecords.col("id").equalTo(orcFileData.col("id").as("ID")),"left_outer_join");
>> processDf.show();
>>
>> When I read the ORC file, the get following in my Spark Logs:
>>
>> Input split: 
>> file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348*min
>>  key = null, max key = null*
>> Reading ORC rows from 
>> file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc
>>  with {include: [true, true, true], offset: 0, length: 9223372036854775807}
>> Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
>> Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 
>> 56,PROCESS_LOCAL, 2220 bytes)
>> Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
>> Running task 56.0 in stage 2.0 (TID 60)
>>
>> Although the Spark job completes successfully, I think, its not able to
>> utilize ORC index file capability and thus checks through entire block of
>> ORC data before moving on.
>>
>> *Question*
>>
>> -- Is it a normal behaviour, or I have to set any configuration before
>> saving the data in ORC format?
>>
>> -- If it is *NORMAL*, what is the best way to join so that we discrad
>> non-matching records on the disk level(maybe only the index file for ORC
>> data is loaded)?
>>
>
>


-- 
Thanks and Regards
Mohan
VISA Pte Limited, Singapore.