RE: Need to use univariate summary stats

2016-02-04 Thread Lohith Samaga M
Hi Arun,
You can do df.agg(max(...), min(...)).
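For example, a minimal Scala sketch (the column name "x" is only a placeholder; skewness/kurtosis need Spark 1.6):

import org.apache.spark.sql.functions.{kurtosis, max, mean, min, skewness}

// Aggregate over the whole DataFrame; no groupBy/key is needed.
val stats = df.agg(min("x"), max("x"), mean("x"), skewness("x"), kurtosis("x"))
stats.show()

// df.describe("x") is another quick way to get count/mean/stddev/min/max.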


Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga

From: Arunkumar Pillai [mailto:arunkumar1...@gmail.com]
Sent: Thursday, February 04, 2016 14.53
To: user@spark.apache.org
Subject: Need to use univariate summary stats

Hi

I'm currently using query

sqlContext.sql("SELECT MAX(variablesArray) FROM " + tableName)

to extract the mean, max, and min.
Is there any better optimized way?


In the example I saw df.groupBy("key").agg(skewness("a"), kurtosis("a"))

But I don't have a key anywhere in the data.

How do I extract the univariate summary stats from the df? Please help.

--
Thanks and Regards
Arun
Information transmitted by this e-mail is proprietary to Mphasis, its 
associated companies and/ or its customers and is intended 
for use only by the individual or entity to which it is addressed, and may 
contain information that is privileged, confidential or 
exempt from disclosure under applicable law. If you are not the intended 
recipient or it appears that this mail has been forwarded 
to you without proper authority, you are notified that any use or dissemination 
of this information in any manner is strictly 
prohibited. In such cases, please notify us immediately at 
mailmas...@mphasis.com and delete this mail from your records.


Re: library dependencies to run spark local mode

2016-02-04 Thread Ted Yu
Which Spark release are you using ?

Is there other clue from the logs ? If so, please pastebin.

Cheers

On Thu, Feb 4, 2016 at 2:49 AM, Valentin Popov 
wrote:

> Hi all,
>
> I’m trying run spark on local mode, i using such code:
>
> SparkConf conf = new SparkConf().setAppName("JavaWord2VecExample"
> ).setMaster("local[*]");
> JavaSparkContext sc = new JavaSparkContext(conf);
>
> but after while (10 sec) I got Exception, here is a stack trace:
> java.util.concurrent.TimeoutException: Futures timed out after [1
> milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at akka.remote.Remoting.start(Remoting.scala:179)
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
> at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
> at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
> at org.apache.spark.SparkContext.(SparkContext.scala:457)
> at
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:59)
> at
> com.stimulus.archiva.datamining.ml.Word2VecTest.word2vec(Word2VecTest.java:23)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
> at
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
>
>
>
> Any one know library dependencies that can cause such error?
>
> Regards,
> Valentin
>
>
>
>
>


Re: Spark job does not perform well when some RDD in memory and some on Disk

2016-02-04 Thread Alonso Isidoro Roman
"But learned that it is better not to reduce it to 0."

could you explain a bit more this sentence?

thanks

Alonso Isidoro Roman.

Mis citas preferidas (de hoy) :
"Si depurar es el proceso de quitar los errores de software, entonces
programar debe ser el proceso de introducirlos..."
 -  Edsger Dijkstra

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming
must be the process of putting ..."
  - Edsger Dijkstra

"If you pay peanuts you get monkeys"


2016-02-04 11:33 GMT+01:00 Prabhu Joseph :

> Okay, the reason for the task delay within executor when some RDD in
> memory and some in Hadoop i.e, Multiple Locality Levels NODE_LOCAL and ANY,
> in this case Scheduler waits
> for *spark.locality.wait *3 seconds default. During this period,
> scheduler waits to launch a data-local task before giving up and launching
> it on a less-local node.
>
> So after making it 0, all tasks started parallel. But learned that it is
> better not to reduce it to 0.
>
>
> On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph 
> wrote:
>
>> Hi All,
>>
>>
>> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
>> RDD's created each approx 250MB data) and there are two jobs. Job A gets
>> the line with "a" and the Job B gets the line with "b". The spark
>> application is ran multiple times, each time with
>> different executor memory, and enable/disable cache() function. Job A
>> performance is same in all the runs as it has to read the entire data first
>> time from Disk.
>>
>> Spark Cluster - standalone mode with Spark Master, single worker node (12
>> cores, 16GB memory)
>>
>> val logData = sc.textFile(logFile, 2)
>> var numAs = logData.filter(line => line.contains("a")).count()
>> var numBs = logData.filter(line => line.contains("b")).count()
>>
>>
>> *Job B (which has 5 tasks) results below:*
>>
>> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1 image]
>>
>> Since logData is not cached, the job B has to again read the 1.2GB
>> data from hadoop into memory and all the 5 tasks started parallel and each
>> took 2 sec (29ms for GC) and the
>>  overall job completed in 2 seconds.
>>
>> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached took
>> 4 seconds [ran2 image, ran2_cache image]
>>
>>  val logData = sc.textFile(logFile, 2).cache()
>>
>>  The Executor does not have enough memory to cache and hence again
>> needs to read the entire 1.2GB data from hadoop into memory.  But since the
>> cache() is used, leads to lot of GC pause leading to slowness in task
>> completion. Each task started parallel and
>> completed in 4 seconds (more than 1 sec for GC).
>>
>> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took
>> 10 seconds [ran3 image]*
>>
>>  The Executor has memory that can fit 4 RDD partitions into memory
>> but 5th RDD it has to read from Hadoop. 4 tasks are started parallel and
>> they completed in 0.3 seconds without GC. But the 5th task which has to
>> read RDD from disk is started after 4 seconds, and gets completed in 2
>> seconds. Analysing why the 5th task is not started parallel with other
>> tasks or at least why it is not started immediately after the other task
>> completion.
>>
>> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
>> took 0.3 seconds [ran4 image]
>>
>>  The executor has enough memory to cache all the 5 RDD. All 5 tasks
>> are started in parallel and gets completed within 0.3 seconds.
>>
>>
>> So Spark performs well when entire input data is in Memory or None. In
>> case of some RDD in memory and some from disk, there is a delay in
>> scheduling the fifth task, is it a expected behavior or a possible Bug.
>>
>>
>>
>> Thanks,
>> Prabhu Joseph
>>
>>
>>
>>
>


Re: library dependencies to run spark local mode

2016-02-04 Thread Valentin Popov
It is 1.6.0, built from source. 

I’m trying it in my Eclipse project and want to use Spark in it, so I put these 
libraries there and get no ClassNotFoundException: 

akka-actor_2.10-2.3.11.jar
akka-remote_2.10-2.3.11.jar
akka-slf4j_2.10-2.3.11.jar
config-1.2.1.jar
hadoop-auth-2.7.1.jar
hadoop-common-2.7.1.jar
hadoop-mapreduce-client-common-2.7.1.jar
hadoop-mapreduce-client-core-2.7.1.jar
hadoop-mapreduce-client-jobclient-2.7.1.jar
hadoop-mapreduce-client-shuffle-2.7.1.jar
hadoop-yarn-api-2.7.1.jar
hadoop-yarn-common-2.7.1.jar
hamcrest-core-1.3.jar
netty-all-4.0.29.Final.jar
scala-library-2.10.6.jar
spark-core_2.10-1.6.0.jar
spark-mllib_2.10-1.6.0.jar
spark-network-common_2.10-1.6.0.jar
spark-network-shuffle_2.10-1.6.0.jar
spark-sql_2.10-1.6.0.jar
spark-streaming_2.10-1.6.0.jar


> On 4 Feb 2016, at 13:51, Ted Yu wrote:
> 
> Which Spark release are you using ?
> 
> Is there other clue from the logs ? If so, please pastebin.
> 
> Cheers
> 
> On Thu, Feb 4, 2016 at 2:49 AM, Valentin Popov wrote:
> Hi all, 
> 
> I’m trying run spark on local mode, i using such code: 
> 
> SparkConf conf = new 
> SparkConf().setAppName("JavaWord2VecExample").setMaster("local[*]");
> JavaSparkContext sc = new JavaSparkContext(conf);
> 
> but after while (10 sec) I got Exception, here is a stack trace: 
> java.util.concurrent.TimeoutException: Futures timed out after [1 
> milliseconds]
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>   at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>   at scala.concurrent.Await$.result(package.scala:107)
>   at akka.remote.Remoting.start(Remoting.scala:179)
>   at 
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>   at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
>   at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
>   at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
>   at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
>   at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>   at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>   at 
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
>   at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
>   at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
>   at 
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
>   at 
> org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
>   at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
>   at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
>   at org.apache.spark.SparkContext.(SparkContext.scala:457)
>   at 
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:59)
>   at 
> com.stimulus.archiva.datamining.ml.Word2VecTest.word2vec(Word2VecTest.java:23)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> 

Re: [Spark 1.6] Mismatch in kurtosis values

2016-02-04 Thread Sean Owen
It's returning excess kurtosis and not the fourth moment, strictly
speaking. I don't have docs in front of me to check but if that's not
documented it should be.
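For reference, excess kurtosis is the fourth standardized moment minus 3, which is consistent with the difference of 3 reported above. A hedged sketch of recovering the non-excess value, keeping the placeholder names from the question:

// "columnname" and "tablename" are the placeholders used in the original question.
val pearsonKurtosis = sqlContext.sql(
  "SELECT kurtosis(columnname) + 3 AS pearson_kurtosis FROM tablename")
pearsonKurtosis.show()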

On Thu, Feb 4, 2016, 15:22 Arunkumar Pillai  wrote:

> Hi
>
> I have observed that kurtosis values coming from apache spark has a
> difference of 3.
>
> The value coming from excel and in R as same values(11.333)  but the
> kurtosis value coming from spark1.6 differs by 3 (8.333).
>
> Please let me know if I'm doing something wrong.
>
> I'm executing via sqlcontext.sql("select kurtosis(columnname) from
> tablename)
>
> --
> Thanks and Regards
> Arun
>


library dependencies to run spark local mode

2016-02-04 Thread Valentin Popov
Hi all, 

I’m trying to run Spark in local mode, using code like this: 

SparkConf conf = new 
SparkConf().setAppName("JavaWord2VecExample").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);

but after a while (10 sec) I got an exception; here is the stack trace: 
java.util.concurrent.TimeoutException: Futures timed out after [1 
milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:179)
at 
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:457)
at 
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
at 
com.stimulus.archiva.datamining.ml.Word2VecTest.word2vec(Word2VecTest.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)



Anyone know which library dependencies could cause such an error? 

Regards,
Valentin






Re: Spark job does not perform well when some RDD in memory and some on Disk

2016-02-04 Thread Prabhu Joseph
Okay, the reason for the task delay within the executor when some RDDs are in
memory and some in Hadoop, i.e., multiple locality levels NODE_LOCAL and ANY, is
that in this case the scheduler waits for *spark.locality.wait* (3 seconds by
default). During this period, the scheduler waits to launch a data-local task
before giving up and launching it on a less-local node.

So after making it 0, all tasks started in parallel. But learned that it is
better not to reduce it to 0.
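For reference, a small sketch of where this setting is applied (the values shown are only the documented default and the experiment from this thread, not a recommendation):

// spark.locality.wait is how long the scheduler waits for a data-local slot
// before falling back to a less-local one (default: 3s; this thread tried "0").
val conf = new org.apache.spark.SparkConf()
  .set("spark.locality.wait", "3s")
// Finer-grained variants also exist: spark.locality.wait.process / .node / .rack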


On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph 
wrote:

> Hi All,
>
>
> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
> RDD's created each approx 250MB data) and there are two jobs. Job A gets
> the line with "a" and the Job B gets the line with "b". The spark
> application is ran multiple times, each time with
> different executor memory, and enable/disable cache() function. Job A
> performance is same in all the runs as it has to read the entire data first
> time from Disk.
>
> Spark Cluster - standalone mode with Spark Master, single worker node (12
> cores, 16GB memory)
>
> val logData = sc.textFile(logFile, 2)
> var numAs = logData.filter(line => line.contains("a")).count()
> var numBs = logData.filter(line => line.contains("b")).count()
>
>
> *Job B (which has 5 tasks) results below:*
>
> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1 image]
>
> Since logData is not cached, the job B has to again read the 1.2GB
> data from hadoop into memory and all the 5 tasks started parallel and each
> took 2 sec (29ms for GC) and the
>  overall job completed in 2 seconds.
>
> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached took
> 4 seconds [ran2 image, ran2_cache image]
>
>  val logData = sc.textFile(logFile, 2).cache()
>
>  The Executor does not have enough memory to cache and hence again
> needs to read the entire 1.2GB data from hadoop into memory.  But since the
> cache() is used, leads to lot of GC pause leading to slowness in task
> completion. Each task started parallel and
> completed in 4 seconds (more than 1 sec for GC).
>
> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took 10
> seconds [ran3 image]*
>
>  The Executor has memory that can fit 4 RDD partitions into memory but
> 5th RDD it has to read from Hadoop. 4 tasks are started parallel and they
> completed in 0.3 seconds without GC. But the 5th task which has to read RDD
> from disk is started after 4 seconds, and gets completed in 2 seconds.
> Analysing why the 5th task is not started parallel with other tasks or at
> least why it is not started immediately after the other task completion.
>
> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
> took 0.3 seconds [ran4 image]
>
>  The executor has enough memory to cache all the 5 RDD. All 5 tasks
> are started in parallel and gets completed within 0.3 seconds.
>
>
> So Spark performs well when entire input data is in Memory or None. In
> case of some RDD in memory and some from disk, there is a delay in
> scheduling the fifth task, is it a expected behavior or a possible Bug.
>
>
>
> Thanks,
> Prabhu Joseph
>
>
>
>


[Spark 1.6] Mismatch in kurtosis values

2016-02-04 Thread Arunkumar Pillai
Hi

I have observed that kurtosis values coming from apache spark has a
difference of 3.

The value coming from excel and in R as same values(11.333)  but the
kurtosis value coming from spark1.6 differs by 3 (8.333).

Please let me know if I'm doing something wrong.

I'm executing via sqlcontext.sql("select kurtosis(columnname) from
tablename)

-- 
Thanks and Regards
Arun


problem in creating function in sparkR for dummy handling

2016-02-04 Thread Devesh Raj Singh
Hi,

I have written a code to create dummy variables in sparkR

df <- createDataFrame(sqlContext, iris)
class(dtypes(df))
cat.column<-vector(mode="character",length=nrow(df))
cat.column<-collect(select(df,df$Species))

lev<-length(levels(as.factor(unlist(cat.column))))
for (j in 1:lev){



dummy.df.new<-withColumn(df,paste0(colnames(cat.column),j),ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j],1,0)
)

  df<-dummy.df.new

}

*head(df): gives me the desired output:*

  Sepal_Length Sepal_Width Petal_Length Petal_Width Species Species1 Species2 Species3
1          5.1         3.5          1.4         0.2  setosa        1        0        0
2          4.9         3.0          1.4         0.2  setosa        1        0        0
3          4.7         3.2          1.3         0.2  setosa        1        0        0
4          4.6         3.1          1.5         0.2  setosa        1        0        0
5          5.0         3.6          1.4         0.2  setosa        1        0        0
6          5.4         3.9          1.7         0.4  setosa        1        0        0



*But the same thing when I try to do by creating a function *

# x= dataframe$x, categorical column within the dataframe
# dataframe=sparkR dataframe

dummyhandle<-function(dataframe,x){

 cat.column<-vector(mode="character",length=nrow(dataframe))
cat.column<-collect(select(dataframe,x))
lev<-length(levels(as.factor(unlist(cat.column))))

for (j in 1:lev){



dummy.df<-withColumn(dataframe,paste0(colnames(cat.column),j),ifelse(x==levels(as.factor(unlist(cat.column)))[j],1,0)
)

  dataframe<-dummy.df


}
return(dataframe)
}

*throws the following error:*

Error in withColumn(dataframe, paste0(colnames(cat.column), j), ifelse(x ==
 :
  error in evaluating the argument 'col' in selecting a method for function
'withColumn': Error in if (le > 0) paste0("[1:", paste(le), "]") else "(0)"
:
  argument is not interpretable as logical


-- 
Warm regards,
Devesh.


Re: library dependencies to run spark local mode

2016-02-04 Thread Valentin Popov
I think this is an answer… 


HADOOP_HOME or hadoop.home.dir are not set.

Sorry


2016-02-04 14:10:08 o.a.h.u.Shell [DEBUG] Failed to detect a valid hadoop home 
directory
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:303) 
[hadoop-common-2.7.1.jar:na]
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:328) 
[hadoop-common-2.7.1.jar:na]
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80) 
[hadoop-common-2.7.1.jar:na]
at 
org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:610)
 [hadoop-common-2.7.1.jar:na]
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:272)
 [hadoop-common-2.7.1.jar:na]
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:260)
 [hadoop-common-2.7.1.jar:na]
at 
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:790)
 [hadoop-common-2.7.1.jar:na]
at 
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760)
 [hadoop-common-2.7.1.jar:na]
at 
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633)
 [hadoop-common-2.7.1.jar:na]
at 
org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2136)
 [spark-core_2.10-1.6.0.jar:1.6.0]
at 
org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2136)
 [spark-core_2.10-1.6.0.jar:1.6.0]
at scala.Option.getOrElse(Option.scala:120) 
[scala-library-2.10.6.jar:na]
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2136) 
[spark-core_2.10-1.6.0.jar:1.6.0]
at org.apache.spark.SparkContext.<init>(SparkContext.scala:322) 
[spark-core_2.10-1.6.0.jar:1.6.0]
at 
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59) 
[spark-core_2.10-1.6.0.jar:1.6.0]
at 
com.stimulus.archiva.datamining.ml.Word2VecTest.word2vec(Word2VecTest.java:23) 
[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[na:1.8.0_71]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[na:1.8.0_71]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[na:1.8.0_71]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_71]
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 [junit-4.12.jar:4.12]
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 [junit-4.12.jar:4.12]
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 [junit-4.12.jar:4.12]
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
[junit-4.12.jar:4.12]
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 [junit-4.12.jar:4.12]
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
[junit-4.12.jar:4.12]
at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
 [.cp/:na]
at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
[.cp/:na]
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
 [.cp/:na]
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
 [.cp/:na]
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
 [.cp/:na]
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
 [.cp/:na]
2016-02-04 14:10:08 o.a.h.u.Shell [DEBUG] setsid is not available on this 
machine. So not using it.
2016-02-04 14:10:08 o.a.h.u.Shell [DEBUG] setsid exited with exit code 0
2016-02-04 14:10:08 o.a.h.s.a.u.KerberosName [DEBUG] Kerberos krb5 
configuration not found, setting default realm to empty
2016-02-04 14:10:08 o.a.h.s.Groups [DEBUG]  Creating new Groups object
2016-02-04 14:10:08 o.a.h.u.NativeCodeLoader [DEBUG] Trying to load the 

Recommended storage solution for my setup (~5M items, 10KB pr.)

2016-02-04 Thread habitats
Hello

I have ~5 million text documents, each around 10-15KB in size, and split
into ~15 columns. I intend to do machine learning, and thus I need to
extract all of the data at the same time, and potentially update everything
on every run.

So far I've just used json serializing, or simply cached the RDD to disk.
However, I feel like there must be a better way.

I have tried HBase, but I had a hard time setting it up and getting it to
work properly. It also felt like a lot of work for my simple requirements. I
want something /simple/.

Any suggestions?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-storage-solution-for-my-setup-5M-items-10KB-pr-tp26150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Recommended storage solution for my setup (~5M items, 10KB pr.)

2016-02-04 Thread Patrick Skjennum

(Am I doing this mailinglist thing right? Never used this ...)

I do not have a cluster.

Initially I tried to set up hadoop+hbase+spark, but after spending a week 
trying to get it to work, I gave up. I had a million problems with mismatching 
versions, and things working locally on the server but not 
programmatically through my client computer, and vice versa. There was 
/always something/ that did not work, one way or another.


And since I had to actually get things /done/ rather than becoming an 
expert in clustering, I gave up and just used simple serializing.


Now I'm going to make a second attempt, but this time around I'll ask 
for help:p


--
mvh
Patrick Skjennum


On 04.02.2016 22.14, Ted Yu wrote:

bq. had a hard time setting it up

Mind sharing your experience in more detail :-)
If you already have a hadoop cluster, it should be relatively straight 
forward to setup.


Tuning needs extra effort.

On Thu, Feb 4, 2016 at 12:58 PM, habitats wrote:


Hello

I have ~5 million text documents, each around 10-15KB in size, and
split
into ~15 columns. I intend to do machine learning, and thus I need to
extract all of the data at the same time, and potentially update
everything
on every run.

So far I've just used json serializing, or simply cached the RDD
to disk.
However, I feel like there must be a better way.

I have tried HBase, but I had a hard time setting it up and
getting it to
work properly. It also felt like a lot of work for my simple
requirements. I
want something /simple/.

Any suggestions?



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-storage-solution-for-my-setup-5M-items-10KB-pr-tp26150.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org







cause of RPC error?

2016-02-04 Thread AlexG
I am simply trying to load an RDD from disk with
transposeRowsRDD.avro(baseInputFname).rdd.map( )
and I get this error in my log:

16/02/04 11:44:07 ERROR TaskSchedulerImpl: Lost executor 7 on nid00788:
Remote RPC client disassociated. Likely due to containers exceeding
thresholds, or network issues. Check driver logs for WARN messages.

When I check the log for that node (I guess this is what it means by
driver?) I see:

16/02/04 11:43:55 INFO TorrentBroadcast: Started reading broadcast variable
152
16/02/04 11:43:55 INFO MemoryStore: Block broadcast_152_piece0 stored as
bytes in memory (estimated size 19.3 KB, free 8.8 GB)
16/02/04 11:43:55 INFO TorrentBroadcast: Reading broadcast variable 152 took
4 ms
16/02/04 11:43:55 INFO MemoryStore: Block broadcast_152 stored as values in
memory (estimated size 364.3 KB, free 8.8 GB)
16/02/04 11:43:56 INFO MemoryStore: Block rdd_1634_1637 stored as values in
memory (estimated size 24.0 B, free 8.8 GB)
16/02/04 11:43:56 INFO Executor: Finished task 1637.0 in stage 0.0 (TID
1637). 2602 bytes result sent to driver
16/02/04 11:43:56 INFO CoarseGrainedExecutorBackend: Got assigned task 1643
16/02/04 11:43:56 INFO Executor: Running task 1643.0 in stage 0.0 (TID 1643)
16/02/04 11:43:56 INFO CacheManager: Partition rdd_1634_1643 not found,
computing it
16/02/04 11:43:56 INFO HadoopRDD: Input split:
file:/global/cscratch1/sd/gittens/CFSRA/CFSRAparquetTranspose/CFSRAparquetTranspose0/part-r-7-ddeb3951-d7da-4926-a16c-d54d71850131.avro:134217728+33554432

So the executor seems to have crashed without any error message being
emitted, with plenty of memory on hand (I also grepped for WARNING messages,
didn't see any). Any idea on what might be happening, or how to debug?
Several other executors are also lost with the same behavior. I'm using
Spark in standalone mode.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cause-of-RPC-error-tp26151.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: submit spark job with spcified file for driver

2016-02-04 Thread Mohammed Guller
Here is the description for the --files option that you can specify to 
spark-submit:

--files FILES   Comma-separated list of files to be placed in the 
working directory of each executor.
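For example, a hedged sketch of reading such a file after shipping it with --files (the file name app.properties is a placeholder, and resolution details can differ by deploy mode):

// Submitted with something like:
//   spark-submit --files /local/path/app.properties --class ... app.jar
import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.SparkFiles

// SparkFiles.get resolves the local path of a file distributed via --files / sc.addFile.
val props = new Properties()
props.load(new FileInputStream(SparkFiles.get("app.properties")))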


Mohammed
Author: Big Data Analytics with Spark


-Original Message-
From: alexeyy3 [mailto:alexey.yakubov...@searshc.com] 
Sent: Thursday, February 4, 2016 2:18 PM
To: user@spark.apache.org
Subject: submit spark job with spcified file for driver

Is it possible to specify a file (with key-value properties) when submitting a 
spark app with spark-submit? Some mails refer to the key --file, but the docs 
do not mention it.
If you can specify a file, how do you access it from the Spark job driver?

Thank you, Alexey



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/submit-spark-job-with-spcified-file-for-driver-tp26153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: submit spark job with spcified file for driver

2016-02-04 Thread Ted Yu
Please take a look at:
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

You would see '--files' argument

On Thu, Feb 4, 2016 at 2:17 PM, alexeyy3 
wrote:

> Is it possible to specify a file (with key-value properties) when
> submitting
> spark app with spark-submit? Some mails refers to the key --file, but docs.
> does not mention it.
> If you can specify a file, how to access it from spark job driver?
>
> Thank you, Alexey
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/submit-spark-job-with-spcified-file-for-driver-tp26153.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


submit spark job with spcified file for driver

2016-02-04 Thread alexeyy3
Is it possible to specify a file (with key-value properties) when submitting a
spark app with spark-submit? Some mails refer to the key --file, but the docs
do not mention it.
If you can specify a file, how do you access it from the Spark job driver?

Thank you, Alexey



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/submit-spark-job-with-spcified-file-for-driver-tp26153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Recommended storage solution for my setup (~5M items, 10KB pr.)

2016-02-04 Thread Ted Yu
bq. had a hard time setting it up

Mind sharing your experience in more detail :-)
If you already have a hadoop cluster, it should be relatively straight
forward to setup.

Tuning needs extra effort.

On Thu, Feb 4, 2016 at 12:58 PM, habitats  wrote:

> Hello
>
> I have ~5 million text documents, each around 10-15KB in size, and split
> into ~15 columns. I intend to do machine learning, and thus I need to
> extract all of the data at the same time, and potentially update everything
> on every run.
>
> So far I've just used json serializing, or simply cached the RDD to disk.
> However, I feel like there must be a better way.
>
> I have tried HBase, but I had a hard time setting it up and getting it to
> work properly. It also felt like a lot of work for my simple requirements.
> I
> want something /simple/.
>
> Any suggestions?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-storage-solution-for-my-setup-5M-items-10KB-pr-tp26150.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Dataset Encoders for SparseVector

2016-02-04 Thread Michael Armbrust
We are hoping to add better support for UDTs in the next release, but for
now you can use kryo to generate an encoder for any class:

implicit val vectorEncoder =
org.apache.spark.sql.Encoders.kryo[SparseVector]
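A minimal sketch of how this could be used (assuming Spark 1.6, an existing sqlContext, and a DataFrame df whose "feature" column really holds SparseVector values, as in the question):

import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.sql.Dataset

implicit val vectorEncoder = org.apache.spark.sql.Encoders.kryo[SparseVector]

// Pull the UDT column out through the underlying RDD and build a typed Dataset.
val vectors: Dataset[SparseVector] =
  sqlContext.createDataset(df.rdd.map(_.getAs[SparseVector]("feature")))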

On Thu, Feb 4, 2016 at 12:22 PM, raj.kumar  wrote:

> Hi,
>
> I have a DataFrame df with a column "feature" of type SparseVector that
> results from the ml library's VectorAssembler class.
>
> I'd like to get a Dataset of SparseVectors from this column, but when I do
> a
>
> df.as[SparseVector] scala complains that it doesn't know of an encoder for
> SparseVector. If I then try to implement the Encoder[T] interface for
> SparseVector I get the error
> "java.lang.RuntimeException: Only expression encoders are supported today"
>
> How can I get a Dataset[SparseVector] from the output of VectorAssembler?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Dataset-Encoders-for-SparseVector-tp26149.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: cause of RPC error?

2016-02-04 Thread AlexG
To clarify, that's the tail of the node stderr log, so the last message shown
is at the EOF.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cause-of-RPC-error-tp26151p26152.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: add new column in the schema + Dataframe

2016-02-04 Thread Mohammed Guller
Hi Divya,
You can use the withColumn method from the DataFrame API. Here is the method 
signature:

def withColumn(colName: String, col: Column): DataFrame
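For example, a rough sketch with spark-csv (the path, column names, and values are placeholders, not taken from the actual use case):

import org.apache.spark.sql.functions.{lit, when}

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/path/to/first_input.csv")   // placeholder path

// Add a new column whose value depends on a condition evaluated per row.
val tagged = df.withColumn("newCol",
  when(df("someColumn") === "someValue", lit("A")).otherwise(lit("B")))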


Mohammed
Author: Big Data Analytics with 
Spark

From: Divya Gehlot [mailto:divya.htco...@gmail.com]
Sent: Thursday, February 4, 2016 1:29 AM
To: user @spark
Subject: add new column in the schema + Dataframe

Hi,
I am a beginner in Spark, using Spark 1.5.2 on YARN (HDP 2.3.4).
I have a use case where I have to read two input files and, based on certain 
conditions in the second input file, add a new column to the first input 
file and save it.

I am using spark-csv to read my input files.
I would really appreciate it if somebody would share their thoughts on the best/feasible 
way of doing it (using the DataFrame API).


Thanks,
Divya




pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala

2016-02-04 Thread Divya Gehlot
Hi,
I have two input datasets
First input dataset like as below :

year,make,model,comment,blank
> "2012","Tesla","S","No comment",
> 1997,Ford,E350,"Go get one now they are going fast",
> 2015,Chevy,Volt


Second Input dataset :

TagId,condition
> 1997_cars,year = 1997 and model = 'E350'
> 2012_cars,year=2012 and model ='S'
> 2015_cars ,year=2015 and model = 'Volt'


Now my requirement is to read the first dataset and, based on the filtering
condition in the second dataset, tag rows of the first input dataset by
introducing a new column TagId to the first input dataset,
so the expected output should look like:

year,make,model,comment,blank,TagId
> "2012","Tesla","S","No comment",2012_cars
> 1997,Ford,E350,"Go get one now they are going fast",1997_cars
> 2015,Chevy,Volt, ,2015_cars


I tried like :

val sqlContext = new SQLContext(sc)
> val carsSchema = StructType(Seq(
> StructField("year", IntegerType, true),
> StructField("make", StringType, true),
> StructField("model", StringType, true),
> StructField("comment", StringType, true),
> StructField("blank", StringType, true)))
>
> val carTagsSchema = StructType(Seq(
> StructField("TagId", StringType, true),
> StructField("condition", StringType, true)))
>
>
> val dfcars =
> sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
> .schema(carsSchema).load("/TestDivya/Spark/cars.csv")
> val dftags =
> sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
> .schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")
>
> val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
> val cdtnval = dftags.select("condition")
> val df2=dfcars.filter(cdtnval)
> :35: error: overloaded method value filter with alternatives:
>   (conditionExpr: String)org.apache.spark.sql.DataFrame 
>   (condition:
> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
>  cannot be applied to (org.apache.spark.sql.DataFrame)
>val df2=dfcars.filter(cdtnval)


another way :

val col = dftags.col("TagId")
> val finaldf = dfcars.withColumn("TagId", col)
> org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5
> missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project
> [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];
>
> finaldf.write.format("com.databricks.spark.csv").option("header",
> "true").save("/TestDivya/Spark/carswithtags.csv")



I would really appreciate it if somebody could give me pointers on how I can pass the
filter condition (second dataframe) to the filter function of the first dataframe,
or another solution.
My apologies for such a naive question, as I am new to Scala and Spark.

Thanks


different behavior while using createDataFrame and read.df in SparkR

2016-02-04 Thread Devesh Raj Singh
Hi,

I am using Spark 1.5.1

When I do this

df <- createDataFrame(sqlContext, iris)

#creating a new column for category "Setosa"

df$Species1<-ifelse((df)[[5]]=="setosa",1,0)

head(df)

output: new column created

  Sepal.Length Sepal.Width Petal.Length Petal.Width Species
1  5.1 3.5  1.4 0.2  setosa
2  4.9 3.0  1.4 0.2  setosa
3  4.7 3.2  1.3 0.2  setosa
4  4.6 3.1  1.5 0.2  setosa
5  5.0 3.6  1.4 0.2  setosa
6  5.4 3.9  1.7 0.4  setosa

*but when I save the iris dataset as a CSV file and try to read it and
convert it to a SparkR dataframe*

df <- read.df(sqlContext,"/Users/devesh/Github/deveshgit2/bdaml/data/iris/",
  source = "com.databricks.spark.csv",header =
"true",inferSchema = "true")

now when I try to create new column

df$Species1<-ifelse((df)[[5]]=="setosa",1,0)
I get the below error:

16/02/05 12:11:01 ERROR RBackendHandler: col on 922 failed
Error in select(x, x$"*", alias(col, colName)) :
  error in evaluating the argument 'col' in selecting a method for function
'select': Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...)
:
  org.apache.spark.sql.AnalysisException: Cannot resolve column name
"Sepal.Length" among (Sepal.Length, Sepal.Width, Petal.Length, Petal.Width,
Species);
at org.apache.spark.s
-- 
Warm regards,
Devesh.


Add Singapore meetup

2016-02-04 Thread Li Ming Tsai
Hi,


Realised that Singapore has not been added. Please add 
http://www.meetup.com/Spark-Singapore/



Thanks!



Unit test with sqlContext

2016-02-04 Thread Steve Annessa
I'm trying to unit test a function that reads in a JSON file, manipulates
the DF and then returns a Scala Map.

The function has signature:
def ingest(dataLocation: String, sc: SparkContext, sqlContext: SQLContext)

I've created a bootstrap spec for spark jobs that instantiates the Spark
Context and SQLContext like so:

@transient var sc: SparkContext = _
@transient var sqlContext: SQLContext = _

override def beforeAll = {
  System.clearProperty("spark.driver.port")
  System.clearProperty("spark.hostPort")

  val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)

  sc = new SparkContext(conf)
  sqlContext = new SQLContext(sc)
}

When I do not include sqlContext, my tests run. Once I add the sqlContext I
get the following errors:

16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being
constructed (or threw an exception in its constructor).  This may indicate
an error, since only one SparkContext may be running in this JVM (see
SPARK-2243). The other SparkContext was created at:
org.apache.spark.SparkContext.(SparkContext.scala:81)

16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is not
unique!

and finally:

[info] IngestSpec:
[info] Exception encountered when attempting to run a suite with class
name: com.company.package.IngestSpec *** ABORTED ***
[info]   akka.actor.InvalidActorNameException: actor name
[ExecutorEndpoint] is not unique!


What do I need to do to get a sqlContext through my tests?
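One likely cause, given the SPARK-2243 warning above, is a SparkContext left running by a previous suite; a hedged sketch of the usual cleanup, assuming ScalaTest's BeforeAndAfterAll (the trait name and local[2] master are illustrative):

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}

// Stop the context after each suite so the next suite can create its own.
trait SparkSpecBase extends BeforeAndAfterAll { this: Suite =>
  @transient var sc: SparkContext = _
  @transient var sqlContext: SQLContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    sc = new SparkContext(conf)
    sqlContext = new SQLContext(sc)
  }

  override def afterAll(): Unit = {
    if (sc != null) sc.stop()   // releases the single-SparkContext-per-JVM slot
    super.afterAll()
  }
}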

Thanks,

-- Steve


RE: Kafka directstream receiving rate

2016-02-04 Thread Diwakar Dhanuskodi
Adding more info

Batch interval is 2000 ms.
I expect all 100 messages to go through in one dstream batch from the direct stream, but it 
receives them at a rate of 10 messages at a time. Am I missing some configuration 
here? Any help appreciated. 

Regards 
Diwakar.


Sent from Samsung Mobile.

 Original message 
From: Diwakar Dhanuskodi
Date: 05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Subject: Kafka directstream receiving rate
Hi,
Using Spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark direct 
stream receives 10 messages per dstream batch. I have only one receiver. 
When I used createStream, the receiver received the entire 100 messages at once.
 

Appreciate  any  help .

Regards 
Diwakar


Sent from Samsung Mobile.

Slowness in Kmeans calculating fastSquaredDistance

2016-02-04 Thread Li Ming Tsai
Hi,


I'm using INTEL MKL on Spark 1.6.0 which I built myself with the -Pnetlib-lgpl 
flag.


I am using spark local[4] mode and I run it like this:
# export LD_LIBRARY_PATH=/opt/intel/lib/intel64:/opt/intel/mkl/lib/intel64
# bin/spark-shell ...

I have also added the following to /opt/intel/mkl/lib/intel64:
lrwxrwxrwx 1 root root12 Feb  1 09:18 libblas.so -> libmkl_rt.so
lrwxrwxrwx 1 root root12 Feb  1 09:18 libblas.so.3 -> libmkl_rt.so
lrwxrwxrwx 1 root root12 Feb  1 09:18 liblapack.so -> libmkl_rt.so
lrwxrwxrwx 1 root root12 Feb  1 09:18 liblapack.so.3 -> libmkl_rt.so


I believe (???) that I'm using Intel MKL because the warnings went away:

16/02/01 07:49:38 WARN BLAS: Failed to load implementation from: 
com.github.fommil.netlib.NativeSystemBLAS

16/02/01 07:49:38 WARN BLAS: Failed to load implementation from: 
com.github.fommil.netlib.NativeRefBLAS

After collectAsMap, there is no progress but I can observe that only 1 CPU is 
being utilised with the following stack trace:

"ForkJoinPool-3-worker-7" #130 daemon prio=5 os_prio=0 tid=0x7fbf30ab6000 
nid=0xbdc runnable [0x7fbf12205000]

   java.lang.Thread.State: RUNNABLE

at com.github.fommil.netlib.F2jBLAS.ddot(F2jBLAS.java:71)

at org.apache.spark.mllib.linalg.BLAS$.dot(BLAS.scala:128)

at org.apache.spark.mllib.linalg.BLAS$.dot(BLAS.scala:111)

at 
org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:349)

at 
org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:587)

at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:561)

at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:555)


These last few steps take more than half of the total time for a 1Mx100 dataset.


The code is just:

val clusters = KMeans.train(parsedData, 1000, 1)


Shouldn't it be utilising all the cores for the dot product? Is this a 
misconfiguration?


Thanks!




Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Udo Fholl
Thank you for your response

Unfortunately I cannot share  a thread dump. What are you looking for
exactly?

Here is the list of the 50 biggest objects (retained size order,
descendent):

java.util.concurrent.ArrayBlockingQueue#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
scala.concurrent.forkjoin.ForkJoinPool#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.storage.MemoryStore#
java.util.LinkedHashMap#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
scala.collection.Iterator$$anon$
org.apache.spark.InterruptibleIterator#
scala.collection.IndexedSeqLike$Elements#
scala.collection.mutable.ArrayOps$ofRef#
java.lang.Object[]#



On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu 
wrote:

> Hey Udo,
>
> mapWithState usually uses much more memory than updateStateByKey since it
> caches the states in memory.
>
> However, from your description, looks BlockGenerator cannot push data into
> BlockManager, there may be something wrong in BlockGenerator. Could you
> share the top 50 objects in the heap dump and the thread dump?
>
>
> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl  wrote:
>
>> Hi all,
>>
>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>> see a huge increase of memory. Most of it is a massive "BlockGenerator"
>> (which points to a massive "ArrayBlockingQueue" that in turns point to a
>> huge "Object[]").
>>
>> I'm pretty sure it has to do with my code, but I barely changed anything
>> in the code. Just adapted the function.
>>
>> Did anyone run into this?
>>
>> Best regards,
>> Udo.
>>
>
>


spark.storage.memoryFraction for shuffle-only jobs

2016-02-04 Thread Ruslan Dautkhanov
For a Spark job that only does shuffling
(e.g. Spark SQL with joins, group bys, analytical functions, order bys),
but no explicit persistent RDDs nor dataframes (there are no .cache()es in
the code),
what would be the lowest recommended setting
for spark.storage.memoryFraction?

spark.storage.memoryFraction defaults to 0.6 which is quite huge for
shuffle-only jobs.
spark.shuffle.memoryFraction defaults to 0.2 in Spark 1.5.0.

Can I set spark.storage.memoryFraction to something low like 0.1 or even
lower?
And spark.shuffle.memoryFraction to something large like 0.9? or perhaps
even 1.0?
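A sketch of how these settings would be passed, using the values floated above purely as illustration (note these are the legacy pre-1.6 memory settings named in the question):

// Sketch only: the values are the ones proposed in the question, not a recommendation.
val conf = new org.apache.spark.SparkConf()
  .set("spark.storage.memoryFraction", "0.1")   // cache/storage share of the executor heap
  .set("spark.shuffle.memoryFraction", "0.9")   // shuffle aggregation share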


Thanks!


rdd cache priority

2016-02-04 Thread charles li
say I have 2 RDDs, RDD1 and RDD2.

both are 20g in memory.

and I cache both of them in memory using RDD1.cache() and RDD2.cache()


Then in the further steps of my app, I never use RDD1 but use RDD2 lots of
times.


then here is my question:

if there is only 40G of memory in my cluster, and I have another RDD,
RDD3, of 20g, what happens if I cache RDD3 using RDD3.cache()?


As the documentation says, cache() uses the default storage level MEMORY_ONLY,
which means RDD3 will not necessarily be cached, but may be re-computed every
time it is used.

I feel a little confused: will Spark remove RDD1 for me and put RDD3 in
memory?

Or is there any concept like a "priority cache" in Spark?
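For what it's worth, eviction can also be made explicit instead of relying on the MEMORY_ONLY store's behaviour; a small sketch with placeholder data, where rdd1/rdd2/rdd3 stand in for the RDDs described above:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("cache-demo"))
val rdd1 = sc.parallelize(1 to 1000000).cache()
val rdd2 = sc.parallelize(1 to 1000000).cache()
val rdd3 = sc.parallelize(1 to 1000000)

rdd1.unpersist()   // explicitly drop RDD1 instead of relying on eviction under memory pressure
rdd3.cache()       // RDD3 can now be cached alongside RDD2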


great thanks



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Driver not able to restart the job automatically after the application of Streaming with Kafka Direct went down

2016-02-04 Thread SRK
Hi,

I have the Streaming job running in qa/prod. Due to Kafka issues both the
jobs went down. After the Kafka issues got resolved and after the deletion
of the checkpoint directory the driver in the qa job restarted the job
automatically and the application UI was up. But, in the prod job, the
driver did not restart the application. Any idea as to why the prod driver
was not able to restart the job, with everything being the same in qa/prod
including the --supervise option?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Driver-not-able-to-restart-the-job-automatically-after-the-application-of-Streaming-with-Kafka-Direcn-tp26155.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Shixiong(Ryan) Zhu
I guess it may be some deadlock in BlockGenerator. Could you check it
yourself?

On Thu, Feb 4, 2016 at 4:14 PM, Udo Fholl  wrote:

> Thank you for your response
>
> Unfortunately I cannot share  a thread dump. What are you looking for
> exactly?
>
> Here is the list of the 50 biggest objects (retained size order,
> descendent):
>
> java.util.concurrent.ArrayBlockingQueue#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> scala.concurrent.forkjoin.ForkJoinPool#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.storage.MemoryStore#
> java.util.LinkedHashMap#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> scala.collection.Iterator$$anon$
> org.apache.spark.InterruptibleIterator#
> scala.collection.IndexedSeqLike$Elements#
> scala.collection.mutable.ArrayOps$ofRef#
> java.lang.Object[]#
>
>
>
>
> On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Udo,
>>
>> mapWithState usually uses much more memory than updateStateByKey since it
>> caches the states in memory.
>>
>> However, from your description, looks BlockGenerator cannot push data
>> into BlockManager, there may be something wrong in BlockGenerator. Could
>> you share the top 50 objects in the heap dump and the thread dump?
>>
>>
>> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl  wrote:
>>
>>> Hi all,
>>>
>>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>>> see a huge increase of memory. Most of it is a massive "BlockGenerator"
>>> (which points to a massive "ArrayBlockingQueue" that in turns point to a
>>> huge "Object[]").
>>>
>>> I'm pretty sure it has to do with my code, but I barely changed anything
>>> in the code. Just adapted the function.
>>>
>>> Did anyone run into this?
>>>
>>> Best regards,
>>> Udo.
>>>
>>
>>
>


kafkaDirectStream usage error

2016-02-04 Thread Diwakar Dhanuskodi
I am using the direct stream below to consume messages from Kafka. The topic has 8
partitions.

    val topicAndPart = OffsetRange.create("request5", 0, 1, 10).topicAndPartition()
    val fromOffsets = Map[kafka.common.TopicAndPartition, Long](topicAndPart -> 0)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
    val k1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

I am getting the error below. Any idea where I am going wrong? Please
help.

6/02/04 21:04:38 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 
2, datanode3.isdp.com): UnknownReason
16/02/04 21:04:38 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 
(TID 3, datanode3.isdp.com, RACK_LOCAL, 2273 bytes)
16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception 
could not be deserialized
java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at 
org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
    at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
    at 
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
    at 
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
    at 
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at 
org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/02/04 21:04:38 ERROR scheduler.TaskResultGetter: Could not deserialize 
TaskEndReason: ClassNotFound with classloader 
org.apache.spark.util.MutableURLClassLoader@7202dc8c
16/02/04 21:04:38 WARN scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 
(TID 3, datanode3.isdp.com): UnknownReason
16/02/04 21:04:38 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 
(TID 4, datanode3.isdp.com, RACK_LOCAL, 2273 bytes)
16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception 
could not be deserialized
java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at 
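
(For reference, a minimal sketch of building fromOffsets for every partition of
the topic instead of only partition 0; the starting offsets are illustrative and
this is not necessarily the cause of the error above:)

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical: start all 8 partitions of "request5" from offset 0.
val fromOffsets: Map[TopicAndPartition, Long] =
  (0 until 8).map(p => TopicAndPartition("request5", p) -> 0L).toMap

val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)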

PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Yuval.Itzchakov
Hi,
I've been playing with the experimental PairDStreamFunctions.mapWithState
feature and I seem to have stumbled across a bug, and was wondering if
anyone else has been seeing this behavior.

I've opened up an issue in the Spark JIRA; I simply want to pass this along
in case anyone else is experiencing such a failure, or perhaps someone has
insightful information on whether this is actually a bug:  SPARK-13195
  

Using the new spark mapWithState API, I've encountered a bug when setting a
timeout for mapWithState but no explicit state handling.

h1. Steps to reproduce:

1. Create a method which conforms to the StateSpec signature, make sure to
not update any state inside it using *state.update*. Simply create a "pass
through" method, may even be empty.
2. Create a StateSpec object with method from step 1, which explicitly sets
a timeout using *StateSpec.timeout* method.
3. Create a DStream pipeline that uses mapWithState with the given
StateSpec.
4. Run code using spark-submit. You'll see that the method ends up throwing
the following exception:

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
at org.apache.spark.streaming.StateImpl.get(State.scala:150)
at
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
at
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at
org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

h1. Sample code to reproduce the issue:

{code:Title=MainObject}
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by yuvali on 04/02/2016.
  */
object Program {

  def main(args: Array[String]): Unit = {

val sc = new SparkConf().setAppName("mapWithState bug reproduce")
val sparkContext = new SparkContext(sc)

val ssc = new StreamingContext(sparkContext, Seconds(4))
val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))

// Create a stream that generates 1000 lines per second
val stream = ssc.receiverStream(new DummySource(10))

// Split the lines into words, and create a paired (key-value) dstream
val wordStream = stream.flatMap {
  _.split(" ")
}.map(word => (word, 1))

// This represents the emitted stream from the trackStateFunc. Since we emit
// every input record with the updated value, this stream will contain the
// same # of records as the input dstream.
val wordCountStateStream = wordStream.mapWithState(stateSpec)
wordCountStateStream.print()

ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively

// Don't forget to set checkpoint directory
ssc.checkpoint("")
ssc.start()
ssc.awaitTermination()
  }

  def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
state: State[Long]): Option[(String, Long)] = {
val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
val output = (key, sum)
Some(output)
  }
}
{code}

{code:Title=DummySource}

/**
  * Created by yuvali on 04/02/2016.
  */

import org.apache.spark.storage.StorageLevel
import scala.util.Random
import org.apache.spark.streaming.receiver._

class DummySource(ratePerSec: Int) extends
Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
// Start the thread that receives data over a connection
new Thread("Dummy Source") {
  override def run() { receive() }
}.start()
  }

  def onStop() {
// There is nothing much to do 

spark.executor.memory ? is used just for cache RDD or both cache RDD and the runtime of cores on worker?

2016-02-04 Thread charles li
If I set spark.executor.memory = 2G for each worker [10 in total],

does it mean I can cache 20G of RDDs in memory? If so, how about the memory
for the code running in each process on each worker?

thanks.


--
Also, are there any materials about memory management or resource management
in Spark? I want to put Spark in production, but know little about
resource management in Spark. Great thanks again.


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: SQL Statement on DataFrame

2016-02-04 Thread Nishant Aggarwal
Hi Ted,

I am using Spark-Shell to do this.
I am using Phoenix's client jar for integrating Spark with HBASE. All the
operations will be done on Spark side.

Thanks,
Nishant


Thanks and Regards
Nishant Aggarwal, PMP
Cell No:- +91 99588 94305
http://in.linkedin.com/pub/nishant-aggarwal/53/698/11b


On Fri, Feb 5, 2016 at 9:04 AM, Ted Yu  wrote:

> Did you mean using bin/sqlline.py to perform the query ?
>
> Have you asked on Phoenix mailing list ?
>
> Phoenix has phoenix-spark module.
>
> Cheers
>
> On Thu, Feb 4, 2016 at 7:28 PM, Nishant Aggarwal 
> wrote:
>
>> Dear All,
>>
>> I am working on a scenario mentioned below. Need your help:
>>
>> Task:
>> Load the data from HBASE using Phoenix into Spark as a DataFrame, do the
>> operation and store the data back to HBASE using Phoenix. I know this is
>> feasible via writing code.
>>
>> My question is, Is it possible to load the HBASE table using Phoenix into
>> a DataFrame and perform SQL queries on top of it(Instead of writing code)
>> and store the result back to HBASE ?
>>
>> Any help on this will be highly appreciated.
>>
>>
>>
>> Thanks and Regards
>> Nishant Aggarwal, PMP
>> Cell No:- +91 99588 94305
>> http://in.linkedin.com/pub/nishant-aggarwal/53/698/11b
>>
>>
>


Re: Unit test with sqlContext

2016-02-04 Thread Silvio Fiorito
Hi Steve,

Have you looked at the spark-testing-base package by Holden? It’s really useful 
for unit testing Spark apps as it handles all the bootstrapping for you.

https://github.com/holdenk/spark-testing-base

DataFrame examples are here: 
https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/scala/com/holdenkarau/spark/testing/SampleDataFrameTest.scala

Thanks,
Silvio

From: Steve Annessa
Date: Thursday, February 4, 2016 at 8:36 PM
To: "user@spark.apache.org"
Subject: Unit test with sqlContext

I'm trying to unit test a function that reads in a JSON file, manipulates the 
DF and then returns a Scala Map.

The function has signature:
def ingest(dataLocation: String, sc: SparkContext, sqlContext: SQLContext)

I've created a bootstrap spec for spark jobs that instantiates the Spark 
Context and SQLContext like so:

@transient var sc: SparkContext = _
@transient var sqlContext: SQLContext = _

override def beforeAll = {
  System.clearProperty("spark.driver.port")
  System.clearProperty("spark.hostPort")

  val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)

  sc = new SparkContext(conf)
  sqlContext = new SQLContext(sc)
}

When I do not include sqlContext, my tests run. Once I add the sqlContext I get 
the following errors:

16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being constructed 
(or threw an exception in its constructor).  This may indicate an error, since 
only one SparkContext may be running in this JVM (see SPARK-2243). The other 
SparkContext was created at:
org.apache.spark.SparkContext.(SparkContext.scala:81)

16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is not 
unique!

and finally:

[info] IngestSpec:
[info] Exception encountered when attempting to run a suite with class name: 
com.company.package.IngestSpec *** ABORTED ***
[info]   akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is 
not unique!


What do I need to do to get a sqlContext through my tests?

Thanks,

-- Steve


Re: rdd cache priority

2016-02-04 Thread Takeshi Yamamuro
Hi,

You're right; rdd3 is not fully cached and is re-computed every time.
With MEMORY_AND_DISK, rdd3 would be written to disk instead.

Also, the current Spark does not automatically unpersist RDDs based
on their frequency of use.
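
A minimal sketch of handling this explicitly (using lowercase names for the
RDDs from the question):

import org.apache.spark.storage.StorageLevel

// Free the memory held by rdd1 as soon as it is no longer needed,
// so rdd3 can be cached without evicting rdd2's blocks.
rdd1.unpersist()

// Alternatively, persist rdd3 with a level that spills to disk instead of
// silently dropping the blocks that do not fit in memory.
rdd3.persist(StorageLevel.MEMORY_AND_DISK)
rdd3.count()  // an action materializes (and caches) the data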

On Fri, Feb 5, 2016 at 12:15 PM, charles li  wrote:

> say I have 2 RDDs, RDD1 and RDD2.
>
> both are 20g in memory.
>
> and I cache both of them in memory using RDD1.cache() and RDD2.cache()
>
>
> the in the further steps on my app, I never use RDD1 but use RDD2 for lots
> of time.
>
>
> then here is my question:
>
> if there is only 40G memory in my cluster, and here I have another RDD,
> RDD3 for 20g, what happened if I cache RDD3 using RDD3.cache()?
>
>
> as the document says, cache using the default cache level : MEMORY_ONLY .
> it means that it will not definitely cache RDD3 but re-compute it every
> time used.
>
> I feel a little confused, will spark help me remove RDD1 and put RDD3 in
> the memory?
>
> or is there any concept like " Priority cache " in spark?
>
>
> great thanks
>
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>



-- 
---
Takeshi Yamamuro


Re: spark.executor.memory ? is used just for cache RDD or both cache RDD and the runtime of cores on worker?

2016-02-04 Thread Rishi Mishra
You would probably like to see
http://spark.apache.org/docs/latest/configuration.html#memory-management.
Other config parameters are also explained there.
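
For illustration, with Spark 1.6's unified memory manager the relevant knobs
look roughly like this (a sketch; the values shown are the defaults and only
meant as examples):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("memory-config-example")
  .set("spark.executor.memory", "2g")          // total heap per executor
  .set("spark.memory.fraction", "0.75")        // share of the heap usable for execution + storage
  .set("spark.memory.storageFraction", "0.5")  // part of that share protected for cached blocks

So 10 executors with 2 GB each does not mean 20 GB of cache: only the storage
portion of each executor's heap holds cached blocks, and execution (shuffles,
joins, the running tasks) draws from the same pool.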

On Fri, Feb 5, 2016 at 10:56 AM, charles li  wrote:

> if set spark.executor.memory = 2G for each worker [ 10 in total ]
>
> does it mean I can cache 20G RDD in memory ? if so, how about the memory
> for code running in each process on each worker?
>
> thanks.
>
>
> --
> and is there any materials about memory management or resource management
> in spark ? I want to put spark in production, but have little knowing about
> the resource management in spark, great thanks again
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>



-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


SQL Statement on DataFrame

2016-02-04 Thread Nishant Aggarwal
Dear All,

I am working on a scenario mentioned below. Need your help:

Task:
Load the data from HBASE using Phoenix into Spark as a DataFrame, do the
operation and store the data back to HBASE using Phoenix. I know this is
feasible via writing code.

My question is: is it possible to load the HBase table using Phoenix into a
DataFrame, perform SQL queries on top of it (instead of writing code), and
store the result back to HBase?

Any help on this will be highly appreciated.



Thanks and Regards
Nishant Aggarwal, PMP
Cell No:- +91 99588 94305
http://in.linkedin.com/pub/nishant-aggarwal/53/698/11b


Re: Please help with external package using --packages option in spark-shell

2016-02-04 Thread Jeff - Data Bean Australia
Thanks Divya for helping me.

It does connect to the internet.

And I even tried to use a local artifactory repository and it didn't work
either.

$ ./spark-shell --packages harsha2010:magellan:1.0.3-s_2.10 --repositories
http://localhost:8081/artifactory/libs-release
Ivy Default Cache set to: /Users/jeffhuang/.ivy2/cache
The jars for the packages stored in: /Users/jeffhuang/.ivy2/jars
http://localhost:8081/artifactory/libs-release added as a remote repository
with the name: repo-1
:: loading settings :: url =
jar:file:/Users/jeffhuang/learning/spark/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
harsha2010#magellan added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found harsha2010#magellan;1.0.3-s_2.10 in spark-packages
found commons-io#commons-io;2.4 in local-m2-cache
found com.esri.geometry#esri-geometry-api;1.2.1 in central
found org.json#json;20090211 in local-m2-cache
found org.codehaus.jackson#jackson-core-asl;1.9.12 in local-m2-cache
:: resolution report :: resolve 282ms :: artifacts dl 10ms
:: modules in use:
com.esri.geometry#esri-geometry-api;1.2.1 from central in [default]
commons-io#commons-io;2.4 from local-m2-cache in [default]
harsha2010#magellan;1.0.3-s_2.10 from spark-packages in [default]
org.codehaus.jackson#jackson-core-asl;1.9.12 from local-m2-cache in
[default]
org.json#json;20090211 from local-m2-cache in [default]

On Fri, Feb 5, 2016 at 5:01 PM, Divya Gehlot 
wrote:

> Is your machine connected to internet ?
>
>
> On 5 February 2016 at 12:00, Jeff - Data Bean Australia <
> databean...@gmail.com> wrote:
>
>> Hi,
>>
>> Trying to use an external package with the following command line and it
>> doesn't work:
>>
>> ./spark-shell --packages harsha2010:magellan:1.0.3-s_2.10
>>
>> Please find the package info here:
>> http://spark-packages.org/package/harsha2010/magellan
>>
>> The error related to one of the dependencies was not found successfully:
>>
>> [NOT FOUND  ]
>> org.codehaus.jackson#jackson-core-asl;1.9.12!jackson-core-asl.jar (1ms)
>>
>> And the weird part is that Spark or Ivy didn't actually try to look it up
>> online, but simply reported error when it didn't find the package in my
>> local m2 repository. Please see the complete error message below and help
>> me out.
>>
>> MacBook-Pro-2:bin jeffhuang$ ./spark-shell --packages
>> harsha2010:magellan:1.0.3-s_2.10
>> Ivy Default Cache set to: /Users/jeffhuang/.ivy2/cache
>> The jars for the packages stored in: /Users/jeffhuang/.ivy2/jars
>> :: loading settings :: url =
>> jar:file:/Users/jeffhuang/learning/spark/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
>> harsha2010#magellan added as a dependency
>> :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
>> confs: [default]
>> found harsha2010#magellan;1.0.3-s_2.10 in spark-packages
>> found commons-io#commons-io;2.4 in local-m2-cache
>> found com.esri.geometry#esri-geometry-api;1.2.1 in central
>> found org.json#json;20090211 in local-m2-cache
>> found org.codehaus.jackson#jackson-core-asl;1.9.12 in local-m2-cache
>> :: resolution report :: resolve 282ms :: artifacts dl 9ms
>> :: modules in use:
>> com.esri.geometry#esri-geometry-api;1.2.1 from central in [default]
>> commons-io#commons-io;2.4 from local-m2-cache in [default]
>> harsha2010#magellan;1.0.3-s_2.10 from spark-packages in [default]
>> org.codehaus.jackson#jackson-core-asl;1.9.12 from local-m2-cache in
>> [default]
>> org.json#json;20090211 from local-m2-cache in [default]
>> -
>> |  |modules||   artifacts   |
>> |   conf   | number| search|dwnlded|evicted|| number|dwnlded|
>> -
>> |  default |   5   |   0   |   0   |   0   ||   5   |   0   |
>> -
>>
>> :: problems summary ::
>>  WARNINGS
>> [NOT FOUND  ]
>> org.codehaus.jackson#jackson-core-asl;1.9.12!jackson-core-asl.jar (1ms)
>>
>>  local-m2-cache: tried
>>
>>
>>  
>> file:/Users/jeffhuang/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.12/jackson-core-asl-1.9.12.jar
>>
>> ::
>>
>> ::  FAILED DOWNLOADS::
>>
>> :: ^ see resolution messages for details  ^ ::
>>
>> ::
>>
>> :: org.codehaus.jackson#jackson-core-asl;1.9.12!jackson-core-asl.jar
>>
>> ::
>>
>>
>>
>> :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
>> Exception in thread "main" java.lang.RuntimeException: [download failed:
>> org.codehaus.jackson#jackson-core-asl;1.9.12!jackson-core-asl.jar]
>> at
>> 

Re: sc.textFile the number of the workers to parallelize

2016-02-04 Thread Takeshi Yamamuro
Hi,

It seems to me these tasks are just assigned to executors on the preferred nodes, so
how about repartitioning the RDD?

s3File.repartition(9).count

On Fri, Feb 5, 2016 at 5:04 AM, Lin, Hao  wrote:

> Hi,
>
>
>
> I have a question on the number of workers that Spark enable to
> parallelize the loading of files using sc.textFile. When I used sc.textFile
> to access multiple files in AWS S3, it seems to only enable 2 workers
> regardless of how many worker nodes I have in my cluster. So how does Spark
> configure the parallelization in regard of the size of cluster nodes? In
> the following case, spark has 896 tasks split between only two nodes
> 10.162.97.235 and 10.162.97.237, while I have 9 nodes in the cluster.
>
>
>
> thanks
>
>
>
> Example of doing a count:
>
>  scala> s3File.count
>
> 16/02/04 18:12:06 INFO SparkContext: Starting job: count at :30
>
> 16/02/04 18:12:06 INFO DAGScheduler: Got job 0 (count at :30)
> with 896 output partitions
>
> 16/02/04 18:12:06 INFO DAGScheduler: Final stage: ResultStage 0 (count at
> :30)
>
> 16/02/04 18:12:06 INFO DAGScheduler: Parents of final stage: List()
>
> 16/02/04 18:12:06 INFO DAGScheduler: Missing parents: List()
>
> 16/02/04 18:12:06 INFO DAGScheduler: Submitting ResultStage 0
> (MapPartitionsRDD[1] at textFile at :27), which has no missing
> parents
>
> 16/02/04 18:12:07 INFO MemoryStore: Block broadcast_1 stored as values in
> memory (estimated size 3.0 KB, free 228.3 KB)
>
> 16/02/04 18:12:07 INFO MemoryStore: Block broadcast_1_piece0 stored as
> bytes in memory (estimated size 1834.0 B, free 230.1 KB)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.162.98.112:46425 (size: 1834.0 B, free: 517.4 MB)
>
> 16/02/04 18:12:07 INFO SparkContext: Created broadcast 1 from broadcast at
> DAGScheduler.scala:1006
>
> 16/02/04 18:12:07 INFO DAGScheduler: Submitting 896 missing tasks from
> ResultStage 0 (MapPartitionsRDD[1] at textFile at :27)
>
> 16/02/04 18:12:07 INFO YarnScheduler: Adding task set 0.0 with 896 tasks
>
> 16/02/04 18:12:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 0, 10.162.97.235, partition 0,RACK_LOCAL, 2213 bytes)
>
> 16/02/04 18:12:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 1, 10.162.97.237, partition 1,RACK_LOCAL, 2213 bytes)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.162.97.235:38643 (size: 1834.0 B, free: 1259.8 MB)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.162.97.237:45360 (size: 1834.0 B, free: 1259.8 MB)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on 10.162.97.237:45360 (size: 23.8 KB, free: 1259.8 MB)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on 10.162.97.235:38643 (size: 23.8 KB, free: 1259.8 MB)
>



-- 
---
Takeshi Yamamuro


Re: sc.textFile the number of the workers to parallelize

2016-02-04 Thread Koert Kuipers
increase minPartitions:
sc.textFile(path, minPartitions = 9)


On Thu, Feb 4, 2016 at 11:41 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> ISTM these tasks are just assigned with executors in preferred nodes, so
> how about repartitioning rdd?
>
> s3File.repartition(9).count
>
> On Fri, Feb 5, 2016 at 5:04 AM, Lin, Hao  wrote:
>
>> Hi,
>>
>>
>>
>> I have a question on the number of workers that Spark enable to
>> parallelize the loading of files using sc.textFile. When I used sc.textFile
>> to access multiple files in AWS S3, it seems to only enable 2 workers
>> regardless of how many worker nodes I have in my cluster. So how does Spark
>> configure the parallelization in regard of the size of cluster nodes? In
>> the following case, spark has 896 tasks split between only two nodes
>> 10.162.97.235 and 10.162.97.237, while I have 9 nodes in the cluster.
>>
>>
>>
>> thanks
>>
>>
>>
>> Example of doing a count:
>>
>>  scala> s3File.count
>>
>> 16/02/04 18:12:06 INFO SparkContext: Starting job: count at :30
>>
>> 16/02/04 18:12:06 INFO DAGScheduler: Got job 0 (count at :30)
>> with 896 output partitions
>>
>> 16/02/04 18:12:06 INFO DAGScheduler: Final stage: ResultStage 0 (count at
>> :30)
>>
>> 16/02/04 18:12:06 INFO DAGScheduler: Parents of final stage: List()
>>
>> 16/02/04 18:12:06 INFO DAGScheduler: Missing parents: List()
>>
>> 16/02/04 18:12:06 INFO DAGScheduler: Submitting ResultStage 0
>> (MapPartitionsRDD[1] at textFile at :27), which has no missing
>> parents
>>
>> 16/02/04 18:12:07 INFO MemoryStore: Block broadcast_1 stored as values in
>> memory (estimated size 3.0 KB, free 228.3 KB)
>>
>> 16/02/04 18:12:07 INFO MemoryStore: Block broadcast_1_piece0 stored as
>> bytes in memory (estimated size 1834.0 B, free 230.1 KB)
>>
>> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>> memory on 10.162.98.112:46425 (size: 1834.0 B, free: 517.4 MB)
>>
>> 16/02/04 18:12:07 INFO SparkContext: Created broadcast 1 from broadcast
>> at DAGScheduler.scala:1006
>>
>> 16/02/04 18:12:07 INFO DAGScheduler: Submitting 896 missing tasks from
>> ResultStage 0 (MapPartitionsRDD[1] at textFile at :27)
>>
>> 16/02/04 18:12:07 INFO YarnScheduler: Adding task set 0.0 with 896 tasks
>>
>> 16/02/04 18:12:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0
>> (TID 0, 10.162.97.235, partition 0,RACK_LOCAL, 2213 bytes)
>>
>> 16/02/04 18:12:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0
>> (TID 1, 10.162.97.237, partition 1,RACK_LOCAL, 2213 bytes)
>>
>> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>> memory on 10.162.97.235:38643 (size: 1834.0 B, free: 1259.8 MB)
>>
>> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>> memory on 10.162.97.237:45360 (size: 1834.0 B, free: 1259.8 MB)
>>
>> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>> memory on 10.162.97.237:45360 (size: 23.8 KB, free: 1259.8 MB)
>>
>> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>> memory on 10.162.97.235:38643 (size: 23.8 KB, free: 1259.8 MB)
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Unit test with sqlContext

2016-02-04 Thread Rishi Mishra
Hi Steve,
Have you cleaned up your SparkContext (sc.stop()) in an afterAll()? The
error suggests you are creating more than one SparkContext.
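
A minimal sketch of the teardown, assuming a ScalaTest suite with
BeforeAndAfterAll (mirroring the beforeAll from the original post; master and
app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class IngestSpec extends FunSuite with BeforeAndAfterAll {
  @transient var sc: SparkContext = _
  @transient var sqlContext: SQLContext = _

  override def beforeAll(): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ingest-test")
    sc = new SparkContext(conf)
    sqlContext = new SQLContext(sc)
  }

  override def afterAll(): Unit = {
    if (sc != null) sc.stop()  // releases the context so the next suite can create its own
    System.clearProperty("spark.driver.port")
  }
}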


On Fri, Feb 5, 2016 at 10:04 AM, Holden Karau  wrote:

> Thanks for recommending spark-testing-base :) Just wanted to add if anyone
> has feature requests for Spark testing please get in touch (or add an issue
> on the github) :)
>
>
> On Thu, Feb 4, 2016 at 8:25 PM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Hi Steve,
>>
>> Have you looked at the spark-testing-base package by Holden? It’s really
>> useful for unit testing Spark apps as it handles all the bootstrapping for
>> you.
>>
>> https://github.com/holdenk/spark-testing-base
>>
>> DataFrame examples are here:
>> https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/scala/com/holdenkarau/spark/testing/SampleDataFrameTest.scala
>>
>> Thanks,
>> Silvio
>>
>> From: Steve Annessa 
>> Date: Thursday, February 4, 2016 at 8:36 PM
>> To: "user@spark.apache.org" 
>> Subject: Unit test with sqlContext
>>
>> I'm trying to unit test a function that reads in a JSON file, manipulates
>> the DF and then returns a Scala Map.
>>
>> The function has signature:
>> def ingest(dataLocation: String, sc: SparkContext, sqlContext: SQLContext)
>>
>> I've created a bootstrap spec for spark jobs that instantiates the Spark
>> Context and SQLContext like so:
>>
>> @transient var sc: SparkContext = _
>> @transient var sqlContext: SQLContext = _
>>
>> override def beforeAll = {
>>   System.clearProperty("spark.driver.port")
>>   System.clearProperty("spark.hostPort")
>>
>>   val conf = new SparkConf()
>> .setMaster(master)
>> .setAppName(appName)
>>
>>   sc = new SparkContext(conf)
>>   sqlContext = new SQLContext(sc)
>> }
>>
>> When I do not include sqlContext, my tests run. Once I add the sqlContext
>> I get the following errors:
>>
>> 16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being
>> constructed (or threw an exception in its constructor).  This may indicate
>> an error, since only one SparkContext may be running in this JVM (see
>> SPARK-2243). The other SparkContext was created at:
>> org.apache.spark.SparkContext.(SparkContext.scala:81)
>>
>> 16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
>> akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is
>> not unique!
>>
>> and finally:
>>
>> [info] IngestSpec:
>> [info] Exception encountered when attempting to run a suite with class
>> name: com.company.package.IngestSpec *** ABORTED ***
>> [info]   akka.actor.InvalidActorNameException: actor name
>> [ExecutorEndpoint] is not unique!
>>
>>
>> What do I need to do to get a sqlContext through my tests?
>>
>> Thanks,
>>
>> -- Steve
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>



-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Recommended storage solution for my setup (~5M items, 10KB pr.)

2016-02-04 Thread Nick Pentreath
If I'm not mistaken, your data seems to be about 50MB of text documents? In 
which case simple flat text files in JSON or CSV seems ideal, as you are 
already doing. If you are using Spark then DataFrames can read/write either of 
these formats.
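
For illustration, a minimal sketch of the DataFrame read/write path for both
formats (paths are placeholders; the CSV side assumes the spark-csv package is
on the classpath):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// JSON: one JSON object per line
val jsonDf = sqlContext.read.json("/data/docs.json")
jsonDf.write.json("/data/docs_out")

// CSV via the spark-csv package (com.databricks:spark-csv)
val csvDf = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/data/docs.csv")
csvDf.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/data/docs_out_csv")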

For that size of data you may not require Spark. Single-instance scikit-learn 
or VW or whatever should be ok (depending on what model you want to build).

If you need any search & filtering capability I'd recommend elasticsearch, 
which has a very good Spark connector within the elasticsearch-hadoop project. 
It's also easy to set up and get started (but more tricky to actually run in 
production).

PostgreSQL may also be a good option with its JSON support.

Hope that helps

Sent from my iPhone

> On 4 Feb 2016, at 23:23, Patrick Skjennum  wrote:
> 
> (Am I doing this mailinglist thing right? Never used this ...)
> 
> I do not have a cluster.
> 
> Initially I tried to set up hadoop+hbase+spark, but after spending a week 
> trying to get it to work, I gave up. I had a million problems with mismatching 
> versions, and things working locally on the server but not programmatically 
> through my client computer, and vice versa. There was always something that 
> did not work, one way or another.
> 
> And since I had to actually get things done rather than becoming an expert in 
> clustering, I gave up and just used simple serializing.
> 
> Now I'm going to make a second attempt, but this time around I'll ask for 
> help:p
> -- 
> mvh
> Patrick Skjennum
> 
> 
>> On 04.02.2016 22.14, Ted Yu wrote:
>> bq. had a hard time setting it up
>> 
>> Mind sharing your experience in more detail :-)
>> If you already have a hadoop cluster, it should be relatively straight 
>> forward to setup.
>> 
>> Tuning needs extra effort.
>> 
>>> On Thu, Feb 4, 2016 at 12:58 PM, habitats  wrote:
>>> Hello
>>> 
>>> I have ~5 million text documents, each around 10-15KB in size, and split
>>> into ~15 columns. I intend to do machine learning, and thus I need to
>>> extract all of the data at the same time, and potentially update everything
>>> on every run.
>>> 
>>> So far I've just used JSON serializing, or simply cached the RDD to disk.
>>> However, I feel like there must be a better way.
>>> 
>>> I have tried HBase, but I had a hard time setting it up and getting it to
>>> work properly. It also felt like a lot of work for my simple requirements. I
>>> want something /simple/.
>>> 
>>> Any suggestions?
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-storage-solution-for-my-setup-5M-items-10KB-pr-tp26150.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: SQL Statement on DataFrame

2016-02-04 Thread Ted Yu
Did you mean using bin/sqlline.py to perform the query ?

Have you asked on Phoenix mailing list ?

Phoenix has phoenix-spark module.
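
For illustration, a rough sketch of what the phoenix-spark read -> SQL -> write
round trip might look like (table names and the ZooKeeper URL are placeholders;
check the phoenix-spark docs for the exact options of your Phoenix version):

import org.apache.spark.sql.{SQLContext, SaveMode}

val sqlContext = new SQLContext(sc)

// Load a Phoenix table as a DataFrame
val df = sqlContext.read
  .format("org.apache.phoenix.spark")
  .options(Map("table" -> "INPUT_TABLE", "zkUrl" -> "zkhost:2181"))
  .load()

// Run plain SQL instead of writing transformation code
df.registerTempTable("input")
val result = sqlContext.sql("SELECT ID, MAX(VALUE) AS MAX_VALUE FROM input GROUP BY ID")

// Save the result back through Phoenix
result.write
  .format("org.apache.phoenix.spark")
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "zkhost:2181"))
  .save()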

Cheers

On Thu, Feb 4, 2016 at 7:28 PM, Nishant Aggarwal 
wrote:

> Dear All,
>
> I am working on a scenario mentioned below. Need your help:
>
> Task:
> Load the data from HBASE using Phoenix into Spark as a DataFrame, do the
> operation and store the data back to HBASE using Phoenix. I know this is
> feasible via writing code.
>
> My question is, Is it possible to load the HBASE table using Phoenix into
> a DataFrame and perform SQL queries on top of it(Instead of writing code)
> and store the result back to HBASE ?
>
> Any help on this will be highly appreciated.
>
>
>
> Thanks and Regards
> Nishant Aggarwal, PMP
> Cell No:- +91 99588 94305
> http://in.linkedin.com/pub/nishant-aggarwal/53/698/11b
>
>


Re: Unit test with sqlContext

2016-02-04 Thread Holden Karau
Thanks for recommending spark-testing-base :) Just wanted to add if anyone
has feature requests for Spark testing please get in touch (or add an issue
on the github) :)


On Thu, Feb 4, 2016 at 8:25 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Steve,
>
> Have you looked at the spark-testing-base package by Holden? It’s really
> useful for unit testing Spark apps as it handles all the bootstrapping for
> you.
>
> https://github.com/holdenk/spark-testing-base
>
> DataFrame examples are here:
> https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/scala/com/holdenkarau/spark/testing/SampleDataFrameTest.scala
>
> Thanks,
> Silvio
>
> From: Steve Annessa 
> Date: Thursday, February 4, 2016 at 8:36 PM
> To: "user@spark.apache.org" 
> Subject: Unit test with sqlContext
>
> I'm trying to unit test a function that reads in a JSON file, manipulates
> the DF and then returns a Scala Map.
>
> The function has signature:
> def ingest(dataLocation: String, sc: SparkContext, sqlContext: SQLContext)
>
> I've created a bootstrap spec for spark jobs that instantiates the Spark
> Context and SQLContext like so:
>
> @transient var sc: SparkContext = _
> @transient var sqlContext: SQLContext = _
>
> override def beforeAll = {
>   System.clearProperty("spark.driver.port")
>   System.clearProperty("spark.hostPort")
>
>   val conf = new SparkConf()
> .setMaster(master)
> .setAppName(appName)
>
>   sc = new SparkContext(conf)
>   sqlContext = new SQLContext(sc)
> }
>
> When I do not include sqlContext, my tests run. Once I add the sqlContext
> I get the following errors:
>
> 16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being
> constructed (or threw an exception in its constructor).  This may indicate
> an error, since only one SparkContext may be running in this JVM (see
> SPARK-2243). The other SparkContext was created at:
> org.apache.spark.SparkContext.(SparkContext.scala:81)
>
> 16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
> akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is not
> unique!
>
> and finally:
>
> [info] IngestSpec:
> [info] Exception encountered when attempting to run a suite with class
> name: com.company.package.IngestSpec *** ABORTED ***
> [info]   akka.actor.InvalidActorNameException: actor name
> [ExecutorEndpoint] is not unique!
>
>
> What do I need to do to get a sqlContext through my tests?
>
> Thanks,
>
> -- Steve
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Need to user univariate summary stats

2016-02-04 Thread Arunkumar Pillai
Hi

I'm currently using query

sqlContext.sql("SELECT MAX(variablesArray) FROM " + tableName)

to extract mean max min.
is there any better  optimized way ?

In the example i saw df.groupBy("key").agg(skewness("a"), kurtosis("a"))


But i don't have key anywhere in the data.

How to extract the univariate summary stats from df. please help

-- 
Thanks and Regards
Arun


add new column in the schema + Dataframe

2016-02-04 Thread Divya Gehlot
Hi,
I am a beginner in Spark and am using Spark 1.5.2 on YARN (HDP 2.3.4).
I have a use case where I have to read two input files and, based on certain
conditions in the second input file, add a new column to the first
input file and save it.

I am using spark-csv to read my input files.
I would really appreciate it if somebody would share their thoughts on the
best/feasible way of doing it (using the DataFrame API).
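
One possible approach, as a rough sketch (all file paths, column names, and the
join key are made-up placeholders):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, lit, when}

val sqlContext = new SQLContext(sc)

def readCsv(path: String) = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(path)

val first  = readCsv("/data/first.csv")    // placeholder path
val second = readCsv("/data/second.csv")   // placeholder path

// Join the condition column(s) from the second file onto the first file
// (assuming a shared "id" column), then derive the new column from them.
val joined = first.join(second, "id")
val result = joined
  .withColumn("new_col", when(col("threshold") > 100, lit("HIGH")).otherwise(lit("LOW")))
  .drop("threshold")

result.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/data/output")  // placeholder path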


Thanks,
Divya


[Spark 1.6] Univariate Stats using apache spark

2016-02-04 Thread Arunkumar Pillai
Hi


Currently, after creating a DataFrame, I'm querying max, min, and mean on it to
get the result:
sqlContext.sql("SELECT MAX(variablesArray) FROM " + tableName)


Is this an optimized way?
I'm not able to find all the stats like min, max, mean, variance, skewness, and
kurtosis directly from a DataFrame.

Please help
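
For reference, a minimal sketch of computing these without a group-by key,
using the aggregate functions available in Spark 1.6 (the DataFrame df and
column name "a" are placeholders):

import org.apache.spark.sql.functions.{kurtosis, max, mean, min, skewness, stddev, variance}

// One row holding all the univariate stats for column "a",
// computed over the whole DataFrame -- no key/groupBy required.
val stats = df.agg(
  min("a"), max("a"), mean("a"),
  variance("a"), stddev("a"),
  skewness("a"), kurtosis("a"))

stats.show()

// df.describe("a") is another quick option (count, mean, stddev, min, max).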



-- 
Thanks and Regards
Arun


Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Yuval Itzchakov
Let me know if you do need a pull request for this, I can make that happen
(given someone does a vast PR to make sure I'm understanding this problem
right).

On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu 
wrote:

> Thanks for reporting it. I will take a look.
>
> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov  wrote:
>
>> Hi,
>> I've been playing with the expiramental PairDStreamFunctions.mapWithState
>> feature and I've seem to have stumbled across a bug, and was wondering if
>> anyone else has been seeing this behavior.
>>
>> I've opened up an issue in the Spark JIRA, I simply want to pass this
>> along
>> in case anyone else is experiencing such a failure or perhaps someone has
>> insightful information if this is actually a bug:  SPARK-13195
>> 
>>
>> Using the new spark mapWithState API, I've encountered a bug when setting
>> a
>> timeout for mapWithState but no explicit state handling.
>>
>> h1. Steps to reproduce:
>>
>> 1. Create a method which conforms to the StateSpec signature, make sure to
>> not update any state inside it using *state.update*. Simply create a "pass
>> through" method, may even be empty.
>> 2. Create a StateSpec object with method from step 1, which explicitly
>> sets
>> a timeout using *StateSpec.timeout* method.
>> 3. Create a DStream pipeline that uses mapWithState with the given
>> StateSpec.
>> 4. Run code using spark-submit. You'll see that the method ends up
>> throwing
>> the following exception:
>>
>> {code}
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in
>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
>> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>> at
>>
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>> at
>>
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at
>>
>> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>> at
>>
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>> at
>>
>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> {code}
>>
>> h1. Sample code to reproduce the issue:
>>
>> {code:Title=MainObject}
>> import org.apache.spark.streaming._
>> import org.apache.spark.{SparkConf, SparkContext}
>> /**
>>   * Created by yuvali on 04/02/2016.
>>   */
>> object Program {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>> val sparkContext = new SparkContext(sc)
>>
>> val ssc = new StreamingContext(sparkContext, Seconds(4))
>> val stateSpec = StateSpec.function(trackStateFunc
>> _).timeout(Seconds(60))
>>
>> // Create a stream that generates 1000 lines per second
>> val stream = ssc.receiverStream(new DummySource(10))
>>
>> // Split the lines into words, and create a paired (key-value) dstream
>> val wordStream = stream.flatMap {
>>   _.split(" ")
>> }.map(word => (word, 1))
>>
>> // This represents the emitted stream from the trackStateFunc. Since
>> we
>> emit every input record with the updated value,
>> // this stream will contain the same # of records as the input
>> dstream.
>> val wordCountStateStream = wordStream.mapWithState(stateSpec)
>> wordCountStateStream.print()
>>
>> ssc.remember(Minutes(1)) // To make sure data is not deleted by the
>> time
>> we query it interactively
>>
>> // Don't forget to set checkpoint directory
>> ssc.checkpoint("")
>> ssc.start()
>> ssc.awaitTermination()
>>   }
>>
>>   def trackStateFunc(batchTime: Time, key: String, 

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Shixiong(Ryan) Zhu
Hey Udo,

mapWithState usually uses much more memory than updateStateByKey since it
caches the states in memory.
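
For illustration, the per-key state that mapWithState keeps can at least be
bounded and spread out via the StateSpec (a sketch; the partition count and
idle timeout are example values, and trackStateFunc / pairStream stand in for
the user's own function and paired DStream):

import org.apache.spark.streaming.{Minutes, StateSpec}

// Example values only: drop keys idle for 30 minutes and spread the state
// RDD over 64 partitions so no single executor holds all of it.
val spec = StateSpec
  .function(trackStateFunc _)
  .numPartitions(64)
  .timeout(Minutes(30))

val stateStream = pairStream.mapWithState(spec)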

However, from your description, it looks like BlockGenerator cannot push data into
BlockManager; there may be something wrong in BlockGenerator. Could you
share the top 50 objects in the heap dump and the thread dump?


On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl  wrote:

> Hi all,
>
> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
> see a huge increase of memory. Most of it is a massive "BlockGenerator"
> (which points to a massive "ArrayBlockingQueue" that in turns point to a
> huge "Object[]").
>
> I'm pretty sure it has to do with my code, but I barely changed anything
> in the code. Just adapted the function.
>
> Did anyone run into this?
>
> Best regards,
> Udo.
>


Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Tathagata Das
Shixiong has already opened the PR -
https://github.com/apache/spark/pull/11081

On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov  wrote:

> Let me know if you do need a pull request for this, I can make that happen
> (given someone does a vast PR to make sure I'm understanding this problem
> right).
>
> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Thanks for reporting it. I will take a look.
>>
>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov 
>> wrote:
>>
>>> Hi,
>>> I've been playing with the expiramental PairDStreamFunctions.mapWithState
>>> feature and I've seem to have stumbled across a bug, and was wondering if
>>> anyone else has been seeing this behavior.
>>>
>>> I've opened up an issue in the Spark JIRA, I simply want to pass this
>>> along
>>> in case anyone else is experiencing such a failure or perhaps someone has
>>> insightful information if this is actually a bug:  SPARK-13195
>>> 
>>>
>>> Using the new spark mapWithState API, I've encountered a bug when
>>> setting a
>>> timeout for mapWithState but no explicit state handling.
>>>
>>> h1. Steps to reproduce:
>>>
>>> 1. Create a method which conforms to the StateSpec signature, make sure
>>> to
>>> not update any state inside it using *state.update*. Simply create a
>>> "pass
>>> through" method, may even be empty.
>>> 2. Create a StateSpec object with method from step 1, which explicitly
>>> sets
>>> a timeout using *StateSpec.timeout* method.
>>> 3. Create a DStream pipeline that uses mapWithState with the given
>>> StateSpec.
>>> 4. Run code using spark-submit. You'll see that the method ends up
>>> throwing
>>> the following exception:
>>>
>>> {code}
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0 in
>>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>> 136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
>>> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>> at
>>>
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>> at
>>>
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at
>>>
>>> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>> at
>>>
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>> at
>>>
>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at
>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> {code}
>>>
>>> h1. Sample code to reproduce the issue:
>>>
>>> {code:Title=MainObject}
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> /**
>>>   * Created by yuvali on 04/02/2016.
>>>   */
>>> object Program {
>>>
>>>   def main(args: Array[String]): Unit = {
>>>
>>> val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>> val sparkContext = new SparkContext(sc)
>>>
>>> val ssc = new StreamingContext(sparkContext, Seconds(4))
>>> val stateSpec = StateSpec.function(trackStateFunc
>>> _).timeout(Seconds(60))
>>>
>>> // Create a stream that generates 1000 lines per second
>>> val stream = ssc.receiverStream(new DummySource(10))
>>>
>>> // Split the lines into words, and create a paired (key-value)
>>> dstream
>>> val wordStream = stream.flatMap {
>>>   _.split(" ")
>>> }.map(word => (word, 1))
>>>
>>> // This represents the emitted stream from the trackStateFunc. Since
>>> we
>>> emit every input record with the updated value,
>>> // this stream will contain the same # of records as the input
>>> dstream.
>>> val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>> wordCountStateStream.print()
>>>

Re: Reading large set of files in Spark

2016-02-04 Thread Ted Yu
For question #2, see the following method of FileSystem :

  public abstract boolean delete(Path f, boolean recursive) throws IOException;

FYI
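
A minimal usage sketch from the driver (the directory path is a placeholder):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
// recursive = true removes the directory and everything under it
fs.delete(new Path("/data/processed/dir-001"), true)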

On Thu, Feb 4, 2016 at 10:58 AM, Akhilesh Pathodia <
pathodia.akhil...@gmail.com> wrote:

> Hi,
>
> I am using Spark to read large set of files from HDFS, applying some
> formatting on each line and then saving each line as a record in hive.
> Spark is reading directory paths from kafka. Each directory can have large
> number of files. I am reading one path from kafka and then processing all
> files of the directory in parallel. I have delete the directory after all
> files are processed I have following questions:
>
> 1. What is the optimized way to read large set of files in Spark? I am not
> using sc.textFile(), instead I am reading the file content using FileSystem
> and creating Dstream of lines.
> 2. How to delete the directory/files from HDFS after the task is completed?
>
> Thanks,
> Akhilesh
>


Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Yuval Itzchakov
Awesome. Thanks for the super fast reply.

On Thu, Feb 4, 2016, 21:16 Tathagata Das 
wrote:

> Shixiong has already opened the PR -
> https://github.com/apache/spark/pull/11081
>
> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov 
> wrote:
>
>> Let me know if you do need a pull request for this, I can make that
>> happen (given someone does a vast PR to make sure I'm understanding this
>> problem right).
>>
>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Thanks for reporting it. I will take a look.
>>>
>>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov 
>>> wrote:
>>>
 Hi,
 I've been playing with the expiramental
 PairDStreamFunctions.mapWithState
 feature and I've seem to have stumbled across a bug, and was wondering
 if
 anyone else has been seeing this behavior.

 I've opened up an issue in the Spark JIRA, I simply want to pass this
 along
 in case anyone else is experiencing such a failure or perhaps someone
 has
 insightful information if this is actually a bug:  SPARK-13195
 

 Using the new spark mapWithState API, I've encountered a bug when
 setting a
 timeout for mapWithState but no explicit state handling.

 h1. Steps to reproduce:

 1. Create a method which conforms to the StateSpec signature, make sure
 to
 not update any state inside it using *state.update*. Simply create a
 "pass
 through" method, may even be empty.
 2. Create a StateSpec object with method from step 1, which explicitly
 sets
 a timeout using *StateSpec.timeout* method.
 3. Create a DStream pipeline that uses mapWithState with the given
 StateSpec.
 4. Run code using spark-submit. You'll see that the method ends up
 throwing
 the following exception:

 {code}
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0 in
 stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 136.0 (TID 176, ): java.util.NoSuchElementException: State is not
 set
 at org.apache.spark.streaming.StateImpl.get(State.scala:150)
 at

 org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
 at

 org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at

 org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
 at

 org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
 at

 org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
 at org.apache.spark.scheduler.Task.run(Task.scala:89)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 {code}

 h1. Sample code to reproduce the issue:

 {code:Title=MainObject}
 import org.apache.spark.streaming._
 import org.apache.spark.{SparkConf, SparkContext}
 /**
   * Created by yuvali on 04/02/2016.
   */
 object Program {

   def main(args: Array[String]): Unit = {

 val sc = new SparkConf().setAppName("mapWithState bug reproduce")
 val sparkContext = new SparkContext(sc)

 val ssc = new StreamingContext(sparkContext, Seconds(4))
 val stateSpec = StateSpec.function(trackStateFunc
 _).timeout(Seconds(60))

 // Create a stream that generates 1000 lines per second
 val stream = ssc.receiverStream(new DummySource(10))

 // Split the lines into words, and create a paired (key-value)
 dstream
 val wordStream = stream.flatMap {
   _.split(" ")
 }.map(word => (word, 1))

 // This represents the emitted stream from the 

Using jar bundled log4j.xml on worker nodes

2016-02-04 Thread Matthias Niehoff
Hello everybody,

we’ve bundled our log4j.xml into our jar (in the classpath root).

I’ve added the log4j.xml to the spark-defaults.conf with

spark.{driver,executor}.extraJavaOptions=-Dlog4j.configuration=log4j.xml

There is no log4j.properties or log4j.xml in one of the conf folders on any
machine.

When I start the app the driver is using our log4j.xml, but all the
executors use the default log4j.properties („Using Spark’s default log4j
profile: org/apache/spark/log4j-defaults.properties“).

What do I have to change to make spark use the log4j.xml from our jar also
on our executors?

We are using Spark 1.5.2

Thank you!
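
A commonly suggested arrangement (my assumption, not something verified in this thread) is to ship the XML to the executors as a plain file instead of relying on the copy inside the application jar, e.g. in spark-defaults.conf (the local path is hypothetical):

# ship the file to every executor's working directory
spark.files                       /local/path/to/log4j.xml
spark.driver.extraJavaOptions     -Dlog4j.configuration=log4j.xml
spark.executor.extraJavaOptions   -Dlog4j.configuration=log4j.xml
# depending on whether the working directory ends up on the classpath,
# -Dlog4j.configuration=file:log4j.xml may be needed instead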

-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz



Question on RDD caching

2016-02-04 Thread Vishnu Viswanath
Hello,

When we call cache() or persist(MEMORY_ONLY), how does the request flow to
the nodes?
I am assuming this will happen:

1. The driver knows which nodes hold the partitions for the given
RDD (where is this info stored?)
2. It sends a cache request to each node's executor
3. The executor will store the partition in memory
4. Therefore, each node can have partitions of different RDDs in its cache.

Can someone please tell me if I am correct.
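
For what it's worth, here is a minimal sketch of the mechanics as I understand them (the input path is hypothetical and sc is the shell's SparkContext): persist()/cache() only record the desired storage level; nothing is cached until an action computes the partitions, at which point each executor stores the partitions it computed in its own block manager and reports the block locations back to the driver (so yes, an executor can hold partitions of many different RDDs).

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///tmp/some-input")      // hypothetical path
val cached = rdd.persist(StorageLevel.MEMORY_ONLY)   // same as rdd.cache(); lazy, only records the level
cached.count()                                       // the first action actually fills the cache
println(cached.getStorageLevel)                      // confirms the level that was recorded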

Thanks and Regards,
Vishnu Viswanath,


Re: [External] Re: Spark 1.6.0 HiveContext NPE

2016-02-04 Thread Ted Yu
Jay:
It would be nice if you can patch Spark with below PR and give it a try.

Thanks

On Wed, Feb 3, 2016 at 6:03 PM, Ted Yu  wrote:

> Created a pull request:
> https://github.com/apache/spark/pull/11066
>
> FYI
>
> On Wed, Feb 3, 2016 at 1:27 PM, Shipper, Jay [USA] 
> wrote:
>
>> It was just renamed recently: https://github.com/apache/spark/pull/10981
>>
>> As SessionState is entirely managed by Spark’s code, it still seems like
>> this is a bug with Spark 1.6.0, and not with how our application is using
>> HiveContext.  But I’d feel more confident filing a bug if someone else
>> could confirm they’re having this issue with Spark 1.6.0.  Ideally, we
>> should also have some simple proof of concept that can be posted with the
>> bug.
>>
>> From: Ted Yu 
>> Date: Wednesday, February 3, 2016 at 3:57 PM
>> To: Jay Shipper 
>> Cc: "user@spark.apache.org" 
>> Subject: Re: [External] Re: Spark 1.6.0 HiveContext NPE
>>
>> In ClientWrapper.scala, the SessionState.get().getConf call might have
>> been executed ahead of SessionState.start(state) at line 194.
>>
>> This was the JIRA:
>>
>> [SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL
>>
>> In master branch, there is no more ClientWrapper.scala
>>
>> FYI
>>
>> On Wed, Feb 3, 2016 at 11:15 AM, Shipper, Jay [USA] 
>> wrote:
>>
>>> One quick update on this: The NPE is not happening with Spark 1.5.2, so
>>> this problem seems specific to Spark 1.6.0.
>>>
>>> From: Jay Shipper 
>>> Date: Wednesday, February 3, 2016 at 12:06 PM
>>> To: "user@spark.apache.org" 
>>> Subject: Re: [External] Re: Spark 1.6.0 HiveContext NPE
>>>
>>> Right, I could already tell that from the stack trace and looking at
>>> Spark’s code.  What I’m trying to determine is why that’s coming back as
>>> null now, just from upgrading Spark to 1.6.0.
>>>
>>> From: Ted Yu 
>>> Date: Wednesday, February 3, 2016 at 12:04 PM
>>> To: Jay Shipper 
>>> Cc: "user@spark.apache.org" 
>>> Subject: [External] Re: Spark 1.6.0 HiveContext NPE
>>>
>>> Looks like the NPE came from this line:
>>>   def conf: HiveConf = SessionState.get().getConf
>>>
>>> Meaning SessionState.get() returned null.
>>>
>>> On Wed, Feb 3, 2016 at 8:33 AM, Shipper, Jay [USA] 
>>> wrote:
>>>
 I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m
 getting a NullPointerException from HiveContext.  It’s happening while it
 tries to load some tables via JDBC from an external database (not Hive),
 using context.read().jdbc():

 —
 java.lang.NullPointerException
 at
 org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
 at
 org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
 at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
 at
 org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
 at
 org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at
 org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
 at
 org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
 at
 org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
 at
 org.apache.spark.sql.hive.HiveContext$$anon$2.(HiveContext.scala:457)
 at
 org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
 at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
 at
 org.apache.spark.sql.hive.HiveContext$$anon$3.(HiveContext.scala:473)
 at
 org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
 at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
 at
 org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
 at org.apache.spark.sql.DataFrame.(DataFrame.scala:133)
 at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
 at
 org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
 at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
 at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
 —

 Even though the application is not using Hive, HiveContext is used
 instead 

Spark Cassandra Atomic Inserts

2016-02-04 Thread Flaherty, Frank
Cassandra provides "BEGIN BATCH" and "APPLY BATCH" to perform atomic execution 
of multiple statements as below:

BEGIN BATCH
  INSERT INTO "user_status_updates"
("username", "id", "body")
  VALUES(
'dave',
16e2f240-2afa-11e4-8069-5f98e903bf02,
'dave update 4'
);

  INSERT INTO "home_status_updates" (
"timeline_username",
"status_update_id",
"status_update_username",
"body")
  VALUES (
'alice',
16e2f240-2afa-11e4-8069-5f98e903bf02,
'dave',
'dave update 4'
  );
APPLY BATCH;

Is there a way to do multiple Cassandra inserts atomically using the Cassandra 
Connector from Spark?
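
As far as I know, the connector's saveToCassandra batches rows for throughput but does not expose CQL logged batches spanning two tables, so one workaround is to drop down to the connector's session pool and build the batch yourself. A sketch only; the RDD shape and value names are hypothetical, and it assumes the spark-cassandra-connector and the DataStax Java driver are on the classpath:

import com.datastax.driver.core.BatchStatement
import com.datastax.spark.connector.cql.CassandraConnector

// updates: RDD[(String, java.util.UUID, String, String)] of
// (username, statusUpdateId, body, timelineUsername)
val connector = CassandraConnector(sc.getConf)

updates.foreachPartition { rows =>
  connector.withSessionDo { session =>
    val insUser = session.prepare(
      """INSERT INTO "user_status_updates" ("username", "id", "body") VALUES (?, ?, ?)""")
    val insHome = session.prepare(
      """INSERT INTO "home_status_updates"
        |("timeline_username", "status_update_id", "status_update_username", "body")
        |VALUES (?, ?, ?, ?)""".stripMargin)

    rows.foreach { case (user, id, body, timelineUser) =>
      val batch = new BatchStatement()   // LOGGED by default: both inserts succeed or neither does
      batch.add(insUser.bind(user, id, body))
      batch.add(insHome.bind(timelineUser, id, user, body))
      session.execute(batch)
    }
  }
}

Note that a logged batch gives atomicity, not isolation, and multi-partition batches carry a performance cost, so treat this as a sketch rather than a recommendation.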

Thanks,
Frank

Frank Flaherty | Applications Operations Lead   |  Pegasystems Inc.
One Rogers Street, Cambridge, MA 02142
Office: 617.866.6843 | Email: 
frank.flahe...@pega.com | 
www.pega.com




Memory tuning in spark sql

2016-02-04 Thread ARUN.BONGALE
Hi Sir/madam,
Greetings of the day.

I am working on Spark 1.6.0 with AWS EMR (Elastic MapReduce). I'm facing some
issues reading a large (500 MB) file in Spark SQL.
Sometimes I get a heap space error and sometimes the executors fail.
I have increased the driver memory, executor memory, Kryo serializer buffer
size, etc., but nothing helps.

I Kindly request your help in resolving this issue.

Thanks
Arun.


Re: spark streaming web ui not showing the events - direct kafka api

2016-02-04 Thread Cody Koeninger
There have been changes to the visibility of info in the UI between 1.4 and 1.5; I
can't say off the top of my head in which point versions they took place.

On Thu, Feb 4, 2016 at 12:07 AM, vimal dinakaran 
wrote:

> No I am using DSE 4.8 which has spark 1.4. Is this a known issue ?
>
> On Wed, Jan 27, 2016 at 11:52 PM, Cody Koeninger 
> wrote:
>
>> Have you tried spark 1.5?
>>
>> On Wed, Jan 27, 2016 at 11:14 AM, vimal dinakaran 
>> wrote:
>>
>>> Hi ,
>>>  I am using Spark 1.4 with the direct Kafka API. In my streaming UI, I am
>>> able to see the events listed only if I add stream.print() statements;
>>> otherwise the event rate and input events remain at 0 even though the events get
>>> processed.
>>>
>>> Without print statements , I have the action saveToCassandra in the
>>> dstream.
>>>
>>> Any reason why this is not working?
>>>
>>> Thanks
>>> Vimal
>>>
>>
>>
>


Re: Spark job does not perform well when some RDD in memory and some on Disk

2016-02-04 Thread Prabhu Joseph
If spark.locality.wait is 0, then there are two performance issues:

   1. The task scheduler won't wait to schedule tasks as DATA_LOCAL; it will
launch them immediately on some node even if it is less local. The
probability of tasks running at a lower locality level is higher,
which hurts overall job performance.
   2. If the executor does not have enough heap memory, some tasks have their
RDD partitions in the cache while others have to read from Hadoop; with
spark.locality.wait set to 0 all the tasks start in parallel, and since the
executor process is then both memory- and IO-intensive, GC will be high and
tasks will be slower. (A minimal configuration sketch follows below.)

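A minimal configuration sketch (the app name is hypothetical; 3s is the documented default for spark.locality.wait):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("locality-wait-demo")          // hypothetical app name
  .set("spark.locality.wait", "3s")          // time to wait before falling back to a less-local level
// Per-level overrides also exist:
//   spark.locality.wait.process, spark.locality.wait.node, spark.locality.wait.rack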
On Thu, Feb 4, 2016 at 5:13 PM, Alonso Isidoro Roman 
wrote:

> "But learned that it is better not to reduce it to 0."
>
> could you explain a bit more this sentence?
>
> thanks
>
> Alonso Isidoro Roman.
>
> Mis citas preferidas (de hoy) :
> "Si depurar es el proceso de quitar los errores de software, entonces
> programar debe ser el proceso de introducirlos..."
>  -  Edsger Dijkstra
>
> My favorite quotes (today):
> "If debugging is the process of removing software bugs, then programming
> must be the process of putting ..."
>   - Edsger Dijkstra
>
> "If you pay peanuts you get monkeys"
>
>
> 2016-02-04 11:33 GMT+01:00 Prabhu Joseph :
>
>> Okay, the reason for the task delay within an executor when some RDD
>> partitions are in memory and some in Hadoop (i.e., multiple locality levels,
>> NODE_LOCAL and ANY) is that in this case the scheduler waits
>> for *spark.locality.wait* (3 seconds by default). During this period, the
>> scheduler waits to launch a data-local task before giving up and launching
>> it on a less-local node.
>>
>> So after making it 0, all tasks started in parallel. But I learned that it is
>> better not to reduce it to 0.
>>
>>
>> On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph > > wrote:
>>
>>> Hi All,
>>>
>>>
>>> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
>>> RDD's created each approx 250MB data) and there are two jobs. Job A gets
>>> the line with "a" and the Job B gets the line with "b". The spark
>>> application is ran multiple times, each time with
>>> different executor memory, and enable/disable cache() function. Job A
>>> performance is same in all the runs as it has to read the entire data first
>>> time from Disk.
>>>
>>> Spark Cluster - standalone mode with Spark Master, single worker node
>>> (12 cores, 16GB memory)
>>>
>>> val logData = sc.textFile(logFile, 2)
>>> var numAs = logData.filter(line => line.contains("a")).count()
>>> var numBs = logData.filter(line => line.contains("b")).count()
>>>
>>>
>>> *Job B (which has 5 tasks) results below:*
>>>
>>> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1
>>> image]
>>>
>>> Since logData is not cached, the job B has to again read the 1.2GB
>>> data from Hadoop into memory, and all 5 tasks started in parallel and each
>>> took 2 sec (29ms for GC) and the
>>>  overall job completed in 2 seconds.
>>>
>>> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached
>>> took 4 seconds [ran2 image, ran2_cache image]
>>>
>>>  val logData = sc.textFile(logFile, 2).cache()
>>>
>>>  The Executor does not have enough memory to cache and hence again
>>> needs to read the entire 1.2GB data from hadoop into memory.  But since the
>>> cache() is used, this leads to a lot of GC pauses and slowness in task
>>> completion. Each task started in parallel and
>>> completed in 4 seconds (more than 1 sec for GC).
>>>
>>> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took
>>> 10 seconds [ran3 image]*
>>>
>>>  The executor has memory that can fit 4 RDD partitions into memory,
>>> but the 5th partition it has to read from Hadoop. 4 tasks are started in
>>> parallel and they complete in 0.3 seconds without GC. But the 5th task,
>>> which has to read its RDD partition from disk, is started after 4 seconds
>>> and gets completed in 2 seconds. I am analysing why the 5th task is not
>>> started in parallel with the other tasks, or at least why it is not started
>>> immediately after the other tasks complete.
>>>
>>> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
>>> took 0.3 seconds [ran4 image]
>>>
>>>  The executor has enough memory to cache all the 5 RDD. All 5 tasks
>>> are started in parallel and gets completed within 0.3 seconds.
>>>
>>>
>>> So Spark performs well when the entire input data is in memory, or none of
>>> it is. In the case of some RDD partitions in memory and some read from disk,
>>> there is a delay in scheduling the fifth task; is this expected behavior or
>>> a possible bug?
>>>
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>>
>>>
>>>
>>
>


Re: Using jar bundled log4j.xml on worker nodes

2016-02-04 Thread Ted Yu
Have you taken a look at SPARK-11105 ?

Cheers

On Thu, Feb 4, 2016 at 9:06 AM, Matthias Niehoff <
matthias.nieh...@codecentric.de> wrote:

> Hello everybody,
>
> we’ve bundled our log4j.xml into our jar (in the classpath root).
>
> I’ve added the log4j.xml to the spark-defaults.conf with
>
> spark.{driver,executor}.extraJavaOptions=-Dlog4j.configuration=log4j.xml
>
> There is no log4j.properties or log4j.xml in one of the conf folders on
> any machine.
>
> When I start the app the driver is using our log4j.xml, but all the
> executors use the default log4j.properties („Using Spark’s default log4j
> profile: org/apache/spark/log4j-defaults.properties“).
>
> What do I have to change to make spark use the log4j.xml from our jar also
> on our executors?
>
> We are using Spark 1.5.2
>
> Thank you!
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>
>


Re: Memory tuning in spark sql

2016-02-04 Thread Ted Yu
Please take a look at SPARK-1867.

The discussion was very long.
You may want to look for missing classes.

Also see https://bugs.openjdk.java.net/browse/JDK-7172206
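
For reference, the settings quoted below usually end up consolidated in spark-defaults.conf; the values here are simply the ones mentioned in this thread, not recommendations:

spark.executor.memory            4g
spark.driver.memory              2g
spark.kryoserializer.buffer.max  2000m
# note: spark.driver.memory only takes effect if set before the driver JVM starts
# (spark-defaults.conf or --driver-memory), not from application code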

On Thu, Feb 4, 2016 at 10:31 AM,  wrote:

> Hi Ted. Thanks for the response.
>
> I'm just trying to do a select *. The table has over 1 million rows.
>
> I have set below parameters.
>
> export SPARK_EXECUTOR_MEMORY=4G
> export SPARK_DRIVER_MEMORY=2G
> spark.kryoserializer.buffer.max 2000m.
>
>
> I have started the thrift server on port 10001 and am trying to access these
> Spark tables from QlikView BI tools.
> I have been stuck with this. Kindly help.
>
> PFB the logs.
>
> 16/02/04 18:11:21 INFO TaskSetManager: Finished task 9.0 in stage 6.0 (TID
> 57) in 4305 ms on ip-xx-xx-xx-xx.ec2.internal (8/10)
> 16/02/04 18:11:26 INFO TaskSetManager: Finished task 7.0 in stage 6.0 (TID
> 55) in 14711 ms on ip-xx-xx-xx-xx.ec2.internal (9/10)
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p
> kill -9 %p"
> #   Executing /bin/sh -c "kill -9 17242
> kill -9 17242"...
> 16/02/04 18:11:39 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() for one-way message.
> java.lang.IllegalStateException: unread block data
> at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2428)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
> at
> org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:258)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:310)
> at
> org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:256)
> at
> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:588)
> at
> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577)
> at
> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> 

Reading large set of files in Spark

2016-02-04 Thread Akhilesh Pathodia
Hi,

I am using Spark to read a large set of files from HDFS, applying some
formatting on each line and then saving each line as a record in Hive.
Spark is reading directory paths from Kafka. Each directory can have a large
number of files. I am reading one path from Kafka and then processing all
files of the directory in parallel. I have to delete the directory after all
files are processed. I have the following questions:

1. What is the optimal way to read a large set of files in Spark? I am not
using sc.textFile(), instead I am reading the file content using FileSystem
and creating Dstream of lines.
2. How to delete the directory/files from HDFS after the task is completed?

Thanks,
Akhilesh


Re: Memory tuning in spark sql

2016-02-04 Thread Ted Yu
Can you provide a bit more detail ?

values of the parameters you have tuned
log snippets from executors
snippet of your code

Thanks

On Thu, Feb 4, 2016 at 9:48 AM,  wrote:

> Hi Sir/madam,
> Greetings of the day.
>
> I am working on Spark 1.6.0 with AWS EMR (Elastic MapReduce). I'm facing
> some issues reading a large (500 MB) file in Spark SQL.
> Sometimes I get a heap space error and sometimes the executors fail.
> I have increased the driver memory, executor memory, and Kryo serializer
> buffer size, etc., but nothing helps.
>
> I Kindly request your help in resolving this issue.
>
> Thanks
> Arun.
>


Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Shixiong(Ryan) Zhu
Thanks for reporting it. I will take a look.

On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov  wrote:

> Hi,
> I've been playing with the experimental PairDStreamFunctions.mapWithState
> feature and I seem to have stumbled across a bug, and was wondering if
> anyone else has been seeing this behavior.
>
> I've opened up an issue in the Spark JIRA, I simply want to pass this along
> in case anyone else is experiencing such a failure or perhaps someone has
> insightful information if this is actually a bug:  SPARK-13195
> 
>
> Using the new spark mapWithState API, I've encountered a bug when setting a
> timeout for mapWithState but no explicit state handling.
>
> h1. Steps to reproduce:
>
> 1. Create a method which conforms to the StateSpec signature, make sure to
> not update any state inside it using *state.update*. Simply create a "pass
> through" method, may even be empty.
> 2. Create a StateSpec object with method from step 1, which explicitly sets
> a timeout using *StateSpec.timeout* method.
> 3. Create a DStream pipeline that uses mapWithState with the given
> StateSpec.
> 4. Run code using spark-submit. You'll see that the method ends up throwing
> the following exception:
>
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in
> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
>
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>
> h1. Sample code to reproduce the issue:
>
> {code:Title=MainObject}
> import org.apache.spark.streaming._
> import org.apache.spark.{SparkConf, SparkContext}
> /**
>   * Created by yuvali on 04/02/2016.
>   */
> object Program {
>
>   def main(args: Array[String]): Unit = {
>
> val sc = new SparkConf().setAppName("mapWithState bug reproduce")
> val sparkContext = new SparkContext(sc)
>
> val ssc = new StreamingContext(sparkContext, Seconds(4))
> val stateSpec = StateSpec.function(trackStateFunc
> _).timeout(Seconds(60))
>
> // Create a stream that generates 1000 lines per second
> val stream = ssc.receiverStream(new DummySource(10))
>
> // Split the lines into words, and create a paired (key-value) dstream
> val wordStream = stream.flatMap {
>   _.split(" ")
> }.map(word => (word, 1))
>
> // This represents the emitted stream from the trackStateFunc. Since we
> emit every input record with the updated value,
> // this stream will contain the same # of records as the input dstream.
> val wordCountStateStream = wordStream.mapWithState(stateSpec)
> wordCountStateStream.print()
>
> ssc.remember(Minutes(1)) // To make sure data is not deleted by the
> time
> we query it interactively
>
> // Don't forget to set checkpoint directory
> ssc.checkpoint("")
> ssc.start()
> ssc.awaitTermination()
>   }
>
>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
> state: State[Long]): Option[(String, Long)] = {
> val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
> val output = (key, sum)
> Some(output)
>   }
> }
> {code}
>
> {code:Title=DummySource}
>
> /**
>   * Created by yuvali on 04/02/2016.
>   */
>
> import org.apache.spark.storage.StorageLevel
> import scala.util.Random
> import 

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Sachin Aggarwal
i think we need to add port
http://serverfault.com/questions/317903/aws-ec2-open-port-8080


do u remember doing anything like this earlier for aws 1

On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov  wrote:

> Awesome. Thanks for the super fast reply.
>
>
> On Thu, Feb 4, 2016, 21:16 Tathagata Das 
> wrote:
>
>> Shixiong has already opened the PR -
>> https://github.com/apache/spark/pull/11081
>>
>> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov 
>> wrote:
>>
>>> Let me know if you do need a pull request for this, I can make that
>>> happen (given someone does a vast PR to make sure I'm understanding this
>>> problem right).
>>>
>>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 Thanks for reporting it. I will take a look.

 On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov 
 wrote:

> Hi,
> I've been playing with the experimental
> PairDStreamFunctions.mapWithState
> feature and I seem to have stumbled across a bug, and was wondering
> if
> anyone else has been seeing this behavior.
>
> I've opened up an issue in the Spark JIRA, I simply want to pass this
> along
> in case anyone else is experiencing such a failure or perhaps someone
> has
> insightful information if this is actually a bug:  SPARK-13195
> 
>
> Using the new spark mapWithState API, I've encountered a bug when
> setting a
> timeout for mapWithState but no explicit state handling.
>
> h1. Steps to reproduce:
>
> 1. Create a method which conforms to the StateSpec signature, make
> sure to
> not update any state inside it using *state.update*. Simply create a
> "pass
> through" method, may even be empty.
> 2. Create a StateSpec object with method from step 1, which explicitly
> sets
> a timeout using *StateSpec.timeout* method.
> 3. Create a DStream pipeline that uses mapWithState with the given
> StateSpec.
> 4. Run code using spark-submit. You'll see that the method ends up
> throwing
> the following exception:
>
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in
> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 136.0 (TID 176, ): java.util.NoSuchElementException: State is not
> set
> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
>
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
> at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>
> h1. Sample code to reproduce the issue:
>
> {code:Title=MainObject}
> import org.apache.spark.streaming._
> import org.apache.spark.{SparkConf, SparkContext}
> /**
>   * Created by yuvali on 04/02/2016.
>   */
> object Program {
>
>   def main(args: Array[String]): Unit = {
>
> val sc = new SparkConf().setAppName("mapWithState bug reproduce")
> val sparkContext = new SparkContext(sc)
>
> val ssc = new StreamingContext(sparkContext, Seconds(4))
> val stateSpec = StateSpec.function(trackStateFunc
> _).timeout(Seconds(60))
>
> // Create a stream 

Re: DataFrame First method is resulting different results in each iteration

2016-02-04 Thread Ali Tajeldin EDU
Hi Satish,
  Take a look at the smvTopNRecs() function in the SMV package.  It does 
exactly what you are looking for.  It might be overkill to bring in all of SMV 
for just one function but you will also get a lot more than just DF helper 
functions (modular views, higher level graphs, dynamic loading of modules 
(coming soon), data/code sync). Ok, end of SMV plug :-)

http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvGroupedDataFunc
 (See SmvTopNRecs function at the end).
https://github.com/TresAmigosSD/SMV : SMV github page

For your specific example,
emp_df.smvGroupBy("DeptNo").smvTopNRecs(1, $"Sal".desc)

Two things to note:
1. Use "emp_df" and not the sorted "ordrd_emp_df" as the sort will be performed 
by smvTopNRecs internally.
2. Must use "smvGroupBy" instead of normal "groupBy" method on DataFrame as the 
result of standard "groupBy" hides the original DF and grouping column :-(
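
If pulling in all of SMV is not an option, here is a plain-DataFrame sketch of the same "highest Sal per DeptNo" question, using only the column names from the example (ties would return more than one row per department):

import org.apache.spark.sql.functions.max

val maxSal = emp_df.groupBy("DeptNo").agg(max("Sal").as("MaxSal"))
val topPaid = emp_df
  .join(maxSal, emp_df("DeptNo") === maxSal("DeptNo") && emp_df("Sal") === maxSal("MaxSal"))
  .select(emp_df("DeptNo"), emp_df("EmpId"))

Window functions (row_number over a partition by DeptNo ordered by Sal desc) are the other common route, per the Stack Overflow link quoted further down.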

--
Ali 

On Feb 3, 2016, at 9:08 PM, Hemant Bhanawat  wrote:

> Ahh.. missed that. 
> 
> I see that you have used "first" function. 'first' returns the first row it 
> has found. On a single executor it may return the right results. But, on 
> multiple executors, it will return the first row of any of the executor which 
> may not be the first row when the results are combined. 
> 
> I believe, if you change your query like this, you will get the right 
> results: 
> 
> ordrd_emp_df.groupBy("DeptNo").
> agg($"DeptNo", max("Sal").as("HighestSal"))
> 
> But as you can see, you get the highest Sal and not the EmpId with highest 
> Sal. For getting EmpId with highest Sal, you will have to change your query 
> to add filters or add subqueries. See the following thread: 
> 
> http://stackoverflow.com/questions/6841605/get-top-1-row-of-each-group
> 
> Hemant Bhanawat
> SnappyData (http://snappydata.io/)
> 
> 
> On Wed, Feb 3, 2016 at 4:33 PM, satish chandra j  
> wrote:
> Hi Hemant,
> My dataframe "ordrd_emp_df" consists of data in order, as I have applied orderBy in 
> the first step.
> I also tried having the "orderBy" method before "groupBy", and I am still getting 
> different results in each iteration.
> 
> Regards,
> Satish Chandra
> 
> 
> On Wed, Feb 3, 2016 at 4:28 PM, Hemant Bhanawat  wrote:
> Missing order by? 
> 
> Hemant Bhanawat
> SnappyData (http://snappydata.io/)
> 
> 
> On Wed, Feb 3, 2016 at 3:45 PM, satish chandra j  
> wrote:
> Hi All,
> I have data in an emp_df (DataFrame) as mentioned below:
> 
> EmpId   Sal   DeptNo 
> 001   100   10
> 002   120   20
> 003   130   10
> 004   140   20
> 005   150   10
> 
> ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as below:
> 
> DeptNo  Sal   EmpId
> 10 150   005
> 10 130   003
> 10 100   001
> 20 140   004
> 20 120   002
> 
> Now I want to pick the highest-paid EmpId of each DeptNo; hence I applied the agg First 
> method as below:
> 
> ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")
> 
> Expected output is:  DeptNo  TopSal
>                      10      005
>                      20      004
> But my output varies for each iteration, such as:
> 
> First iteration results:   Dept  TopSal
>                            10    003
>                            20    004
> 
> Second iteration results:  Dept  TopSal
>                            10    005
>                            20    004
> 
> Third iteration results:   Dept  TopSal
>                            10    003
>                            20    002
> 
> Not sure why the output varies on each iteration, as there is no change in the code 
> or the values in the DataFrame.
> 
> Please let me know if any inputs on this 
> 
> Regards,
> Satish Chandra J
> 
> 
> 



Dataset Encoders for SparseVector

2016-02-04 Thread raj.kumar
Hi, 

I have a DataFrame df with a column "feature" of type SparseVector that
results from the ml library's VectorAssembler class. 

I'd like to get a Dataset of SparseVectors from this column, but when I do a 

df.as[SparseVector] scala complains that it doesn't know of an encoder for
SparseVector. If I then try to implement the Encoder[T] interface for
SparseVector I get the error
"java.lang.RuntimeException: Only expression encoders are supported today"

How can I get a Dataset[SparseVector] from the output of VectorAssembler?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dataset-Encoders-for-SparseVector-tp26149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Sachin Aggarwal
http://coenraets.org/blog/2011/11/set-up-an-amazon-ec2-instance-with-tomcat-and-mysql-5-minutes-tutorial/

The default Tomcat server uses port 8080. You need to open that port on
your instance to make sure your Tomcat server is available on the Web (you
could also change the default port). In the AWS Management Console, select
Security Groups (left navigation bar), select the quick-start group, the
Inbound tab and add port 8080. Make sure you click “Add Rule” and then
“Apply Rule Changes”.

On Fri, Feb 5, 2016 at 1:14 AM, Sachin Aggarwal 
wrote:

> i think we need to add port
> http://serverfault.com/questions/317903/aws-ec2-open-port-8080
>
>
> do u remember doing anything like this earlier for aws 1
>
> On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov  wrote:
>
>> Awesome. Thanks for the super fast reply.
>>
>>
>> On Thu, Feb 4, 2016, 21:16 Tathagata Das 
>> wrote:
>>
>>> Shixiong has already opened the PR -
>>> https://github.com/apache/spark/pull/11081
>>>
>>> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov 
>>> wrote:
>>>
 Let me know if you do need a pull request for this, I can make that
 happen (given someone does a vast PR to make sure I'm understanding this
 problem right).

 On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
 shixi...@databricks.com> wrote:

> Thanks for reporting it. I will take a look.
>
> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov 
> wrote:
>
>> Hi,
>> I've been playing with the experimental
>> PairDStreamFunctions.mapWithState
>> feature and I seem to have stumbled across a bug, and was
>> wondering if
>> anyone else has been seeing this behavior.
>>
>> I've opened up an issue in the Spark JIRA, I simply want to pass this
>> along
>> in case anyone else is experiencing such a failure or perhaps someone
>> has
>> insightful information if this is actually a bug:  SPARK-13195
>> 
>>
>> Using the new spark mapWithState API, I've encountered a bug when
>> setting a
>> timeout for mapWithState but no explicit state handling.
>>
>> h1. Steps to reproduce:
>>
>> 1. Create a method which conforms to the StateSpec signature, make
>> sure to
>> not update any state inside it using *state.update*. Simply create a
>> "pass
>> through" method, may even be empty.
>> 2. Create a StateSpec object with method from step 1, which
>> explicitly sets
>> a timeout using *StateSpec.timeout* method.
>> 3. Create a DStream pipeline that uses mapWithState with the given
>> StateSpec.
>> 4. Run code using spark-submit. You'll see that the method ends up
>> throwing
>> the following exception:
>>
>> {code}
>> org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 0 in
>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in
>> stage
>> 136.0 (TID 176, ): java.util.NoSuchElementException: State is not
>> set
>> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>> at
>>
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>> at
>>
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at
>>
>> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>> at
>>
>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>> at
>>
>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>> at
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at 

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Sachin Aggarwal
I am sorry for spam, I replied in wrong thread sleepy head :-(

On Fri, Feb 5, 2016 at 1:15 AM, Sachin Aggarwal 
wrote:

>
> http://coenraets.org/blog/2011/11/set-up-an-amazon-ec2-instance-with-tomcat-and-mysql-5-minutes-tutorial/
>
> The default Tomcat server uses port 8080. You need to open that port on
> your instance to make sure your Tomcat server is available on the Web (you
> could also change the default port). In the AWS Management Console, select
> Security Groups (left navigation bar), select the quick-start group, the
> Inbound tab and add port 8080. Make sure you click “Add Rule” and then
> “Apply Rule Changes”.
>
> On Fri, Feb 5, 2016 at 1:14 AM, Sachin Aggarwal <
> different.sac...@gmail.com> wrote:
>
>> i think we need to add port
>> http://serverfault.com/questions/317903/aws-ec2-open-port-8080
>>
>>
>> do u remember doing anything like this earlier for aws 1
>>
>> On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov 
>> wrote:
>>
>>> Awesome. Thanks for the super fast reply.
>>>
>>>
>>> On Thu, Feb 4, 2016, 21:16 Tathagata Das 
>>> wrote:
>>>
 Shixiong has already opened the PR -
 https://github.com/apache/spark/pull/11081

 On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov 
 wrote:

> Let me know if you do need a pull request for this, I can make that
> happen (given someone does a vast PR to make sure I'm understanding this
> problem right).
>
> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Thanks for reporting it. I will take a look.
>>
>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov 
>> wrote:
>>
>>> Hi,
>>> I've been playing with the experimental
>>> PairDStreamFunctions.mapWithState
>>> feature and I seem to have stumbled across a bug, and was
>>> wondering if
>>> anyone else has been seeing this behavior.
>>>
>>> I've opened up an issue in the Spark JIRA, I simply want to pass
>>> this along
>>> in case anyone else is experiencing such a failure or perhaps
>>> someone has
>>> insightful information if this is actually a bug:  SPARK-13195
>>> 
>>>
>>> Using the new spark mapWithState API, I've encountered a bug when
>>> setting a
>>> timeout for mapWithState but no explicit state handling.
>>>
>>> h1. Steps to reproduce:
>>>
>>> 1. Create a method which conforms to the StateSpec signature, make
>>> sure to
>>> not update any state inside it using *state.update*. Simply create a
>>> "pass
>>> through" method, may even be empty.
>>> 2. Create a StateSpec object with method from step 1, which
>>> explicitly sets
>>> a timeout using *StateSpec.timeout* method.
>>> 3. Create a DStream pipeline that uses mapWithState with the given
>>> StateSpec.
>>> 4. Run code using spark-submit. You'll see that the method ends up
>>> throwing
>>> the following exception:
>>>
>>> {code}
>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 0 in
>>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in
>>> stage
>>> 136.0 (TID 176, ): java.util.NoSuchElementException: State is
>>> not set
>>> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>> at
>>>
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>> at
>>>
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>> at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at
>>>
>>> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>> at
>>>
>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>> at
>>>
>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at
>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>> at
>>> 

sc.textFile the number of the workers to parallelize

2016-02-04 Thread Lin, Hao
Hi,

I have a question on the number of workers that Spark uses to parallelize the 
loading of files with sc.textFile. When I use sc.textFile to access multiple 
files in AWS S3, it seems to only use 2 workers regardless of how many 
worker nodes I have in my cluster. So how does Spark configure the 
parallelization with regard to the number of cluster nodes? In the following case, 
Spark has 896 tasks split between only two nodes, 10.162.97.235 and 
10.162.97.237, while I have 9 nodes in the cluster.

thanks

Example of doing a count:
 scala> s3File.count
16/02/04 18:12:06 INFO SparkContext: Starting job: count at :30
16/02/04 18:12:06 INFO DAGScheduler: Got job 0 (count at :30) with 896 
output partitions
16/02/04 18:12:06 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
:30)
16/02/04 18:12:06 INFO DAGScheduler: Parents of final stage: List()
16/02/04 18:12:06 INFO DAGScheduler: Missing parents: List()
16/02/04 18:12:06 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[1] at textFile at :27), which has no missing parents
16/02/04 18:12:07 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 3.0 KB, free 228.3 KB)
16/02/04 18:12:07 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 1834.0 B, free 230.1 KB)
16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.162.98.112:46425 (size: 1834.0 B, free: 517.4 MB)
16/02/04 18:12:07 INFO SparkContext: Created broadcast 1 from broadcast at 
DAGScheduler.scala:1006
16/02/04 18:12:07 INFO DAGScheduler: Submitting 896 missing tasks from 
ResultStage 0 (MapPartitionsRDD[1] at textFile at :27)
16/02/04 18:12:07 INFO YarnScheduler: Adding task set 0.0 with 896 tasks
16/02/04 18:12:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
10.162.97.235, partition 0,RACK_LOCAL, 2213 bytes)
16/02/04 18:12:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
10.162.97.237, partition 1,RACK_LOCAL, 2213 bytes)
16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.162.97.235:38643 (size: 1834.0 B, free: 1259.8 MB)
16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.162.97.237:45360 (size: 1834.0 B, free: 1259.8 MB)
16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
10.162.97.237:45360 (size: 23.8 KB, free: 1259.8 MB)
16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
10.162.97.235:38643 (size: 23.8 KB, free: 1259.8 MB)
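
For what it's worth, the 896 tasks come from the number of input splits; how many of the 9 nodes actually run them depends on the executors granted to the application (the log shows YARN scheduling). A hedged sketch, with hypothetical paths and flag values:

// Illustrative only; the flags/values are assumptions, not taken from this cluster:
//   spark-submit --num-executors 9 --executor-cores 4 --executor-memory 4g ...
val s3File = sc.textFile("s3://bucket/path/*")     // hypothetical path
println(s3File.partitions.length)                  // number of splits (896 in the run above)
// Spread the data across whatever parallelism the app actually has:
val spread = s3File.repartition(sc.defaultParallelism)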



Re: Dynamic sql in Spark 1.5

2016-02-04 Thread Ali Tajeldin EDU
Sorry, I'm not quite following what your intent is. I'm not sure how the TagN column 
is being derived. Is Dataset 1 your input and Dataset 2 your output? I don't 
see the relationship between them clearly. Can you describe your input and 
the expected output?
--
Ali

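In case it helps frame the question, here is a sketch (illustration only; the column names and the matching rule are guesses, since the intent is not clear yet) of how a "dynamic" expression is usually composed with the DataFrame API instead of concatenating SQL strings:

import org.apache.spark.sql.functions.{col, lit, when}

val lookupCols = Seq("HeadColumn2", "HeadColumn3", "HeadColumn4")   // hypothetical column names
val matchCount = lookupCols
  .map(c => when(col(c).isin("dataset 1", "dataset2", "dataset 3"), lit(1)).otherwise(lit(0)))
  .reduce(_ + _)
val tagged = dataset2DF.withColumn("MatchCount", matchCount)        // dataset2DF is hypothetical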
On Feb 2, 2016, at 11:28 PM, Divya Gehlot  wrote:

> Hi,
> I have data set like :
> Dataset 1
> HeaderCol1 HeadCol2 HeadCol3 
>  dataset 1 dataset2 dataset 3
> dataset 11 dataset13 dataset 13
> dataset 21 dataset22 dataset 23
> 
> Dataset 2
> HeadColumn1 HeadColumn2HeadColumn3 HeadColumn4
> Tag1  Dataset1  
> Tag2  Dataset1   Dataset2
> Tag3  Dataset1  Dataset2   Dataset3
> Tag4 DifferentDataset1
> Tag5 DifferentDataset1   DifferentDataset2 
> Tag6 DifferentDataset1DifferentDataset2 DifferentDataset3
> 
> 
> My requirement is to tag the dataset (adding one more column) based on Dataset 1.
> 
> Can I implement this in Spark?
> In an RDBMS we implemented it using dynamic SQL.
> 
> Would really appreciate the help.
> 
> 
> Thanks,
> Divya 
>  
> 
> 
> 
> 
> On 3 February 2016 at 11:42, Ali Tajeldin EDU  wrote:
> While you can construct the SQL string dynamically in scala/java/python, it 
> would be best to use the Dataframe API for creating dynamic SQL queries.  See 
> http://spark.apache.org/docs/1.5.2/sql-programming-guide.html for details.
> 
> On Feb 2, 2016, at 6:49 PM, Divya Gehlot  wrote:
> 
>> Hi,
>> Does Spark supports dyamic sql ?
>> Would really appreciate the help , if any one could share some 
>> references/examples.
>> 
>> 
>> 
>> Thanks,
>> Divya 
> 
> 



Re: Broadcast join on multiple dataframes

2016-02-04 Thread Srikanth
Hello,

Any pointers on what is causing the optimizer to convert broadcast to
shuffle join?
This join is with a file that is just 4 KB in size.

Complete plan -->
https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0
DAG from UI -->
https://www.dropbox.com/s/4xc9d0rdkx2fun8/DAG_with_select.PNG?dl=0
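
For reference (this is not an answer to why the plan degraded), the two usual knobs for requesting a broadcast join; the frame names below are hypothetical:

import org.apache.spark.sql.functions.broadcast

// Explicit hint on the small (4 KB) lookup side:
val joined = bigDF.join(broadcast(smallDF),
  bigDF("start_ip") === smallDF("start_ip_num"), "left_outer")

// Automatic broadcasting applies to relations the planner can size below this threshold:
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

As far as I recall, for a left outer join only the right-hand side can be broadcast, which may be relevant to how the outermost join was planned.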


== Optimized Logical Plan ==
Project [...]
+- Join LeftOuter, Some((start_ip#48L = start_ip_num#144L))
   :- Project [...]
   :  +- Join Inner, Some((cast(creative_id#9 as bigint) =
creative_id#130L))
   : :- Project [...]
   : :  +- Join Inner, Some((cast(strategy_id#10 as bigint) =
strategy_id#126L))
   : : :- Project [...]
   : : :  +- Join LeftOuter, Some((cast(exchange_id#13 as bigint) =
id#142L))
   : : : :- Project [...]
   : : : :  +- Join LeftOuter, Some((browser_id#59 =
technology_key#169))
   : : : : :- Project [...]
   : : : : :  +- Join LeftOuter,
Some((primary_browser_language#61 = id#166))
   : : : : : :- Project [...]
   : : : : : :  +- Filter ((NOT (campaign_id#12 = 0) &&
(mm_int_cost#36 < 100.0)) && ((cost_sum#41 < 100.0) &&
(total_spend#42 < 100.0)))
   : : : : : : +- Relation[...)
   : : : : : +- Project [id#166,two_letter_code#167]
   : : : : :+- BroadcastHint
   : : : : :   +- Relation[...
   : : : : +- BroadcastHint
   : : : :+- Relation[...
   : : : +- Project [description#141,id#142L]
   : : :+- BroadcastHint
   : : :   +- Relation[description#141,id#142L,name#143]
JSONRelation

== Physical Plan ==
Project [...]
+- SortMergeOuterJoin [start_ip#48L], [start_ip_num#144L], LeftOuter, None
   :- Sort [start_ip#48L ASC], false, 0
   :  +- TungstenExchange hashpartitioning(start_ip#48L,480), None
   : +- Project [...]
   :+- BroadcastHashJoin [cast(creative_id#9 as bigint)],
[creative_id#130L], BuildRight
   :   :- Project [...]
   :   :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)],
[strategy_id#126L], BuildRight
   :   : :- Project [...]
   :   : :  +- BroadcastHashOuterJoin [cast(exchange_id#13 as
bigint)], [id#142L], LeftOuter, None
   :   : : :- Project [...]
   :   : : :  +- BroadcastHashOuterJoin [browser_id#59],
[technology_key#169], LeftOuter, None
   :   : : : :- Project [...]
   :   : : : :  +- SortMergeOuterJoin
[primary_browser_language#61], [id#166], LeftOuter, None
   :   : : : : :- Sort [primary_browser_language#61
ASC], false, 0
   :   : : : : :  +- TungstenExchange
hashpartitioning(primary_browser_language#61,480), None
   :   : : : : : +- Project [...]
   :   : : : : :+- Filter (((NOT
(campaign_id#12 = 0) && (mm_int_cost#36 < 100.0)) && (cost_sum#41 <
100.0)) && (total_spend#42 < 100.0))
   :   : : : : :   +- Scan
CsvRelation(,Some(s3://
   :   : : : : +- Sort [id#166 ASC], false, 0
   :   : : : :+- TungstenExchange
hashpartitioning(id#166,480), None
   :   : : : :   +- Project
[id#166,two_letter_code#167]
   :   : : : :  +- Scan
CsvRelation(,Some(s3
   :   : : : +- ConvertToUnsafe
   :   : : :+- Scan
CsvRelation(,Some(s3://
   :   : : +- Project [description#141,id#142L]
   :   : :+- Scan
JSONRelation[description#141,id#142L,name#143] InputPaths: s3://
   :   : +- Project


Re: Re: About cache table performance in spark sql

2016-02-04 Thread Takeshi Yamamuro
Hi,

Parquet data is columnar and highly compressed, so the size of the
deserialized rows in Spark
can be bigger than that of the Parquet data on disk.
That is, I think the 24.59GB of Parquet data becomes (18.1GB + 23.6GB)
of data in Spark.

Cached data in Spark is also compressed by default, but
Spark uses simpler compression algorithms than Parquet does, and
ISTM the compression ratios are typically worse than those of Parquet.

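To make the storage trade-off concrete, a small sketch (the table name is taken from the thread; the settings shown are the standard ones and the values are illustrative):

import org.apache.spark.storage.StorageLevel

// SQL's in-memory columnar cache is compressed by default; the switch is:
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.cacheTable("video1203")

// For a DataFrame handle, a serialized level trades CPU for a smaller footprint
// and spills what does not fit to disk instead of recomputing it:
val df = sqlContext.table("video1203")
df.persist(StorageLevel.MEMORY_AND_DISK_SER)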

On Thu, Feb 4, 2016 at 3:16 PM, fightf...@163.com  wrote:

> Hi,
> Thanks a lot for your explanation. I know that the slow processing is mainly
> caused by GC pressure, and I understand this difference now
> thanks to your advice.
>
> I gave each executor 6GB of memory and tried to cache the table.
> I have 3 executors, and I can see the following info on the Spark job UI
> Storage page:
>
>
> RDD Name: In-memory table video1203
> Storage Level: Memory Deserialized 1x Replicated
> Cached Partitions: 251 (Fraction Cached: 100%)
> Size in Memory: 18.1 GB | Size in ExternalBlockStore: 0.0 B | Size on Disk: 23.6 GB
>
> I can see that Spark SQL tries to cache the data in memory, and when I run
> queries over this table video1203, I get a
> fast response. Another thing that confused me is the data sizes above
> (in memory and on disk). I can see that the in-memory
> data size is 18.1GB, which almost equals the sum of my executor memory. But
> why is the disk size 23.6GB? From Impala I get an overall
> Parquet file size of about 24.59GB. It would be good to have some clarification
> on this.
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* Prabhu Joseph 
> *Date:* 2016-02-04 14:35
> *To:* fightf...@163.com
> *CC:* user 
> *Subject:* Re: About cache table performance in spark sql
> Sun,
>
>    When an executor doesn't have enough memory and tries to cache the
> data, it spends a lot of time on GC, and hence the job will be slow. Either:
>
>  1. allocate enough memory to cache all the RDDs, so the job completes
> fast, or
>  2. don't use cache when there is not enough executor memory.
>
>   To check the GC time, use --conf
> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps" while submitting the job; SPARK_WORKER_DIR will
> then contain the executor stdout with the GC output.
> The stdout will show many "Full GC" events when cache is used and the
> executor does not have enough heap.
>
>
> Thanks,
> Prabhu Joseph
>
> On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com 
> wrote:
>
>> Hi,
>>
>> I want to make sure that caching the table would indeed accelerate SQL
>> queries. Here is one of my use cases:
>>   Impala table size: 24.59GB, no partitions, about 1 billion+ rows.
>> I use sqlContext.sql to run queries over this table, and I use the cache
>> and uncache commands to see if there is any performance disparity.
>> I ran the following query:
>> select * from video1203 where id > 10 and id < 20 and added_year != 1989
>> I see the following results:
>>
>> 1. If I do not cache the table and just run sqlContext.sql(), the above
>> query takes about 25 seconds.
>> 2. If I first run sqlContext.cacheTable("video1203"), the query runs
>> super slow and causes a driver OOM exception, but I can get the final
>> results after about 9 minutes.
>>
>> Could any expert explain this to me? I can see that cacheTable causes
>> OOM simply because the in-memory columnar storage cannot hold the
>> 24.59GB+ table in memory. But why is the performance so different, and
>> even so bad?
>>
>> Best,
>> Sun.
>>
>> --
>> fightf...@163.com
>>
>
>


-- 
---
Takeshi Yamamuro
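
As a rough sketch of the GC-logging advice quoted above (assuming Spark 1.x
property names; the app name and memory size are placeholders), the same
settings can also be set programmatically on the SparkConf instead of via
--conf on spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

// Enable GC logging on the executors so "Full GC" activity shows up in the
// executor stdout under SPARK_WORKER_DIR, and give executors enough heap to
// hold the cached table (the 6g figure mirrors the thread, not a recommendation).
val conf = new SparkConf()
  .setAppName("cache-with-gc-logging")
  .set("spark.executor.memory", "6g")
  .set("spark.executor.extraJavaOptions",
       "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

val sc = new SparkContext(conf)

When submitting with spark-submit, the equivalent is the --conf form quoted
in Prabhu's reply above.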


Re: sparkR not able to create /append new columns

2016-02-04 Thread Devesh Raj Singh
Thank you, Rui Sun! It is working now!

On Thu, Feb 4, 2016 at 9:21 AM, Sun, Rui  wrote:

> Devesh,
>
>
>
> Note that a DataFrame is immutable. withColumn returns a new DataFrame
> instead of adding a column in-place to the DataFrame being operated on.
>
>
>
> So, you can modify the for loop like:
>
>
>
> for (j in 1:lev) {
>   dummy.df.new <- withColumn(df,
>                              paste0(colnames(cat.column), j),
>                              ifelse(df$Species == levels(as.factor(unlist(cat.column)))[j], 1, 0))
>   df <- dummy.df.new
> }
>
>
>
> As you can see, withColumn supports adding only one column; it would be
> more convenient if withColumn supported adding multiple columns at once.
> There is a JIRA requesting such a feature (
> https://issues.apache.org/jira/browse/SPARK-12225), which is still under
> discussion. If you want this feature, you could comment on it.
>
>
>
> *From:* Franc Carter [mailto:franc.car...@gmail.com]
> *Sent:* Wednesday, February 3, 2016 7:40 PM
> *To:* Devesh Raj Singh
> *Cc:* user@spark.apache.org
> *Subject:* Re: sparkR not able to create /append new columns
>
>
>
>
>
> Yes, I didn't work out how to solve that - sorry
>
>
>
>
>
> On 3 February 2016 at 22:37, Devesh Raj Singh 
> wrote:
>
> Hi,
>
>
>
> but "withColumn" will only add once, if i want to add columns to the same
> dataframe in a loop it will keep overwriting the added column and in the
> end the last added column( in the loop) will be the added column. like in
> my code above.
>
>
>
> On Wed, Feb 3, 2016 at 5:05 PM, Franc Carter 
> wrote:
>
>
>
> I had problems doing this as well - I ended up using 'withColumn', it's
> not particularly graceful but it worked (1.5.2 on AWS EMR)
>
>
>
> cheers
>
>
>
> On 3 February 2016 at 22:06, Devesh Raj Singh 
> wrote:
>
> Hi,
>
>
>
> I am trying to create dummy variables in sparkR by creating new columns
> for categorical variables, but it is not appending the columns.
>
>
>
>
>
> df <- createDataFrame(sqlContext, iris)
> class(dtypes(df))
>
> cat.column<-vector(mode="character",length=nrow(df))
> cat.column<-collect(select(df,df$Species))
> lev<-length(levels(as.factor(unlist(cat.column))))
> varb.names<-vector(mode="character",length=lev)
> for (i in 1:lev){
>   varb.names[i]<-paste0(colnames(cat.column),i)
> }
>
>
>
> for (j in 1:lev) {
>   dummy.df.new<-withColumn(df,
>                            paste0(colnames(cat.column),j),
>                            ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j],1,0) )
> }
>
>
>
> I am getting the below output for
>
>
>
> head(dummy.df.new)
>
>
>
> output:
>
>
>
>   Sepal_Length Sepal_Width Petal_Length Petal_Width Species Species1
> 1          5.1         3.5          1.4         0.2  setosa        1
> 2          4.9         3.0          1.4         0.2  setosa        1
> 3          4.7         3.2          1.3         0.2  setosa        1
> 4          4.6         3.1          1.5         0.2  setosa        1
> 5          5.0         3.6          1.4         0.2  setosa        1
> 6          5.4         3.9          1.7         0.4  setosa        1
>
>
>
> Problem: the Species2 and Species3 columns are not getting added to the
> dataframe
>
>
>
> --
>
> Warm regards,
>
> Devesh.
>
>
>
>
>
> --
>
> Franc
>
>
>
>
>
> --
>
> Warm regards,
>
> Devesh.
>
>
>
>
>
> --
>
> Franc
>



-- 
Warm regards,
Devesh.
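
For readers following along in Scala, here is a rough analogue of the pattern
Sun Rui describes above (an illustrative sketch, not code from the thread; it
assumes a spark-shell style sqlContext and a placeholder input path). Because
DataFrames are immutable, the result of each withColumn call has to be carried
forward, which foldLeft does naturally:

import org.apache.spark.sql.functions.when

// Assumes an existing sqlContext (e.g. in spark-shell) and a placeholder input.
val iris = sqlContext.read.json("/path/to/iris.json")

// Collect the distinct category levels on the driver (fine for a small column).
val levels = iris.select("Species").distinct().collect().map(_.getString(0))

// Each withColumn returns a new DataFrame; foldLeft threads it through the loop.
val withDummies = levels.zipWithIndex.foldLeft(iris) { case (df, (lev, i)) =>
  df.withColumn(s"Species${i + 1}", when(df("Species") === lev, 1).otherwise(0))
}

withDummies.show(6)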


Re: Re: About cache table performance in spark sql

2016-02-04 Thread fightf...@163.com
Oh, thanks. Makes sense to me.

Best,
Sun.



fightf...@163.com
 
From: Takeshi Yamamuro
Date: 2016-02-04 16:01
To: fightf...@163.com
CC: user
Subject: Re: Re: About cache table performance in spark sql
Hi,

Parquet data are stored column-wise and highly compressed, so the size of the
deserialized rows in Spark can be bigger than that of the Parquet data on
disk. That is, I think the 24.59GB of Parquet data becomes the (18.1GB +
23.6GB) of data you see in Spark.

Yes, cached data in Spark are also compressed by default; however, Spark
uses simpler compression algorithms than Parquet does, and the compression
ratios are typically worse than Parquet's.


On Thu, Feb 4, 2016 at 3:16 PM, fightf...@163.com  wrote:
Hi, 
Thanks a lot for your explanation. I know that the slow processing is
mainly caused by GC pressure, and I now understand the difference thanks
to your advice.

I gave each executor 6GB of memory and tried to cache the table.
I have 3 executors, and I can see the following info on the Storage tab
of the Spark job UI:
 

RDD Name: In-memory table video1203
Storage Level: Memory Deserialized 1x Replicated
Cached Partitions: 251
Fraction Cached: 100%
Size in Memory: 18.1 GB
Size in ExternalBlockStore: 0.0 B
Size on Disk: 23.6 GB

I can see that Spark SQL tries to cache the data into memory, and when I
run queries over this table video1203 I get fast responses. Another thing
that confuses me is the data size shown above (in memory and on disk).
The in-memory data size is 18.1GB, which is almost equal to the sum of my
executor memory. But why is the disk size 23.6GB? From Impala I get an
overall Parquet file size of about 24.59GB. It would be good to have some
clarification on this.

Best,
Sun.



fightf...@163.com
 
From: Prabhu Joseph
Date: 2016-02-04 14:35
To: fightf...@163.com
CC: user
Subject: Re: About cache table performance in spark sql
Sun,

   When an executor doesn't have enough memory and tries to cache the data,
it spends a lot of time on GC, and hence the job will be slow. Either:

 1. allocate enough memory to cache all the RDDs, so the job completes fast, or
 2. don't use cache when there is not enough executor memory.

  To check the GC time, use --conf
"spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
while submitting the job; SPARK_WORKER_DIR will then contain the executor
stdout with the GC output.
The stdout will show many "Full GC" events when cache is used and the
executor does not have enough heap.


Thanks,
Prabhu Joseph

On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com  wrote:
Hi, 

I want to make sure that caching the table would indeed accelerate SQL
queries. Here is one of my use cases:
  Impala table size: 24.59GB, no partitions, about 1 billion+ rows.
I use sqlContext.sql to run queries over this table, and I use the cache
and uncache commands to see if there is any performance disparity.
I ran the following query:
select * from video1203 where id > 10 and id < 20 and added_year != 1989
I see the following results:

1. If I do not cache the table and just run sqlContext.sql(), the above
query takes about 25 seconds.
2. If I first run sqlContext.cacheTable("video1203"), the query runs super
slow and causes a driver OOM exception, but I can get the final results
after about 9 minutes.

Could any expert explain this to me? I can see that cacheTable causes OOM
simply because the in-memory columnar storage cannot hold the 24.59GB+
table in memory. But why is the performance so different, and even so bad?

Best,
Sun.



fightf...@163.com




-- 
---
Takeshi Yamamuro