Re: Executor metrics in spark application

2014-07-22 Thread Denes
I'm also pretty interested in how to create custom Sinks in Spark. I'm using it
with Ganglia, and the normal metrics from the JVM source do show up. I tried to
create my own metric based on Issac's code, but it does not show up in Ganglia.
Does anyone know where the problem is?
Here's the code snippet: 

class AccumulatorSource(accumulator: Accumulator[Long], name: String)
    extends Source {

  val sourceName = "accumulator.metrics"
  val metricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("accumulator", name),
    new Gauge[Long] {
      override def getValue: Long = accumulator.value
    })
}

and then in the main:
val longAccumulator = sc.accumulator[Long](0)
val accumulatorMetrics = new AccumulatorSource(longAccumulator, "counters.accumulator")
SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10385.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Executor metrics in spark application

2014-07-22 Thread Denes
I meant custom Sources, sorry. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10386.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Executor metrics in spark application

2014-07-22 Thread Shao, Saisai
Hi Denes,

I think you can register your customized metrics source into the metrics system 
through metrics.properties; you can take metrics.properties.template as a 
reference.

Basically you can do the following if you want to monitor on the executor side:

executor.source.accumulator.class=xx.xx.xx.your-customized-metrics-source

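For reference, a source registered this way needs a no-argument constructor, since 
the metrics system instantiates it from the class name. A rough sketch (package and 
class names are placeholders, and whether the Source trait is accessible may depend 
on your Spark version):

package xx.xx.xx

import com.codahale.metrics.MetricRegistry
import org.apache.spark.metrics.source.Source

class ParsedMessagesSource extends Source {
  override val sourceName = "parsedMessages"
  override val metricRegistry = new MetricRegistry()

  // Self-contained: the counter lives inside the source itself and is
  // incremented by whatever code holds a reference to it.
  val parsedMessages = metricRegistry.counter(MetricRegistry.name("parsedMessages", "count"))
}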
I think the code below can only register the metrics source on the client (driver) side:

SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics);

BTW, it's not a good choice to register through the MetricsSystem directly; it would 
be nicer to register through configuration. Also, you can enable the console sink to 
verify whether the source is registered or not.

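For reference, enabling the console sink in metrics.properties looks roughly like 
this (the sink class comes from metrics.properties.template; the period/unit values 
here are only examples):

*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds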
Thanks
Jerry


-Original Message-
From: Denes [mailto:te...@outlook.com] 
Sent: Tuesday, July 22, 2014 2:02 PM
To: u...@spark.incubator.apache.org
Subject: Re: Executor metrics in spark application

I'm also pretty interested in how to create custom Sinks in Spark. I'm using it 
with Ganglia, and the normal metrics from the JVM source do show up. I tried to 
create my own metric based on Issac's code, but it does not show up in Ganglia.
Does anyone know where the problem is?
Here's the code snippet: 

class AccumulatorSource(accumulator: Accumulator[Long], name: String)
    extends Source {

  val sourceName = "accumulator.metrics"
  val metricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("accumulator", name),
    new Gauge[Long] {
      override def getValue: Long = accumulator.value
    })
}

and then in the main:
val longAccumulator = sc.accumulator[Long](0)
val accumulatorMetrics = new AccumulatorSource(longAccumulator, "counters.accumulator")
SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10385.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


number of Cached Partitions v.s. Total Partitions

2014-07-22 Thread Haopu Wang
Hi, I'm using local mode and reading a text file as an RDD using the
JavaSparkContext.textFile() API.

I then call the cache() method on the resulting RDD.

I look at the Storage information and find that the RDD has 3 partitions, but
only 2 of them have been cached.

Is this normal behavior? I assumed that either all of the partitions would be
cached or none of them.

If I'm wrong, in what cases is the number of cached partitions less than the
total number of partitions?

 

 



RE: number of Cached Partitions v.s. Total Partitions

2014-07-22 Thread Shao, Saisai
Yes, it's normal when there is not enough memory to hold the third partition, as you 
can see in the attached picture.

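(If you want all partitions to be materialized even when memory is tight, one 
option, sketched below on the Scala API and untested against your setup, is to 
persist with a storage level that spills to disk; the Java API has the same 
persist/StorageLevel calls.)

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("your-file.txt")        // placeholder path
rdd.persist(StorageLevel.MEMORY_AND_DISK)     // partitions that do not fit in memory are written to disk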
Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, July 22, 2014 3:09 PM
To: user@spark.apache.org
Subject: number of Cached Partitions v.s. Total Partitions


Hi, I'm using local mode and read a text file as RDD using 
JavaSparkContext.textFile() API.

And then call cache() method on the result RDD.



I look at the Storage information and find the RDD has 3 partitions but 2 of 
them have been cached.

Is this a normal behavior? I assume all of partitions should be cached or none 
of them.

If I'm wrong, what are the cases when number of cached partitions is less 
than the total number of partitions?



[attachment: screenshot of the Storage tab]


Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-22 Thread Victor Sheng
Hi Yin Huai,
I tested again with your snippet code.
It works well in Spark 1.0.1.

Here is my code:
 
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 case class Record(data_date: String, mobile: String, create_time: String)
 val mobile = Record("2014-07-20", "1234567", "2014-07-19")
 val lm = List(mobile)
 val mobileRDD = sc.makeRDD(lm)
 val mobileSchemaRDD = sqlContext.createSchemaRDD(mobileRDD)
 mobileSchemaRDD.registerAsTable("mobile")
 sqlContext.sql("select count(1) from mobile").collect()
 
The Result is like below:
14/07/22 15:49:53 INFO spark.SparkContext: Job finished: collect at
SparkPlan.scala:52, took 0.296864832 s
res9: Array[org.apache.spark.sql.Row] = Array([1])

   
   But what is the main cause of this exception? And how did you find it out by
looking at unknown characters like $line11.$read$
$line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$?

Thanks,
Victor




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10390.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Job aborted due to stage failure: TID x failed for unknown reasons

2014-07-22 Thread Alessandro Lulli
Hi All,

Can someone help with this?

I'm encountering exactly the same issue in a very similar scenario with the
same spark version.

Thanks
Alessandro


On Fri, Jul 18, 2014 at 8:30 PM, Shannon Quinn squ...@gatech.edu wrote:

  Hi all,

 I'm dealing with some strange error messages that I *think* comes down to
 a memory issue, but I'm having a hard time pinning it down and could use
 some guidance from the experts.

 I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; one
 has 16GB memory, the other 32GB (which is the master). My application
 involves computing pairwise pixel affinities in images, though the images
 I've tested so far only get as big as 1920x1200, and as small as 16x16.

 I did have to change a few memory and parallelism settings, otherwise I
 was getting explicit OutOfMemoryExceptions. In spark-default.conf:

 spark.executor.memory       14g
 spark.default.parallelism   32
 spark.akka.frameSize        1000

 In spark-env.sh:

 SPARK_DRIVER_MEMORY=10G

 With those settings, however, I get a bunch of WARN statements about Lost
 TIDs (no task is successfully completed) in addition to lost Executors,
 which are repeated 4 times until I finally get the following error message
 and crash:

 ---

 14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
 14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at
 /home/user/Programming/PySpark-Affinities/affinity.py:243
 Traceback (most recent call last):
   File /home/user/Programming/PySpark-Affinities/affinity.py, line 243,
 in module
 lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
   File
 /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py,
 line 583, in collect
 bytesInJava = self._jrdd.collect().iterator()
   File
 /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__
   File
 /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0:13 failed 4 times, most recent failure: TID 32 on host
 master.host.univ.edu failed for unknown reason
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4)
 14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove executor
 4 from BlockManagerMaster.
 14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in
 removeExecutor
 user@master:~/Programming/PySpark-Affinities$

 ---

 If I run the really small image instead (16x16), it *appears* to run to
 completion (gives me the output I expect without any exceptions being
 thrown). However, in the stderr logs for the app that was run, it lists the
 state as KILLED, with the final message an ERROR
 CoarseGrainedExecutorBackend: Driver Disassociated. If I run any larger
 images, I get the exception I pasted above.

 Furthermore, if I just do a spark-submit with master=local[*], aside from
 still needing to set the aforementioned memory options, it will work for an
 image of *any* 

Re: Why spark-submit command hangs?

2014-07-22 Thread Earthson
I've just had the same problem.

I'm using:

$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client $JOBJAR
--class $JOBCLASS

It's really strange, because the log shows:
14/07/22 16:16:58 INFO ui.SparkUI: Started SparkUI at
http://k1227.mzhen.cn:4040
14/07/22 16:16:58 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/07/22 16:16:58 INFO spark.SparkContext: Added JAR
/home/workspace/ci-demo/target/scala-2.10/SemiData-CIDemo-Library-assembly-0.1.jar
at http://192.168.7.37:53050/jars/SemiData-CIDemo-Library-assembly-0.1.jar
with timestamp 1406017018666
14/07/22 16:16:58 INFO cluster.YarnClusterScheduler: Created
YarnClusterScheduler
14/07/22 16:16:58 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook
for context org.apache.spark.SparkContext@41ecfc8c

Why does cluster.YarnClusterScheduler start? Where's the Client?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10392.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: gain access to persisted rdd

2014-07-22 Thread mrm
Ok, thanks for the answers. Unfortunately, there is no sc.getPersistentRDDs
for pyspark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/gain-access-to-persisted-rdd-tp10313p10393.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why spark-submit command hangs?

2014-07-22 Thread Earthson
That's what my problem is:)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10394.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: saveAsSequenceFile for DStream

2014-07-22 Thread Sean Owen
What about simply:

dstream.foreachRDD(_.saveAsSequenceFile(...))

?
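If the batch timestamp is wanted in the file name, the two-argument form of
foreachRDD gives you the batch Time as well; a rough sketch (the path is a
placeholder based on the snippet quoted below):

wordCounts.foreachRDD { (rdd, time) =>
  rdd.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" + time.milliseconds)
}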

On Tue, Jul 22, 2014 at 2:06 AM, Barnaby bfa...@outlook.com wrote:
 First of all, I do not know Scala, but learning.

 I'm doing a proof of concept by streaming content from a socket, counting
 the words and write it to a Tachyon disk. A different script will read the
 file stream and print out the results.

  val lines = ssc.socketTextStream(args(0), args(1).toInt,
 StorageLevel.MEMORY_AND_DISK_SER)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
  ssc.start()
  ssc.awaitTermination()

 I already did a proof of concept to write and read sequence files but there
 doesn't seem to be a saveAsSequenceFiles() method in DStream. What is the
 best way to write out an RDD to a stream so that the timestamps are in the
 filenames and so there is minimal overhead in reading the data back in as
 objects, see below.

 My simple successful proof was the following:
 val rdd = sc.parallelize(Array(("a", 2), ("b", 3), ("c", 1)))
 rdd.saveAsSequenceFile("tachyon://.../123.sf2")
 val rdd2 = sc.sequenceFile[String, Int]("tachyon://.../123.sf2")

 How can I do something similar with streaming?




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Executor metrics in spark application

2014-07-22 Thread Denes
Hi Jerry,

I know that way of registering a metrics source, but it seems to defeat the whole
purpose. I'd like to define a source whose value is set from within the application,
for example the number of parsed messages.
If I register it in metrics.properties, how can I obtain the instance
(or instances)?
How can I set the property? Is there a way to read an accumulator's value
from a Source?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark over graphviz (SPARK-1015, SPARK-975)

2014-07-22 Thread jay vyas
Hi spark.

I see there has been some work around graphviz visualization for spark jobs.

1) I'm wondering if anyone is actively maintaining this stuff, and if so what
the best docs are for it - or else, whether there is interest in an upstream
JIRA for updating the graphviz APIs.

2) Also, I am curious about utilities for visualizing/optimizing the flow of
data through an RDD at runtime, and where those are in the existing codebase.

Any thoughts around pipeline visualization for Spark would be appreciated.
I see some conversations about it in JIRAs, but I am not sure what the future is
for this; possibly I could lend a hand if there are any loose ends needing
to be tied.

-- 
jay vyas


RE: Executor metrics in spark application

2014-07-22 Thread Shao, Saisai
Yeah, now I see your purpose. The original design of customized metrics sources 
focused on self-contained sources; it seems you need to rely on an outer variable, 
so the way you mentioned may be the only way to register it.

Besides, since you cannot see the source in Ganglia, I think you can enable the 
console sink to verify the outputs. Also, it seems you want to register this source 
on the driver, so you need to enable the Ganglia sink on the driver side and make 
sure the Ganglia client can connect to your driver.

Thanks
Jerry

-Original Message-
From: Denes [mailto:te...@outlook.com] 
Sent: Tuesday, July 22, 2014 6:38 PM
To: u...@spark.incubator.apache.org
Subject: RE: Executor metrics in spark application

Hi Jerry,

I know that way of registering a metrics, but it seems defeat the whole 
purpose. I'd like to define a source that is set within the application, for 
example number of parsed messages. 
If I register it in the metrics.properties, how can I obtain the instance?
(or instances?)
How can I set the property? Is there a way to read an accumulator values from a 
Source?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


collect() on small group of Avro files causes plain NullPointerException

2014-07-22 Thread Sparky
Running a simple collect method on a group of Avro objects causes a plain
NullPointerException.  Does anyone know what may be wrong?

files.collect()

Press ENTER or type command to continue
Exception in thread Executor task launch worker-0
java.lang.NullPointerException
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
at scala.Option.flatMap(Option.scala:170)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-group-of-Avro-files-causes-plain-NullPointerException-tp10400.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: collect() on small list causes NullPointerException

2014-07-22 Thread Sparky
For those curious: I was using a KryoRegistrator and it was causing the null
pointer exception. I removed the code and the problem went away.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-list-causes-NullPointerException-tp10400p10402.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: collect() on small group of Avro files causes plain NullPointerException

2014-07-22 Thread Eugen Cepoi
Do you have a list/array in your Avro record? If yes, this could cause the
problem. I experienced this kind of problem and solved it by providing
custom Kryo ser/de for Avro lists. Also be careful: Spark reuses records,
so if you just read them and don't copy/transform them, you will end up
with all the records having the same values.

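As a rough illustration of the record-reuse point (here files is the RDD from the
original post, and mapping each record to its String form is just one way of making
an immutable copy before collecting):

val copied = files.map(record => record.toString)   // copy/transform before collect or cache
copied.collect()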

2014-07-22 15:01 GMT+02:00 Sparky gullo_tho...@bah.com:

 Running a simple collect method on a group of Avro objects causes a plain
 NullPointerException.  Does anyone know what may be wrong?

 files.collect()

 Press ENTER or type command to continue
 Exception in thread Executor task launch worker-0
 java.lang.NullPointerException
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
 at scala.Option.flatMap(Option.scala:170)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-group-of-Avro-files-causes-plain-NullPointerException-tp10400.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



hadoop version

2014-07-22 Thread mrm
Hi,

Where can I find the version of Hadoop my cluster is using? I launched my
ec2 cluster using the spark-ec2 script with the --hadoop-major-version=2
option. However, the folder hadoop-native/lib in the master node only
contains files that end in 1.0.0. Does that mean that I have Hadoop version
1?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-version-tp10405.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


the implications of some items in webUI

2014-07-22 Thread Yifan LI
Hi,

I am analysing application processing on Spark (GraphX), but I am feeling a 
little confused about some items in the web UI.

1) What is the difference between Duration (Stages -> Completed Stages) and 
Task Time (Executors)?
For instance, 43 s vs. 5.6 min.
Is Task Time approximately Duration multiplied by Total Tasks?

2) What are the exact meanings of Shuffle Read / Shuffle Write?


Best,
Yifan LI




Using case classes as keys does not seem to work.

2014-07-22 Thread Gerard Maas
Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

A minimal example:

case class P(name: String)
val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect
[Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
(P(bob),1), (P(abe),1), (P(charly),1))

In contrast to the expected behavior, that should be equivalent to:
sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey((x, y) => x + y).collect
Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

Any ideas why this doesn't work?

-kr, Gerard.


Re: Is there anyone who use streaming join to filter spam as guide mentioned?

2014-07-22 Thread hawkwang

Hi TD,

Eventually I found that I made a mistake - the RDD I used for the join does 
not contain any content.

Now it works.

Thanks,
Hawk

On 2014-07-21 17:58, Tathagata Das wrote:

Could you share your code snippet so that we can take a look?

TD



On Mon, Jul 21, 2014 at 7:23 AM, hawkwang wanghao.b...@gmail.com wrote:


Hello guys,

I'm just trying to use spark streaming features.
I noticed that there is join example for filtering spam, so I just
want to try.
But, nothing happens after join, the output JavaPairDStream
content is same as before.
So, is there any examples that I can refer to?

Thanks for any suggestions.

Regards,
Hawk






Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Gerard Maas
Just to narrow down the issue, it looks like the problem is in 'reduceByKey'
and derivatives like 'distinct'.

groupByKey() seems to work

sc.parallelize(ps).map(x => (x.name, 1)).groupByKey().collect
res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
(abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))



On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

 A minimal example:

 case class P(name:String)
 val ps = Array(P(alice), P(bob), P(charly), P(bob))
 sc.parallelize(ps).map(x= (x,1)).reduceByKey((x,y) = x+y).collect
 [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
 (P(bob),1), (P(abe),1), (P(charly),1))

 In contrast to the expected behavior, that should be equivalent to:
 sc.parallelize(ps).map(x= (x.name,1)).reduceByKey((x,y) = x+y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

 Any ideas why this doesn't work?

 -kr, Gerard.



Spark Streaming - How to save all items in batchs from beginning to a single stream rdd?

2014-07-22 Thread hawkwang

Hi guys,

Is it possible to generate a single stream rdd which can be updated with 
new batch rdd content?


I know that we can use updateStateByKey to do aggregation,
but here I just want to keep tracking all the historical original content.

I also noticed that we can save to redis or another storage system,
but can we just use the Spark Streaming mechanism to make it happen?

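For what it's worth, a rough sketch of keeping all historical values per key with
updateStateByKey (pairStream is a placeholder DStream of key/String-value pairs, a
checkpoint directory must be set, and this state grows without bound):

ssc.checkpoint("hdfs:///tmp/checkpoints")   // placeholder checkpoint directory
val history = pairStream.updateStateByKey[Seq[String]] {
  (newValues: Seq[String], state: Option[Seq[String]]) =>
    Some(state.getOrElse(Seq.empty[String]) ++ newValues)
}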
Thanks for any suggestion.

Regards,
Hawk



Re: Problem running Spark shell (1.0.0) on EMR

2014-07-22 Thread Martin Goodson
I am also having exactly the same problem, calling it from pyspark. Has
anyone managed to get this script to work?


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Wed, Jul 16, 2014 at 2:10 PM, Ian Wilkinson ia...@me.com wrote:

 Hi,

  I’m trying to run the Spark (1.0.0) shell on EMR and am encountering a
 classpath issue.
  I suspect I’m missing something gloriously obvious, but so far it is
 eluding me.

 I launch the EMR Cluster (using the aws cli) with:

  aws emr create-cluster --name "Test Cluster" \
 --ami-version 3.0.3 \
 --no-auto-terminate \
 --ec2-attributes KeyName=... \
 --bootstrap-actions
 Path=s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark.rb \
 --instance-groups
 InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m1.medium  \
 InstanceGroupType=CORE,InstanceCount=1,InstanceType=m1.medium
 --region eu-west-1

 then,

 $ aws emr ssh --cluster-id ... --key-pair-file ... --region eu-west-1

 On the master node, I then launch the shell with:

 [hadoop@ip-... spark]$ ./bin/spark-shell

 and try performing:

  scala> val logs = sc.textFile("s3n://.../")

 this produces:

 14/07/16 12:40:35 WARN storage.BlockManager: Putting block broadcast_0
 failed
 java.lang.NoSuchMethodError:
 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;


 Any help mighty welcome,
 ian




Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Daniel Siegmann
I can confirm this bug. The behavior for groupByKey is the same as
reduceByKey - your example is actually grouping on just the name. Try this:

sc.parallelize(ps).map(x => (x, 1)).groupByKey().collect
res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
(P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
(P(charly),ArrayBuffer(1)))


On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Just to narrow down the issue, it looks like the issue is in 'reduceByKey'
 and derivates like 'distinct'.

 groupByKey() seems to work

 sc.parallelize(ps).map(x= (x.name,1)).groupByKey().collect
 res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
 (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))



 On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

 A minimal example:

 case class P(name:String)
 val ps = Array(P(alice), P(bob), P(charly), P(bob))
 sc.parallelize(ps).map(x= (x,1)).reduceByKey((x,y) = x+y).collect
 [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
 (P(bob),1), (P(abe),1), (P(charly),1))

 In contrast to the expected behavior, that should be equivalent to:
 sc.parallelize(ps).map(x= (x.name,1)).reduceByKey((x,y) = x+y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

 Any ideas why this doesn't work?

 -kr, Gerard.





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Tranforming flume events using Spark transformation functions

2014-07-22 Thread Sundaram, Muthu X.
Hi All,
  I am getting events from flume using following line.

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, 
port);

Each event is a delimited record. I would like to use some of the transformation 
functions like map and reduce on this. Do I need to convert the 
JavaDStream<SparkFlumeEvent> to a JavaDStream<String>, or can I apply these 
functions directly on it?

I need to do following kind of operations

 AA
YDelta
TAA
 Southwest
 AA

Unique tickets are  , Y, , , .
Count is  2,  1, T 1 and so on...
AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket, Southwest - 1 
ticket.

I have to do transformations like this. Right now I am able to receive 
records, but I am struggling to transform them using Spark transformation 
functions since they are not of type JavaRDD<String>.

Can I create a new JavaRDD<String>? How do I create a new JavaRDD?

I loop through the events like below:

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user level data is carried as payload in the Flume Event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println("LOG RECORD = " + logRecord);
        }

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I 
create this JavaRDD<String>?
In the loop I am able to get every record and I am able to print it.

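One way to get a stream of plain strings to transform (shown in Scala for brevity;
the Java API has an equivalent map that takes a Function object) would be roughly
the sketch below, where the delimiter and field handling are placeholders:

val records = flumeStream.map(e => new String(e.event.getBody.array()))
// from here the usual transformations apply, e.g. counting records per field:
val perAirline = records.map(line => (line.split(",").last, 1)).reduceByKey(_ + _)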
I appreciate any help here.

Thanks,
Muthu




Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Gerard Maas
Yes, right: 'sc.parallelize(ps).map(x => (x.name, 1)).groupByKey().collect'.
An oversight from my side.

Thanks!,  Gerard.


On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann daniel.siegm...@velos.io
wrote:

 I can confirm this bug. The behavior for groupByKey is the same as
 reduceByKey - your example is actually grouping on just the name. Try
 this:

 sc.parallelize(ps).map(x= (x,1)).groupByKey().collect
 res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
 (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
 (P(charly),ArrayBuffer(1)))


 On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Just to narrow down the issue, it looks like the issue is in
 'reduceByKey' and derivates like 'distinct'.

 groupByKey() seems to work

 sc.parallelize(ps).map(x= (x.name,1)).groupByKey().collect
 res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
 (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))



 On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

 A minimal example:

 case class P(name:String)
 val ps = Array(P(alice), P(bob), P(charly), P(bob))
 sc.parallelize(ps).map(x= (x,1)).reduceByKey((x,y) = x+y).collect
 [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
 (P(bob),1), (P(abe),1), (P(charly),1))

 In contrast to the expected behavior, that should be equivalent to:
 sc.parallelize(ps).map(x= (x.name,1)).reduceByKey((x,y) = x+y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

 Any ideas why this doesn't work?

 -kr, Gerard.





 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io



Re: saveAsSequenceFile for DStream

2014-07-22 Thread Barnaby Falls
Thanks Sean! I got that working last night, similar to how you solved it. Any 
ideas about how to monitor that same folder in another script by creating a 
stream? I can use sc.sequenceFile() to read in the RDD, but how do I get the 
name of the file that got added, since there is no sequenceFileStream() method? 
Thanks again for your help.

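One possibility (an untested sketch) is the generic fileStream with a sequence-file
input format, which watches a directory for newly added files; the directory here is
taken from the earlier snippet and the key/value types from the saved word counts:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

val counts = ssc.fileStream[Text, IntWritable, SequenceFileInputFormat[Text, IntWritable]](
  "tachyon://localhost:19998/files/")
counts.map { case (k, v) => (k.toString, v.get) }.print()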
 On Jul 22, 2014, at 1:57, Sean Owen so...@cloudera.com wrote:
 
 What about simply:
 
 dstream.foreachRDD(_.saveAsSequenceFile(...))
 
 ?
 
 On Tue, Jul 22, 2014 at 2:06 AM, Barnaby bfa...@outlook.com wrote:
 First of all, I do not know Scala, but learning.
 
 I'm doing a proof of concept by streaming content from a socket, counting
 the words and write it to a Tachyon disk. A different script will read the
 file stream and print out the results.
 
 val lines = ssc.socketTextStream(args(0), args(1).toInt,
 StorageLevel.MEMORY_AND_DISK_SER)
 val words = lines.flatMap(_.split( ))
 val wordCounts = words.map(x = (x, 1)).reduceByKey(_ + _)
 wordCounts.saveAs???Files(tachyon://localhost:19998/files/WordCounts)
 ssc.start()
 ssc.awaitTermination()
 
 I already did a proof of concept to write and read sequence files but there
 doesn't seem to be a saveAsSequenceFiles() method in DStream. What is the
 best way to write out an RDD to a stream so that the timestamps are in the
 filenames and so there is minimal overhead in reading the data back in as
 objects, see below.
 
 My simple successful proof was the following:
 val rdd =  sc.parallelize(Array((a,2), (b,3), (c,1)))
 rdd.saveAsSequenceFile(tachyon://.../123.sf2)
 val rdd2 = sc.sequenceFile[String,Int](tachyon://.../123.sf2)
 
 How can I do something similar with streaming?
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Gerard Maas
I created https://issues.apache.org/jira/browse/SPARK-2620 to track this.

Maybe useful to know: this is a regression in Spark 1.0.0. I tested the
same sample code on 0.9.1 and it worked (we have several jobs using case
classes as key aggregators, so it had better work).

-kr, Gerard.


On Tue, Jul 22, 2014 at 5:37 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Yes, right. 'sc.parallelize(ps).map(x= (**x.name**,1)).groupByKey().
 collect'
 An oversight from my side.

 Thanks!,  Gerard.


 On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann daniel.siegm...@velos.io
  wrote:

 I can confirm this bug. The behavior for groupByKey is the same as
 reduceByKey - your example is actually grouping on just the name. Try
 this:

 sc.parallelize(ps).map(x= (x,1)).groupByKey().collect
 res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
 (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
 (P(charly),ArrayBuffer(1)))


 On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Just to narrow down the issue, it looks like the issue is in
 'reduceByKey' and derivates like 'distinct'.

 groupByKey() seems to work

 sc.parallelize(ps).map(x= (x.name,1)).groupByKey().collect
 res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
 (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))



 On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

 A minimal example:

 case class P(name:String)
 val ps = Array(P(alice), P(bob), P(charly), P(bob))
 sc.parallelize(ps).map(x= (x,1)).reduceByKey((x,y) = x+y).collect
 [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
 (P(bob),1), (P(abe),1), (P(charly),1))

 In contrast to the expected behavior, that should be equivalent to:
 sc.parallelize(ps).map(x= (x.name,1)).reduceByKey((x,y) =
 x+y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

 Any ideas why this doesn't work?

 -kr, Gerard.





 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





Re: Spark Streaming source from Amazon Kinesis

2014-07-22 Thread Chris Fregly
i took this over from parviz.

i recently submitted a new PR for Kinesis Spark Streaming support:
https://github.com/apache/spark/pull/1434

others have tested it with good success, so give it a whirl!

waiting for it to be reviewed/merged.  please put any feedback into the PR
directly.

thanks!

-chris


On Mon, Apr 21, 2014 at 2:39 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 No worries, looking forward to it!

 Matei

 On Apr 21, 2014, at 1:59 PM, Parviz Deyhim pdey...@gmail.com wrote:

 sorry Matei. Will definitely start working on making the changes soon :)


 On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 There was a patch posted a few weeks ago (
 https://github.com/apache/spark/pull/223), but it needs a few changes in
 packaging because it uses a license that isn’t fully compatible with
 Apache. I’d like to get this merged when the changes are made though — it
 would be a good input source to support.

 Matei


 On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 I'm looking to start experimenting with Spark Streaming, and I'd like to
 use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source.
 Looking at the list of supported Spark Streaming sources
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking,
 I don't see any mention of Kinesis.

 Is it possible to use Spark Streaming with Amazon Kinesis? If not, are
 there plans to add such support in the future?

 Nick


  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: data locality

2014-07-22 Thread Sandy Ryza
On standalone there is still special handling for assigning tasks within
executors.  There just isn't special handling for where to place executors,
because standalone generally places an executor on every node.


On Mon, Jul 21, 2014 at 7:42 PM, Haopu Wang hw...@qilinsoft.com wrote:

Sandy,



 I just tried the standalone cluster and didn't have chance to try Yarn yet.

  So if I understand correctly, there is **no** special handling of task
  assignment according to the HDFS block's location when Spark is running as
  a **standalone** cluster.

 Please correct me if I'm wrong. Thank you for your patience!


  --

 *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com]
  *Sent:* July 22, 2014 9:47

 *To:* user@spark.apache.org
 *Subject:* Re: data locality



 This currently only works for YARN.  The standalone default is to place an
 executor on every node for every job.



 The total number of executors is specified by the user.



 -Sandy



 On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang hw...@qilinsoft.com wrote:

 Sandy,



 Do you mean the “preferred location” is working for standalone cluster
 also? Because I check the code of SparkContext and see comments as below:



    // This is used only by YARN for now, but should be relevant to other
    // cluster types (Mesos, etc) too. This is typically generated from
    // InputFormatInfo.computePreferredLocations. It contains a map from
    // hostname to a list of input format splits on the host.
    private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()



 BTW, even with the preferred hosts, how does Spark decide how many total
 executors to use for this application?



 Thanks again!


  --

 *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com]
 *Sent:* Friday, July 18, 2014 3:44 PM
 *To:* user@spark.apache.org
 *Subject:* Re: data locality



 Hi Haopu,



 Spark will ask HDFS for file block locations and try to assign tasks based
 on these.



 There is a snag.  Spark schedules its tasks inside of executor processes
 that stick around for the lifetime of a Spark application.  Spark requests
 executors before it runs any jobs, i.e. before it has any information about
 where the input data for the jobs is located.  If the executors occupy
 significantly fewer nodes than exist in the cluster, it can be difficult
 for Spark to achieve data locality.  The workaround for this is an API that
 allows passing in a set of preferred locations when instantiating a Spark
 context.  This API is currently broken in Spark 1.0, and will likely
 changed to be something a little simpler in a future release.



  val locData = InputFormatInfo.computePreferredLocations(
    Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))



 val sc = new SparkContext(conf, locData)



 -Sandy





 On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I have a standalone spark cluster and a HDFS cluster which share some of
 nodes.



 When reading HDFS file, how does spark assign tasks to nodes? Will it ask
 HDFS the location for each file block in order to get a right worker node?



 How about a spark cluster on Yarn?



 Thank you very much!









Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-22 Thread Andre Schumacher

Hi,

I don't think anybody has been testing importing of Impala tables
directly. Is there any chance to export these first, say as
unpartitioned Hive tables and import these? Just an idea..

Andre

On 07/21/2014 11:46 PM, chutium wrote:
 no, something like this
 
 14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost executor 2
 on 02.xxx: remote Akka client disassociated
 
 ...
 ...
 
 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task 1.2:186)
 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to
 java.io.IOException
 java.io.IOException: Filesystem closed
 at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
 at
 org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
 at java.io.DataInputStream.readFully(DataInputStream.java:195)
 at java.io.DataInputStream.readFully(DataInputStream.java:169)
 at
 parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
 at
 parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 
 
 ulimit is increased
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10344.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 



Spark sql with hive table running on Yarn-cluster mode

2014-07-22 Thread Jenny Zhao
Hi,

For running Spark SQL, the datanucleus*.jar files are automatically added to the
classpath. This works fine for Spark standalone mode and yarn-client mode;
however, for yarn-cluster mode, I have to explicitly pass these jars using the
--jars option when submitting the job, otherwise the job will fail. Why won't
it work in yarn-cluster mode?

Thank you for your help!

Jenny


Spark app vs SparkSQL app

2014-07-22 Thread buntu
I could possibly use the Spark API and write a batch app to provide some per-web-page
stats such as views, uniques, etc. The same can be achieved using
SparkSQL, so I wanted to check:

* What are the best practices and pros/cons of either approach?
* Does SparkSQL require registerAsTable for every batch, or does the created table
persist?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-app-vs-SparkSQL-app-tp10422.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why spark-submit command hangs?

2014-07-22 Thread Andrew Or
Hi Earthson,

Is your problem resolved? The way you submit your application looks alright
to me; spark-submit should be able to parse the combination of --master and
--deploy-mode correctly. I suspect you might have hard-coded yarn-cluster
or something in your application.
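For example, the driver code would only do something like the sketch below (the app
name is a placeholder) and leave --master / --deploy-mode entirely to spark-submit:

val conf = new SparkConf().setAppName("my-app")   // no setMaster(...) here
val sc = new SparkContext(conf)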

Andrew


2014-07-22 1:25 GMT-07:00 Earthson earthson...@gmail.com:

 That's what my problem is:)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10394.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: hadoop version

2014-07-22 Thread Andrew Or
Hi Maria,

Having files that end with 1.0.0 means you're on Spark 1.0.0, not Hadoop 1.0.
You can check your hadoop version by running $HADOOP_HOME/bin/hadoop
version, where HADOOP_HOME is set to your installation of hadoop. On the
clusters started by the Spark ec2 scripts, this should be
/root/ephemeral-hdfs.

Andrew


2014-07-22 7:07 GMT-07:00 mrm ma...@skimlinks.com:

 Hi,

 Where can I find the version of Hadoop my cluster is using? I launched my
 ec2 cluster using the spark-ec2 script with the --hadoop-major-version=2
 option. However, the folder hadoop-native/lib in the master node only
 contains files that end in 1.0.0. Does that mean that I have Hadoop version
 1?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-version-tp10405.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: spark streaming rate limiting from kafka

2014-07-22 Thread Bill Jay
Hi Tobias,

I tried to use 10 as numPartitions. The number of executors allocated is the
number of DStreams. Therefore, it seems the parameter does not spread the data
across many partitions. In order to do that, it seems we have to call
repartition. If numPartitions did distribute the data to multiple
executors/partitions, then I would be able to save the running time incurred
by repartition.

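(For reference, the multiple-receiver approach described further down this thread
would look roughly like the sketch below; the ZooKeeper quorum, topic, group,
receiver count, and partition count are placeholders.)

val kafkaStreams = (1 to 5).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1))
}
val unified = ssc.union(kafkaStreams)
val repartitioned = unified.repartition(32)   // spread the received data before heavy processing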
Bill




On Mon, Jul 21, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 numPartitions means the number of Spark partitions that the data received
 from Kafka will be split to. It has nothing to do with Kafka partitions, as
 far as I know.

 If you create multiple Kafka consumers, it doesn't seem like you can
 specify which consumer will consume which Kafka partitions. Instead, Kafka
 (at least with the interface that is exposed by the Spark Streaming API)
 will do something called rebalance and assign Kafka partitions to consumers
 evenly, you can see this in the client logs.

 When using multiple Kafka consumers with auto.offset.reset = true, please
 expect to run into this one:
 https://issues.apache.org/jira/browse/SPARK-2383

 Tobias


 On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I am currentlycreating multiple DStream to consumefrom different topics.
 How can I let each consumer consume from different partitions. I find the
 following parameters from Spark API:

 createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
     jssc: JavaStreamingContext,
     keyTypeClass: Class[K], valueTypeClass: Class[V],
     keyDecoderClass: Class[U], valueDecoderClass: Class[T],
     kafkaParams: Map[String, String],
     topics: Map[String, Integer],
     storageLevel: StorageLevel): JavaPairReceiverInputDStream[K, V]

 Create an input stream that pulls messages form a Kafka Broker.




 The topics parameter is:
  *Map of (topic_name -> numPartitions) to consume. Each partition is
  consumed in its own thread*

 Does numPartitions mean the total number of partitions to consume from
 topic_name or the index of the partition? How can we specify for each
 createStream which partition of the Kafka topic to consume? I think if so,
 I will get a lot of parallelism from the source of the data. Thanks!

 Bill

 On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  You can create multiple Kafka streams to partition your topics across
 them, which will run multiple receivers or multiple executors. This is
 covered in the Spark streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. It's in the master branch, and will be
 available in Spark 1.1. It basically sets the limit at the receiver level
 (so it applies to all sources) on the max records per second that can
 be received by the receiver.

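 (If I read SPARK-1854 correctly, the limit ends up as a plain configuration entry;
 a sketch of setting it on the SparkConf, with an example value:)

 val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "10000")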
 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there are always a single executor working on this job. Even I use
 repartition, it seems that there is still a single executor. Does anyone
 has an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song









Re: Spark job tracker.

2014-07-22 Thread Marcelo Vanzin
I don't understand what you're trying to do.

The code will use log4j under the covers. The default configuration
means writing log messages to stderr. In yarn-client mode that is your
terminal screen, in yarn-cluster mode that is redirected to a file by
Yarn. For the executors, that will always be redirected to a file
(since they're launched by yarn).

I don't know what you mean by port. But if neither of those options
is what you're looking for, you need to look at providing a custom
log4j configuration that does what you want.


On Sun, Jul 20, 2014 at 11:05 PM, abhiguruvayya
sharath.abhis...@gmail.com wrote:
 Hello Marcelo Vanzin,

 Can you explain a bit more on this? I tried using client mode, but can you
 explain how I can use this port to write the log or output to this
 port? Thanks in advance!



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10287.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Marcelo


combineByKey at ShuffledDStream.scala

2014-07-22 Thread Bill Jay
Hi all,

I am currently running a Spark Streaming program which consumes data from
Kafka and does a group-by operation on the data. I am trying to optimize the
running time of the program because it looks slow to me. It seems the stage
named:

combineByKey at ShuffledDStream.scala:42

always takes the longest running time. And if I open this stage, I only see
two executors on this stage. Does anyone have an idea what this stage does
and how to increase the speed for this stage? Thanks!

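(As a hedged aside: the group-by on a pair DStream accepts an explicit partition
count, which controls the parallelism of this shuffle stage; a one-line sketch,
where pairDStream and the value 32 are placeholders:)

val grouped = pairDStream.groupByKey(numPartitions = 32)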
Bill


Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-22 Thread Sandy Ryza
I haven't had a chance to look at the details of this issue, but we have
seen Spark successfully read Parquet tables created by Impala.


On Tue, Jul 22, 2014 at 10:10 AM, Andre Schumacher andre.sc...@gmail.com
wrote:


 Hi,

 I don't think anybody has been testing importing of Impala tables
 directly. Is there any chance to export these first, say as
 unpartitioned Hive tables and import these? Just an idea..

 Andre

 On 07/21/2014 11:46 PM, chutium wrote:
  no, something like this
 
  14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost
 executor 2
  on 02.xxx: remote Akka client disassociated
 
  ...
  ...
 
  14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task
 1.2:186)
  14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to
  java.io.IOException
  java.io.IOException: Filesystem closed
  at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
  at
 
 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
  at
  org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
  at java.io.DataInputStream.readFully(DataInputStream.java:195)
  at java.io.DataInputStream.readFully(DataInputStream.java:169)
  at
 
 parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
  at
 
 parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
  at
 
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
  at
 
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
  at
 
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
  at
  org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
  at
 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
  at
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
 
 
  ulimit is increased
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10344.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 




Need info on log4j.properties for apache spark.

2014-07-22 Thread abhiguruvayya
Hello All,

Basically I need to edit log4j.properties to filter out some of the
unnecessary logs in Spark in yarn-client mode. I am not sure where I can
find the log4j.properties file (its location). Can anyone help me with this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Need-info-on-log4j-properties-for-apache-spark-tp10431.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark job tracker.

2014-07-22 Thread abhiguruvayya
I fixed the error with the yarn-client mode issue which I mentioned in my
earlier post. Now I want to edit the log4j.properties to filter out some of the
unnecessary logs. Can you let me know where I can find this properties file?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10433.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Streaming: no job has started yet

2014-07-22 Thread Bill Jay
Hi all,

I am running a spark streaming job. The job hangs on one stage, which shows
as follows:

Details for Stage 4
Summary Metrics: No tasks have started yet
Tasks: No tasks have started yet



Does anyone have an idea on this?

Thanks!

Bill


What if there are large, read-only variables shared by all map functions?

2014-07-22 Thread Parthus
Hi there,

I was wondering if anybody could help me find an efficient way to make a
MapReduce program like this:

1) For each map function, it needs to access some huge files, which are around
6GB

2) These files are READ-ONLY. Actually they are like a huge look-up
table, which will not change for 2~3 years.

I tried two ways to make the program work, but neither of them is efficient:

1) The first approach I tried is to let each map function load those files
independently, like this:

map (...) { load(files); DoMapTask(...)}

2) The second approach I tried is to load the files before RDD.map(...) and
broadcast the files. However, because the files are too large, the
broadcasting overhead is 30min ~ 1 hour.

Could anybody help me find an efficient way to solve it?

Thanks very much.
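One approach that is often suggested for this situation (just a sketch, with a
hypothetical path and parsing logic) is to keep the lookup table in a
lazily-initialized singleton object, so each executor JVM loads it from a shared
filesystem at most once and every task on that executor reuses it; nothing large
is broadcast or serialized into closures:

import org.apache.spark.rdd.RDD

// Loaded at most once per executor JVM, on first access.
object LookupTable {
  lazy val table: Map[String, String] =
    scala.io.Source.fromFile("/mnt/shared/lookup.tsv").getLines()
      .map { line => val Array(k, v) = line.split("\t"); (k, v) }
      .toMap
}

// Nothing large is captured by the closure; tasks reuse the executor-local copy.
def doMapTask(records: RDD[String]): RDD[String] =
  records.mapPartitions { iter =>
    val table = LookupTable.table   // triggers the one-time load on this executor
    iter.map(r => table.getOrElse(r, "unknown"))
  }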










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Very wierd behavior

2014-07-22 Thread Matei Zaharia
Is the first() being computed locally on the driver program? Maybe it's too hard
to compute with the memory, etc. available there. Take a look at the driver's
log and see whether it has the message "Computing the requested partition
locally".

Matei

On Jul 22, 2014, at 12:04 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com 
wrote:

 I was wondering if anyone could provide an explanation for the behavior I'm 
 seeing.
 
 I have an RDD, call it foo, not too complex, with a maybe 8 level deep DAG 
 with 2 shuffles, not empty, not even terribly big - small enough that some 
 partitions could be empty.
 
 When I run foo.first, I get workers disconnecting, and the application dies.
 When I run foo.mapPartitions.saveAsHadoopDataset, it works fine.
 
 Anyone got an explanation for why that might be?
 
 -Thanks, Nathan
 
 
 -- 
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com



RE: Hive From Spark

2014-07-22 Thread Andrew Lee
Hi Sean,
Thanks for clarifying. I re-read SPARK-2420 and now have a better understanding.
From a user perspective, what would you recommend for building Spark with Hive
0.12 / 0.13+ libraries moving forward and deploying it to a production cluster
that runs on an older version of Hadoop (e.g. 2.2 or 2.4)?
My concern is that there is going to be a lag in technology adoption, and since
Spark is moving fast, its libraries may always be newer. Protobuf is one good
example that required shading. From a business point of view, if there is no
benefit to upgrading a library, the chances that it will happen with high priority
are low, due to stability concerns and the cost of re-running the entire test
suite. Just by observation, there are still a lot of people running Hadoop 2.2
instead of 2.4 or 2.5, and releases and upgrades depend on other big players such
as Cloudera, Hortonworks, etc. for their distros. Not to mention the process of
upgrading itself.
Is there any benefit to using Guava 14 in Spark? I believe there is usually some
compelling reason why Spark chose Guava 14; however, I'm not sure anyone has
raised that in the conversation, so I don't know whether it is necessary.
Looking forward to seeing Hive on Spark work soon. Please let me know if
there's any help or feedback I can provide.
Thanks Sean.


 From: so...@cloudera.com
 Date: Mon, 21 Jul 2014 18:36:10 +0100
 Subject: Re: Hive From Spark
 To: user@spark.apache.org
 
 I haven't seen anyone actively 'unwilling' -- I hope not. See
 discussion at https://issues.apache.org/jira/browse/SPARK-2420 where I
 sketch what a downgrade means. I think it just hasn't gotten a looking
 over.
 
 Contrary to what I thought earlier, the conflict does in fact cause
 problems in theory, and you show it causes a problem in practice. Not
 to mention it causes issues for Hive-on-Spark now.
 
 On Mon, Jul 21, 2014 at 6:27 PM, Andrew Lee alee...@hotmail.com wrote:
  Hive and Hadoop are using an older version of guava libraries (11.0.1) where
  Spark Hive is using guava 14.0.1+.
  The community isn't willing to downgrade to 11.0.1 which is the current
  version for Hadoop 2.2 and Hive 0.12.
  

Re: Spark job tracker.

2014-07-22 Thread Marcelo Vanzin
You can upload your own log4j.properties using spark-submit's
--files argument.

On Tue, Jul 22, 2014 at 12:45 PM, abhiguruvayya
sharath.abhis...@gmail.com wrote:
 I fixed the error with the yarn-client mode issue which i mentioned in my
 earlier post. Now i want to edit the log4j.properties to filter some of the
 unnecessary logs. Can you let me know where can i find this properties file.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10433.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Marcelo


RE: Tranforming flume events using Spark transformation functions

2014-07-22 Thread Sundaram, Muthu X.
I tried to map SparkFlumeEvents to an RDD of Strings as below, but that map and
its call method are not executed at all. I might be doing this in the wrong way.
Any help would be appreciated.

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        System.out.println("Inside for each...call");

        JavaRDD<String> records = eventsData.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flume) throws Exception {
                    String logRecord = null;
                    AvroFlumeEvent avroEvent = null;
                    ByteBuffer bytePayload = null;

                    System.out.println("Inside Map..call");
                    /* List<SparkFlumeEvent> events = flume.collect();
                       Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
                       SparkFlumeEvent flumeEvent = batchedEvents.next(); */
                    avroEvent = flume.event();
                    bytePayload = avroEvent.getBody();
                    logRecord = new String(bytePayload.array());

                    System.out.println("Record is " + logRecord);

                    return logRecord;
                }
            });
        return null;
    }
});

-Original Message-
From: Sundaram, Muthu X. [mailto:muthu.x.sundaram@sabre.com] 
Sent: Tuesday, July 22, 2014 10:24 AM
To: user@spark.apache.org; d...@spark.incubator.apache.org
Subject: Tranforming flume events using Spark transformation functions

Hi All,
  I am getting events from flume using the following line.

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host,
port);

Each event is a delimited record. I would like to use some of the transformation
functions like map and reduce on this. Do I need to convert the
JavaDStream<SparkFlumeEvent> to JavaDStream<String>, or can I apply these
functions directly on it?

I need to do following kind of operations

 AA
YDelta
TAA
 Southwest
 AA

Unique tickets are  , Y, , , .
Count is  2,  1, T 1 and so on...
AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket, Southwest - 1 
ticket.

I have to do transformations like this. Right now I am able to receive
records, but I am struggling to transform them using spark transformation
functions since they are not of type JavaRDD<String>.

Can I create a new JavaRDD<String>? How do I create a new JavaRDD?

I loop through the events like below

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user level data is carried as payload in the Flume Event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println("LOG RECORD = " + logRecord);
        }

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I
create this JavaRDD<String>?
In the loop I am able to get every record and I am able to print them.

I appreciate any help here.

Thanks,
Muthu




Re: Spark job tracker.

2014-07-22 Thread abhiguruvayya
Thanks, I am able to load the file now. Can I turn off specific logs using
log4j.properties? I don't want to see the logs below. How can I do this?

14/07/22 14:01:24 INFO scheduler.TaskSetManager: Starting task 2.0:129 as
TID 129 on executor 3: ** (NODE_LOCAL)
14/07/22 14:01:24 INFO scheduler.TaskSetManager: Serialized task 2.0:129 as
14708 bytes in 0 ms

*current log4j.properties entry:*

# make a file appender and a console appender
# Print the date in ISO 8601 format
log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c -
%m%n
log4j.appender.myFileAppender=org.apache.log4j.RollingFileAppender
log4j.appender.myFileAppender.File=spark.log
log4j.appender.myFileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myFileAppender.layout.ConversionPattern=%d [%t] %-5p %c -
%m%n



# By default, everything goes to console and file
log4j.rootLogger=INFO, myConsoleAppender, myFileAppender

# The noisier spark logs go to file only
log4j.logger.spark.storage=INFO, myFileAppender
log4j.additivity.spark.storage=false
log4j.logger.spark.scheduler=INFO, myFileAppender
log4j.additivity.spark.scheduler=false
log4j.logger.spark.CacheTracker=INFO, myFileAppender
log4j.additivity.spark.CacheTracker=false
log4j.logger.spark.CacheTrackerActor=INFO, myFileAppender
log4j.additivity.spark.CacheTrackerActor=false
log4j.logger.spark.MapOutputTrackerActor=INFO, myFileAppender
log4j.additivity.spark.MapOutputTrackerActor=false
log4j.logger.spark.MapOutputTracker=INFO, myFileAppender
log4j.additivty.spark.MapOutputTracker=false



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10440.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Apache kafka + spark + Parquet

2014-07-22 Thread buntu
 Now we are storing Data direct from Kafka to Parquet.

We are currently using Camus and wanted to know how you went about storing
to Parquet?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-kafka-spark-Parquet-tp10037p10441.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark job tracker.

2014-07-22 Thread Marcelo Vanzin
The Spark log categories are based on the actual class names. So if you
want to filter out a package's logs, you need to specify the full
package name (e.g. org.apache.spark.storage instead of just
spark.storage).
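For instance, to quiet the TaskSetManager output shown earlier while leaving
everything else at INFO, entries along these lines (note the fully-qualified
package names; the WARN level is only a suggestion) could be added to the same
log4j.properties:

# Reduce the noisy scheduler/storage packages to WARN
log4j.logger.org.apache.spark.scheduler=WARN, myFileAppender
log4j.additivity.org.apache.spark.scheduler=false
log4j.logger.org.apache.spark.storage=WARN, myFileAppender
log4j.additivity.org.apache.spark.storage=false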

On Tue, Jul 22, 2014 at 2:07 PM, abhiguruvayya
sharath.abhis...@gmail.com wrote:
 Thanks i am able to load the file now. Can i turn off specific logs using
 log4j.properties. I don't want to see the below logs. How can i do this.

 14/07/22 14:01:24 INFO scheduler.TaskSetManager: Starting task 2.0:129 as
 TID 129 on executor 3: ** (NODE_LOCAL)
 14/07/22 14:01:24 INFO scheduler.TaskSetManager: Serialized task 2.0:129 as
 14708 bytes in 0 ms

 *current log4j.properties entry:*

 # make a file appender and a console appender
 # Print the date in ISO 8601 format
 log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
 log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c -
 %m%n
 log4j.appender.myFileAppender=org.apache.log4j.RollingFileAppender
 log4j.appender.myFileAppender.File=spark.log
 log4j.appender.myFileAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.myFileAppender.layout.ConversionPattern=%d [%t] %-5p %c -
 %m%n



 # By default, everything goes to console and file
 log4j.rootLogger=INFO, myConsoleAppender, myFileAppender

 # The noisier spark logs go to file only
 log4j.logger.spark.storage=INFO, myFileAppender
 log4j.additivity.spark.storage=false
 log4j.logger.spark.scheduler=INFO, myFileAppender
 log4j.additivity.spark.scheduler=false
 log4j.logger.spark.CacheTracker=INFO, myFileAppender
 log4j.additivity.spark.CacheTracker=false
 log4j.logger.spark.CacheTrackerActor=INFO, myFileAppender
 log4j.additivity.spark.CacheTrackerActor=false
 log4j.logger.spark.MapOutputTrackerActor=INFO, myFileAppender
 log4j.additivity.spark.MapOutputTrackerActor=false
 log4j.logger.spark.MapOutputTracker=INFO, myFileAppender
 log4j.additivty.spark.MapOutputTracker=false



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10440.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Marcelo


How to do an interactive Spark SQL

2014-07-22 Thread hsy...@gmail.com
Hi guys,

I'm able to run some Spark SQL examples, but the sql is static in the code. I
would like to know whether there is a way to read the sql from somewhere else
(a shell, for example).

I could read the sql statement from kafka/zookeeper, but I cannot share the sql
with all the workers. broadcast does not seem to work for updating values.

Moreover, if I use some non-serializable class (DataInputStream etc.) to read
the sql from another source, I always get Task not serializable:
java.io.NotSerializableException


Best,
Siyuan


Re: How to do an interactive Spark SQL

2014-07-22 Thread Zongheng Yang
Do you mean that the texts of the SQL queries are hardcoded in the
code? What do you mean by "cannot share the sql to all workers"?

On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote:
 Hi guys,

 I'm able to run some Spark SQL example but the sql is static in the code. I
 would like to know is there a way to read sql from somewhere else (shell for
 example)

 I could read sql statement from kafka/zookeeper, but I cannot share the sql
 to all workers. broadcast seems not working for updating values.

 Moreover if I use some non-serializable class(DataInputStream etc) to read
 sql from other source, I always get Task not serializable:
 java.io.NotSerializableException


 Best,
 Siyuan


Spark clustered client

2014-07-22 Thread Asaf Lahav
Hi Folks,

I have been trying to dig up some information about the possibilities
when deploying more than one client process that consumes Spark.

Let's say I have a Spark cluster of 10 servers, and I would like to set up 2
additional servers which send requests to it through a Spark
context, referencing one specific file of 1TB of data.

Each client process, has its own SparkContext instance.
Currently, the result is that that same file is loaded into memory twice
because the Spark Context resources are not shared between processes/jvms.


I wouldn't like to have that same file loaded over and over again with
every new client being introduced.
What would be the best practice here? Am I missing something?

Thank you,
Asaf


Re: How to do an interactive Spark SQL

2014-07-22 Thread hsy...@gmail.com
Sorry, typo. What I meant is sharing. If the sql is changing at runtime, how
do I broadcast the sql to all the workers that are doing the sql analysis?

Best,
Siyuan


On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote:

 Do you mean that the texts of the SQL queries being hardcoded in the
 code? What do you mean by cannot shar the sql to all workers?

 On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:
  Hi guys,
 
  I'm able to run some Spark SQL example but the sql is static in the
 code. I
  would like to know is there a way to read sql from somewhere else (shell
 for
  example)
 
  I could read sql statement from kafka/zookeeper, but I cannot share the
 sql
  to all workers. broadcast seems not working for updating values.
 
  Moreover if I use some non-serializable class(DataInputStream etc) to
 read
  sql from other source, I always get Task not serializable:
  java.io.NotSerializableException
 
 
  Best,
  Siyuan



Re: How to do an interactive Spark SQL

2014-07-22 Thread Zongheng Yang
Can you paste a small code example to illustrate your questions?

On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com wrote:
 Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how
 do I broadcast the sql to all workers that is doing sql analysis.

 Best,
 Siyuan


 On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote:

 Do you mean that the texts of the SQL queries being hardcoded in the
 code? What do you mean by cannot shar the sql to all workers?

 On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:
  Hi guys,
 
  I'm able to run some Spark SQL example but the sql is static in the
  code. I
  would like to know is there a way to read sql from somewhere else (shell
  for
  example)
 
  I could read sql statement from kafka/zookeeper, but I cannot share the
  sql
  to all workers. broadcast seems not working for updating values.
 
  Moreover if I use some non-serializable class(DataInputStream etc) to
  read
  sql from other source, I always get Task not serializable:
  java.io.NotSerializableException
 
 
  Best,
  Siyuan




Re: the implications of some items in webUI

2014-07-22 Thread Ankur Dave
On Tue, Jul 22, 2014 at 7:08 AM, Yifan LI iamyifa...@gmail.com wrote:

 1) what is the difference between Duration(Stages - Completed Stages)
 and Task Time(Executors) ?


Stages are composed of tasks that run on executors. Tasks within a stage
may run concurrently, since there are multiple executors and each executor
may run more than one task at a time.

 An executor's task time is the sum of the durations of all of its tasks.
Because this is a simple sum, it does not take parallelism into account: if
an executor runs 8 tasks concurrently and each takes a minute, it has only
spent one minute of wallclock time, but the reported task time will be 8
minutes.

A stage's duration is how much wallclock time elapsed between when the
first task launched and when the last task finished. This does take
parallelism into account, so in the above example the stage duration would
be 1 minute.

2) what are the exact meanings of Shuffle Read/Shuffle Write?


Stages communicate using shuffles. Each task may start by reading shuffle
inputs across the network, and may end by writing shuffle outputs to disk
locally. See page 7 of the Spark NSDI paper
https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf
for details.

Shuffle read and shuffle write refer to the total amount of data that a
stage read across the network and wrote to disk.

Ankur http://www.ankurdave.com/


RE: Joining by timestamp.

2014-07-22 Thread durga
Thanks Chen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10449.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How could I start new spark cluster with hadoop2.0.2

2014-07-22 Thread durga
Hi,

I am trying to create a spark cluster using the spark-ec2 script under the
spark1.0.1 directory.

1) I noticed that it always creates hadoop version 1.0.4. Is there a way
I can override that? I would like to have hadoop 2.0.2.

2) I also want to install Oozie along with it. Are there any scripts available
along with spark-ec2 which can create oozie instances for me?

Thanks,
D.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-start-new-spark-cluster-with-hadoop2-0-2-tp10450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to do an interactive Spark SQL

2014-07-22 Thread hsy...@gmail.com
For example, this is what I tested and it works in local mode. It gets both the
data and the sql query from kafka, runs the sql on each RDD, and outputs
the result back to kafka again.
I defined a var called sqlS. In the streaming part, as you can see, I
change the sql statement whenever a sql message is consumed from kafka, so the
next time sql(sqlS) is called it executes the updated query.

But this code doesn't work in cluster mode because, from what I understand,
sqlS is not updated on all the workers.

So my question is: how do I change the sqlS value at runtime and make all
the workers pick up the latest value?


var sqlS = "select count(*) from records"
val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
val sparkConf = new SparkConf().setAppName("KafkaSpark")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)

// Importing the SQL context gives access to all the SQL functions and
// implicit conversions.
import sqlContext._
import sqlContext.createSchemaRDD

//val tt = Time(5000)
val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2)
val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
  .window(Seconds(4))
  .filter(t => { if (t._1 == "sql") { sqlS = t._2; false } else true })
  .map(t => getRecord(t._2.split("#")))

val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer)

val brokerString =
  ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")

KafkaSpark.props.put("metadata.broker.list", brokerString)
val config = new ProducerConfig(KafkaSpark.props)
val producer = new Producer[String, String](config)

val result = recordsStream.foreachRDD((recRDD) => {
  val schemaRDD = sqlContext.createSchemaRDD(recRDD)
  schemaRDD.registerAsTable(tName)
  val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) => { s + r.mkString(",") + "\n" })
  producer.send(new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
})
ssc.start()
ssc.awaitTermination()


On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com wrote:

 Can you paste a small code example to illustrate your questions?

 On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:
  Sorry, typo. What I mean is sharing. If the sql is changing at runtime,
 how
  do I broadcast the sql to all workers that is doing sql analysis.
 
  Best,
  Siyuan
 
 
  On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com
 wrote:
 
  Do you mean that the texts of the SQL queries being hardcoded in the
  code? What do you mean by cannot shar the sql to all workers?
 
  On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com
  wrote:
   Hi guys,
  
   I'm able to run some Spark SQL example but the sql is static in the
   code. I
   would like to know is there a way to read sql from somewhere else
 (shell
   for
   example)
  
   I could read sql statement from kafka/zookeeper, but I cannot share
 the
   sql
   to all workers. broadcast seems not working for updating values.
  
   Moreover if I use some non-serializable class(DataInputStream etc) to
   read
   sql from other source, I always get Task not serializable:
   java.io.NotSerializableException
  
  
   Best,
   Siyuan
 
 



Re: How to do an interactive Spark SQL

2014-07-22 Thread Tobias Pfeiffer
Hi,

as far as I know, after the Streaming Context has started, the processing
pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL
statement is transformed into RDD operations when the Streaming Context
starts, I think there is no way to change the statement that is executed on
the current stream after the StreamingContext has started.
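That said, the SQL string evaluated inside foreachRDD is not itself part of the
compiled pipeline: the body of foreachRDD runs on the driver for each batch, so
one possible workaround is to re-read the query there rather than mutating a var
from a worker-side closure. A rough sketch (fetchLatestSql() is a hypothetical
driver-side helper, e.g. reading from ZooKeeper; error handling omitted):

recordsStream.foreachRDD { recRDD =>
  val schemaRDD = sqlContext.createSchemaRDD(recRDD)
  schemaRDD.registerAsTable("records")
  val query = fetchLatestSql()            // executed on the driver, once per batch
  sqlContext.sql(query).collect().foreach(println)
}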

Tobias


On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com hsy...@gmail.com wrote:

 For example, this is what I tested and work on local mode, what it does is
 it get data and sql query both from kafka and do sql on each RDD and output
 the result back to kafka again
 I defined a var called *sqlS. * In the streaming part as you can see I
 change the sql statement if it consumes a sql message from kafka then next
 time when you do *sql(sqlS) *it execute the updated sql query.

 But this code doesn't work in cluster because sqlS is not updated on all
 the workers from what I understand.

 So my question is how do I change the sqlS value at runtime and make all
 the workers pick the latest value.


 *var sqlS = select count(*) from records*
 val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) =
 args
 val sparkConf = new SparkConf().setAppName(KafkaSpark)
 val sc = new SparkContext(sparkConf)
 val ssc = new StreamingContext(sc, Seconds(2))
 val sqlContext = new SQLContext(sc)

 // Importing the SQL context gives access to all the SQL functions and
 implicit conversions.
 import sqlContext._
 import sqlContext.createSchemaRDD

 //val tt = Time(5000)
 val topicpMap = collection.immutable.HashMap(topic - numParts.toInt,
 sqltopic - 2)
 val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
 topicpMap).window(Seconds(4)).filter(t = { if (t._1 == sql) { *sqlS =
 t._2;* false } else true }).map(t = getRecord(t._2.split(#)))

 val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer)

 val brokerString =
 ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(,)

 KafkaSpark.props.put(metadata.broker.list, brokerString)
 val config = new ProducerConfig(KafkaSpark.props)
 val producer = new Producer[String, String](config)

 val result = recordsStream.foreachRDD((recRDD) = {
   val schemaRDD = sqlContext.createSchemaRDD(recRDD)
   schemaRDD.registerAsTable(tName)
   val result = *sql(sqlS)*.collect.foldLeft(Result:\n)((s, r) = {
 s + r.mkString(,) + \n })
   producer.send(new KeyedMessage[String, String](outputTopic, sSQL:
 $sqlS \n $result))
 })
 ssc.start()
 ssc.awaitTermination()


 On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com
 wrote:

 Can you paste a small code example to illustrate your questions?

 On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:
  Sorry, typo. What I mean is sharing. If the sql is changing at runtime,
 how
  do I broadcast the sql to all workers that is doing sql analysis.
 
  Best,
  Siyuan
 
 
  On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com
 wrote:
 
  Do you mean that the texts of the SQL queries being hardcoded in the
  code? What do you mean by cannot shar the sql to all workers?
 
  On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com
  wrote:
   Hi guys,
  
   I'm able to run some Spark SQL example but the sql is static in the
   code. I
   would like to know is there a way to read sql from somewhere else
 (shell
   for
   example)
  
   I could read sql statement from kafka/zookeeper, but I cannot share
 the
   sql
   to all workers. broadcast seems not working for updating values.
  
   Moreover if I use some non-serializable class(DataInputStream etc) to
   read
   sql from other source, I always get Task not serializable:
   java.io.NotSerializableException
  
  
   Best,
   Siyuan
 
 





streaming window not behaving as advertised (v1.0.1)

2014-07-22 Thread Alan Ngai
I have a sample application pumping out records 1 per second.  The batch 
interval is set to 5 seconds.  Here’s a list of “observed window intervals” vs 
what was actually set

window=25, slide=25 : observed-window=25, overlapped-batches=0
window=25, slide=20 : observed-window=20, overlapped-batches=0
window=25, slide=15 : observed-window=15, overlapped-batches=0
window=25, slide=10 : observed-window=20, overlapped-batches=2
window=25, slide=5 : observed-window=25, overlapped-batches=3

can someone explain this behavior to me?  I’m trying to aggregate metrics by 
time batches, but want to skip partial batches.  Therefore, I’m trying to find 
a combination which results in 1 overlapped batch, but no combination I tried 
gets me there.  

Alan



Re: How to do an interactive Spark SQL

2014-07-22 Thread hsy...@gmail.com
But how do they do the interactive sql in the demo?
https://www.youtube.com/watch?v=dJQ5lV5Tldw

And if it can work in local mode, I think it should be able to work in
cluster mode, correct?


On Tue, Jul 22, 2014 at 5:58 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 as far as I know, after the Streaming Context has started, the processing
 pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL
 statement is transformed into RDD operations when the Streaming Context
 starts, I think there is no way to change the statement that is executed on
 the current stream after the StreamingContext has started.

 Tobias


 On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com hsy...@gmail.com
 wrote:

 For example, this is what I tested and work on local mode, what it does
 is it get data and sql query both from kafka and do sql on each RDD and
 output the result back to kafka again
 I defined a var called *sqlS. * In the streaming part as you can see I
 change the sql statement if it consumes a sql message from kafka then next
 time when you do *sql(sqlS) *it execute the updated sql query.

 But this code doesn't work in cluster because sqlS is not updated on all
 the workers from what I understand.

 So my question is how do I change the sqlS value at runtime and make all
 the workers pick the latest value.


 *var sqlS = select count(*) from records*
 val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) =
 args
 val sparkConf = new SparkConf().setAppName(KafkaSpark)
 val sc = new SparkContext(sparkConf)
 val ssc = new StreamingContext(sc, Seconds(2))
 val sqlContext = new SQLContext(sc)

 // Importing the SQL context gives access to all the SQL functions
 and implicit conversions.
 import sqlContext._
 import sqlContext.createSchemaRDD

 //val tt = Time(5000)
 val topicpMap = collection.immutable.HashMap(topic - numParts.toInt,
 sqltopic - 2)
 val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
 topicpMap).window(Seconds(4)).filter(t = { if (t._1 == sql) { *sqlS =
 t._2;* false } else true }).map(t = getRecord(t._2.split(#)))

 val zkClient = new ZkClient(zkQuorum, 3, 3,
 ZKStringSerializer)

 val brokerString =
 ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(,)

 KafkaSpark.props.put(metadata.broker.list, brokerString)
 val config = new ProducerConfig(KafkaSpark.props)
 val producer = new Producer[String, String](config)

 val result = recordsStream.foreachRDD((recRDD) = {
   val schemaRDD = sqlContext.createSchemaRDD(recRDD)
   schemaRDD.registerAsTable(tName)
   val result = *sql(sqlS)*.collect.foldLeft(Result:\n)((s, r) = {
 s + r.mkString(,) + \n })
   producer.send(new KeyedMessage[String, String](outputTopic, sSQL:
 $sqlS \n $result))
 })
 ssc.start()
 ssc.awaitTermination()


 On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com
 wrote:

 Can you paste a small code example to illustrate your questions?

 On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:
  Sorry, typo. What I mean is sharing. If the sql is changing at
 runtime, how
  do I broadcast the sql to all workers that is doing sql analysis.
 
  Best,
  Siyuan
 
 
  On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com
 wrote:
 
  Do you mean that the texts of the SQL queries being hardcoded in the
  code? What do you mean by cannot shar the sql to all workers?
 
  On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com
  wrote:
   Hi guys,
  
   I'm able to run some Spark SQL example but the sql is static in the
   code. I
   would like to know is there a way to read sql from somewhere else
 (shell
   for
   example)
  
   I could read sql statement from kafka/zookeeper, but I cannot share
 the
   sql
   to all workers. broadcast seems not working for updating values.
  
   Moreover if I use some non-serializable class(DataInputStream etc)
 to
   read
   sql from other source, I always get Task not serializable:
   java.io.NotSerializableException
  
  
   Best,
   Siyuan
 
 






Where is the PowerGraph abstraction

2014-07-22 Thread shijiaxin
I downloaded spark 1.0.1, but I cannot find the PowerGraph abstraction
mentioned in the GraphX paper.
What I can find is the Pregel abstraction.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Where-is-the-PowerGraph-abstraction-tp10457.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming source from Amazon Kinesis

2014-07-22 Thread Tathagata Das
I will take a look at it tomorrow!

TD


On Tue, Jul 22, 2014 at 9:30 AM, Chris Fregly ch...@fregly.com wrote:

 i took this over from parviz.

 i recently submitted a new PR for Kinesis Spark Streaming support:
 https://github.com/apache/spark/pull/1434

 others have tested it with good success, so give it a whirl!

 waiting for it to be reviewed/merged.  please put any feedback into the PR
 directly.

 thanks!

 -chris


 On Mon, Apr 21, 2014 at 2:39 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 No worries, looking forward to it!

 Matei

 On Apr 21, 2014, at 1:59 PM, Parviz Deyhim pdey...@gmail.com wrote:

 sorry Matei. Will definitely start working on making the changes soon :)


 On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 There was a patch posted a few weeks ago (
 https://github.com/apache/spark/pull/223), but it needs a few changes
 in packaging because it uses a license that isn’t fully compatible with
 Apache. I’d like to get this merged when the changes are made though — it
 would be a good input source to support.

 Matei


 On Apr 21, 2014, at 1:00 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 I'm looking to start experimenting with Spark Streaming, and I'd like to
 use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source.
 Looking at the list of supported Spark Streaming sources
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking,
 I don't see any mention of Kinesis.

 Is it possible to use Spark Streaming with Amazon Kinesis? If not, are
 there plans to add such support in the future?

 Nick


 --
 View this message in context: Spark Streaming source from Amazon Kinesis
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com
 http://nabble.com/.








Re: combineByKey at ShuffledDStream.scala

2014-07-22 Thread Tathagata Das
Can you give an idea of the streaming program? What are the rest of the
transformations you are doing on the input streams?


On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi all,

 I am currently running a Spark Streaming program which consumes data from
 Kafka and does a group-by operation on the data. I am trying to optimize the
 running time of the program because it looks slow to me. It seems the stage
 named:

 * combineByKey at ShuffledDStream.scala:42 *

 always takes the longest running time. And if I open this stage, I only
 see two executors on it. Does anyone have an idea what this stage
 does and how to increase the speed of this stage? Thanks!

 Bill



Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-22 Thread Yin Huai
It is caused by a bug in the Spark REPL. I still do not know which part of the
REPL code causes it... I think the people working on the REPL may have a better
idea.

Regarding how I found it: based on the exception, it seems we pulled in some
irrelevant stuff, and that import was pretty suspicious.

Thanks,

Yin


On Tue, Jul 22, 2014 at 12:53 AM, Victor Sheng victorsheng...@gmail.com
wrote:

 Hi, Yin Huai
 I tested again with your snippet code.
 It works well in spark-1.0.1.

 Here is my code:

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  case class Record(data_date: String, mobile: String, create_time: String)
  val mobile = Record("2014-07-20", "1234567", "2014-07-19")
  val lm = List(mobile)
  val mobileRDD = sc.makeRDD(lm)
  val mobileSchemaRDD = sqlContext.createSchemaRDD(mobileRDD)
  mobileSchemaRDD.registerAsTable("mobile")
  sqlContext.sql("select count(1) from mobile").collect()

 The Result is like below:
 14/07/22 15:49:53 INFO spark.SparkContext: Job finished: collect at
 SparkPlan.scala:52, took 0.296864832 s
 res9: Array[org.apache.spark.sql.Row] = Array([1])


But what is the main cause of this exception? And how did you find it out by
looking at some unknown characters like $line11.$read$
$line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$ ?

 Thanks,
 Victor




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10390.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: streaming window not behaving as advertised (v1.0.1)

2014-07-22 Thread Tathagata Das
It could be related to this bug that is currently open.
https://issues.apache.org/jira/browse/SPARK-1312

Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and
try these combos again?
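Roughly, a sketch of the suggested workaround, assuming a DStream named
inputStream and the 25s/5s window from above:

import org.apache.spark.streaming.Seconds

// Dummy output operation to force every batch to be materialized
inputStream.foreachRDD(rdd => { })

val windowed = inputStream.window(Seconds(25), Seconds(5))
windowed.foreachRDD(rdd => println("window count: " + rdd.count()))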

TD


On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote:

 I have a sample application pumping out records 1 per second.  The batch
 interval is set to 5 seconds.  Here’s a list of “observed window intervals”
 vs what was actually set

 window=25, slide=25 : observed-window=25, overlapped-batches=0
 window=25, slide=20 : observed-window=20, overlapped-batches=0
 window=25, slide=15 : observed-window=15, overlapped-batches=0
 window=25, slide=10 : observed-window=20, overlapped-batches=2
 window=25, slide=5 : observed-window=25, overlapped-batches=3

 can someone explain this behavior to me?  I’m trying to aggregate metrics
 by time batches, but want to skip partial batches.  Therefore, I’m trying
 to find a combination which results in 1 overlapped batch, but no
 combination I tried gets me there.

 Alan




Re: Tranforming flume events using Spark transformation functions

2014-07-22 Thread Tathagata Das
This is because of the RDD's lazy evaluation! Unless you force a
transformed (mapped/filtered/etc.) RDD to give you back some data (like
RDD.count) or output the data (like RDD.saveAsTextFile()), Spark will not
do anything.

So after the eventsData.map(...), if you do take(10) and then print the
result, you should see 10 items from each batch printed.

Also, you can do the same map operation on the DStream as well. FYI,

inputDStream.map(...).foreachRDD(...) is equivalent to
 inputDStream.foreachRDD( // call rdd.map(...) )

Either way you have to call some RDD action (count, collect, take,
saveAsHadoopFile, etc.) that asks the system to do something concrete with
the data.
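A tiny illustration of the same point, in Scala for brevity (assuming a
DStream[SparkFlumeEvent] named flumeStream; the same applies to the Java API):

flumeStream.foreachRDD { rdd =>
  val records = rdd.map(e => new String(e.event.getBody.array()))
  // Nothing has run yet; take(10) is the action that actually triggers the map.
  records.take(10).foreach(println)
}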

TD




On Tue, Jul 22, 2014 at 1:55 PM, Sundaram, Muthu X. 
muthu.x.sundaram@sabre.com wrote:

 I tried to map SparkFlumeEvents to String of RDDs like below. But that map
 and call are not at all executed. I might be doing this in a wrong way. Any
 help would be appreciated.

 flumeStream.foreach(new FunctionJavaRDDSparkFlumeEvent,Void () {
   @Override
   public Void call(JavaRDDSparkFlumeEvent eventsData) throws
 Exception {
 System.out.println(Inside for
 each...call);

 JavaRDDString records = eventsData.map(
 new FunctionSparkFlumeEvent, String() {
 @Override
 public String call(SparkFlumeEvent flume) throws Exception
 {
 String logRecord = null;
 AvroFlumeEvent avroEvent = null;
   ByteBuffer bytePayload = null;


   System.out.println(Inside Map..call);
 /* ListSparkFlumeEvent events = flume.collect();
  IteratorSparkFlumeEvent batchedEvents =
 events.iterator();

 SparkFlumeEvent flumeEvent =
 batchedEvents.next();*/
 avroEvent = flume.event();
 bytePayload = avroEvent.getBody();
 logRecord = new String(bytePayload.array());

   System.out.println(Record is +
 logRecord);

 return logRecord;
 }
 });
 return null;
 }

 -Original Message-
 From: Sundaram, Muthu X. [mailto:muthu.x.sundaram@sabre.com]
 Sent: Tuesday, July 22, 2014 10:24 AM
 To: user@spark.apache.org; d...@spark.incubator.apache.org
 Subject: Tranforming flume events using Spark transformation functions

 Hi All,
   I am getting events from flume using following line.

   JavaDStreamSparkFlumeEvent flumeStream = FlumeUtils.createStream(ssc,
 host, port);

 Each event is a delimited record. I like to use some of the transformation
 functions like map and reduce on this. Do I need to convert the
 JavaDStreamSparkFlumeEvent to JavaDStreamString or can I apply these
 function directly on this?

 I need to do following kind of operations

  AA
 YDelta
 TAA
  Southwest
  AA

 Unique tickets are  , Y, , , .
 Count is  2,  1, T 1 and so on...
 AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket,
 Southwest - 1 ticket.

 I have to do transformations like this. Right now I am able to receives
 records. But I am struggling to transform them using spark transformation
 functions since they are not of type JavaRDDString.

 Can I create new JavaRDDString? How do I create new JavaRDD?

 I loop through  the events like below

 flumeStream.foreach(new FunctionJavaRDDSparkFlumeEvent,Void () {
   @Override
   public Void call(JavaRDDSparkFlumeEvent eventsData) throws
 Exception {
  String logRecord = null;
  ListSparkFlumeEvent events = eventsData.collect();
  IteratorSparkFlumeEvent batchedEvents =
 events.iterator();
  long t1 = System.currentTimeMillis();
  AvroFlumeEvent avroEvent = null;
  ByteBuffer bytePayload = null;
  // All the user level data is carried as payload in
 Flume Event
  while(batchedEvents.hasNext()) {
 SparkFlumeEvent flumeEvent =
 batchedEvents.next();
 avroEvent = flumeEvent.event();
 bytePayload = avroEvent.getBody();
 logRecord = new String(bytePayload.array());

 System.out.println(LOG RECORD =  +
 logRecord); }

 Where do I create new JavaRDDString? DO I do it before this loop? How do
 I create this JavaRDDString?
 In the loop I am able to get every record and I am able to print them.

 I appreciate any help here.

 Thanks,
 Muthu





Re: Caching issue with msg: RDD block could not be dropped from memory as it does not exist

2014-07-22 Thread rindra
Hello Andrew,

Thank you very much for your great tips. Your solution worked perfectly.

In fact, I was not aware that the right option for local mode is
--driver-memory 1g.

Cheers,

Rindra


On Mon, Jul 21, 2014 at 11:23 AM, Andrew Or-2 [via Apache Spark User List] 
ml-node+s1001560n10336...@n3.nabble.com wrote:

 Hi Rindra,

 Depending on what you're doing with your groupBy, you may end up inflating
 your data quite a bit. Even if your machine has 16G, by default spark-shell
 only uses 512M, and the amount used for storing blocks is only 60% of that
 (spark.storage.memoryFraction), so this space becomes ~300M. This is still
 many multiples of the size of your dataset, but not by orders of magnitude.
 If you are running Spark 1.0+, you can increase the amount of memory used
 by spark-shell by adding --driver-memory 1g as a command line argument in
 local mode, or --executor-memory 1g in any other mode.

 (Also, it seems that you set your log level to WARN. The cause is most
 probably because the cache is not big enough, but setting the log level to
 INFO will provide you with more information on the exact sizes that are
 being used by the storage and the blocks).

 Andrew


 2014-07-19 13:01 GMT-07:00 rindra:

 Hi,

 I am working with a small dataset about 13Mbyte on the spark-shell. After
 doing a
 groupBy on the RDD, I wanted to cache RDD in memory but I keep getting
 these warnings:

 scala rdd.cache()
 res28: rdd.type = MappedRDD[63] at repartition at console:28


 scala rdd.count()
 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_82 could not be dropped
 from memory as it does not exist
 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_82 failed
 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_40 could not be dropped
 from memory as it does not exist
 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_40 failed
 res29: Long = 5

 It seems that I could not cache the data in memory even though my local
 machine has
 16Gb RAM and the data is only 13MB with 100 partitions size.

 How to prevent this caching issue from happening? Thanks.

 Rindra



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248p10463.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: Executor metrics in spark application

2014-07-22 Thread Denes
As far as I understand, even if I could register the custom source, there is
no way to have a cluster-wide variable to pass to it, i.e. the accumulator
can be modified by tasks but only read by the driver, and the broadcast
value is constant.
So it seems this custom metrics/sinks functionality has not really been thought
out by the developers.
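For reference, a minimal sketch of the accumulator semantics described above
(assuming an existing sc and rdd):

import org.apache.spark.SparkContext._

val acc = sc.accumulator(0L)   // created on the driver
rdd.foreach { _ =>
  acc += 1L                    // tasks may only add to it
}
println(acc.value)             // the value is readable only on the driver, after the action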



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10464.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: akka disassociated on GC

2014-07-22 Thread Makoto Yui

Hi Xiangrui,

By using your treeAggregate and broadcast patches, the evaluation completed
successfully.


I expect that these patches will be merged in the next major release
(v1.1?). Without them, it would be hard to use MLlib on a large dataset.


Thanks,
Makoto

(2014/07/16 15:05), Xiangrui Meng wrote:

Hi Makoto,

I don't remember I wrote that but thanks for bringing this issue up!
There are two important settings to check: 1) driver memory (you can
see it from the executor tab), 2) number of partitions (try to use
small number of partitions). I put two PRs to fix the problem:

1) use broadcast in task closure: https://github.com/apache/spark/pull/1427
2) use treeAggregate to get the result:
https://github.com/apache/spark/pull/1110

They are still under review. Once merged, the problem should be fixed.
I will test the KDDB dataset and report back. Thanks!

Best,
Xiangrui

On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui yuin...@gmail.com wrote:

Hello,

(2014/06/19 23:43), Xiangrui Meng wrote:


The execution was slow for the larger KDD Cup 2012, Track 2 dataset
(235M+ records with 16.7M+ (2^24) sparse features, about 33.6GB in total) due to
the sequential aggregation of dense vectors on a single driver node.

It took about 7.6 minutes per iteration for the aggregation.



When running the above test, I got another error at the beginning of the 2nd
iteration when enabling iterations.

It works fine for the first iteration but the 2nd iteration always fails.

It seems that akka connections are suddenly disassociated when GC happens on
the driver node. Two possible causes can be considered:
1) The driver is under a heavy load because of GC; so executors cannot
connect to the driver. Changing akka timeout setting did not resolve the
issue.
2) akka oddly released valid connections on GC.

I'm using spark 1.0.1 and timeout setting of akka as follows did not resolve
the problem.

[spark-defaults.conf]
spark.akka.frameSize 50
spark.akka.timeout   120
spark.akka.askTimeout120
spark.akka.lookupTimeout 120
spark.akka.heartbeat.pauses 600

It seems this issue is related to one previously discussed in
http://markmail.org/message/p2i34frtf4iusdfn

Are there any preferred configurations or workaround for this issue?

Thanks,
Makoto


[The error log of the driver]

14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 as
25300254 bytes in 35 ms
666.108: [GC [PSYoungGen: 6540914K-975362K(7046784K)]
12419091K-7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 sys=68.43,
real=5.22 secs]
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated:
app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found
14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20140714180032-0010/8 removed: Command exited with code 1
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found
672.596: [GC [PSYoungGen: 6642785K-359202K(6059072K)]
13459952K-8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 sys=33.72,
real=2.83 secs]
14/07/14 18:11:41 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc02.mydomain.org,54538)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc18.mydomain.org,58100)

The full log is uploaded on
https://dl.dropboxusercontent.com/u/13123103/driver.log


[The error log of a worker]
14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8
finished with state EXITED message Command exited with code 1 exitStatus 1
14/07/14 18:11:38 INFO actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to