Re: How to list all dataframes and RDDs available in current session?

2015-08-21 Thread Raghavendra Pandey
You can get the list of all persisted RDDs from the Spark context...
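For example, in a spark-shell or Zeppelin session where sc and sqlContext are already
defined, something like this (a minimal sketch: getPersistentRDDs only lists RDDs that
have been explicitly persisted/cached, and temp tables live in the SQL catalog rather
than on the SparkContext):

  // RDDs that have been marked persistent in the current SparkContext
  sc.getPersistentRDDs.foreach { case (id, rdd) =>
    println(s"RDD $id: ${rdd.name} [${rdd.getStorageLevel.description}]")
  }

  // Registered temp tables are tracked by the SQL catalog instead
  sqlContext.tableNames().foreach(println)
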
On Aug 21, 2015 12:06 AM, Rishitesh Mishra rishi80.mis...@gmail.com
wrote:

 I am not sure if you can view all RDDs in a session. Tables are maintained
 in a catalogue, hence it's easier. However, you can see the DAG
 representation, which lists all the RDDs in a job, in the Spark UI.
 On 20 Aug 2015 22:34, Dhaval Patel dhaval1...@gmail.com wrote:

 Apologies

 I accidentally included Spark User DL on BCC. The actual email message is
 below.
 =


 Hi:

 I have been working on a few examples using Zeppelin.

 I have been trying to find a command that would list all
 *dataframes/RDDs* that have been created in the current session. Does anyone know if
 there are any such commands available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval



 On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 Hi:

 I have been working on a few examples using Zeppelin.

 I have been trying to find a command that would list all
 *dataframes/RDDs* that have been created in the current session. Does anyone know if
 there are any such commands available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval





Re: spark streaming 1.3 kafka error

2015-08-21 Thread Shushant Arora
It comes at the start of each task when new data is inserted into Kafka (the
amount of data inserted is very small).
The kafka topic has 300 partitions - the data inserted is ~10 MB.

Tasks fail and are retried; the retries succeed, but after a certain number of failed
tasks it kills the job.




On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 That looks like you are choking your Kafka machine. Do a top on the Kafka
 machines and check the workload; it may be that you are spending too much
 time on disk I/O, etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports (note
 that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the job
 is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of view,
 post a minimal reproducible code sample that demonstrates the issue, so it
 can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka using 
 directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 
 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
 at kafka.utils.Utils$.read(Utils.scala:376)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
 at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
 at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
 at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
 task 16348
 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 
 (TID 16348)
 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic 
 test_hbrealtimeevents, partition 75 offsets 4701 - 4718
 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties




 Thanks





Re: How can I save the RDD result as Orcfile with spark1.3?

2015-08-21 Thread dong.yajun
Hi Ted,

Thanks for your reply. Is there any other way to do this with Spark 1.3,
such as writing the ORC file manually in a foreachPartition method?
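
(For reference, once on Spark 1.4+, where SPARK-2883 added ORC support as mentioned
below, the write goes through the Hive-backed DataFrame writer. A minimal sketch, with
the path and columns purely illustrative; on 1.3 there is no built-in DataFrame ORC
writer:)

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  object OrcWriteSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("orc-write-sketch"))
      val sqlContext = new HiveContext(sc)
      import sqlContext.implicits._

      // Toy DataFrame; in practice this would be the RDD/DataFrame you want to persist
      val df = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "value")
      df.write.format("orc").save("/tmp/orc_output")
    }
  }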

On Sat, Aug 22, 2015 at 12:19 PM, Ted Yu yuzhih...@gmail.com wrote:

 ORC support was added in Spark 1.4
 See SPARK-2883

 On Fri, Aug 21, 2015 at 7:36 PM, dong.yajun dongt...@gmail.com wrote:

 Hi list,

 Is there a way to save the RDD result as an ORC file in Spark 1.3? Due to
 some reasons we can't upgrade our Spark version to 1.4 right now.

 --
 *Ric Dong*





-- 
*Ric Dong*


Re: Random Forest and StringIndexer in pyspark ML Pipeline

2015-08-21 Thread Yanbo Liang
The ML package aims to provide a machine learning pipeline API that makes machine
learning workflows more efficient.
It's more general to chain StringIndexer with any kind of Estimator.
I think we can make the StringIndexer and the reverse process automatic in the
future.
If you want to know your original labels, you can use IndexToString.
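
A Scala sketch of that round trip (the column names and the trainingDF / predictionsDF
DataFrames are illustrative placeholders; IndexToString is only available in newer
releases, as noted above):

  import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

  // Fit the indexer so we can inspect the label ordering it chose
  // (index 0 goes to the most frequent label).
  val indexerModel = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(trainingDF)

  println(indexerModel.labels.mkString(", "))   // original labels in index order

  // Map numeric predictions back to the original label values
  val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(indexerModel.labels)

  val restored = labelConverter.transform(predictionsDF)
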

2015-08-11 6:56 GMT+08:00 pkphlam pkph...@gmail.com:

 Hi,

 If I understand the RandomForest model in the ML Pipeline implementation in
 the ml package correctly, I have to first run my outcome label variable
 through the StringIndexer, even if my labels are numeric. The StringIndexer
 then converts the labels into numeric indices based on frequency of the
 label.

 This could create situations where if I'm classifying binary outcomes where
 my original labels are simply 0 and 1, the StringIndexer may actually flip
 my labels such that 0s become 1s and 1s become 0s if my original 1s were
 more frequent. This transformation would then extend itself to the
 predictions. In the old mllib implementation, the RF does not require the
 labels to be changed and I could use 0/1 labels without worrying about them
 being transformed.

 I was wondering:
 1. Why is this the default implementation for the Pipeline RF? This seems
 like it could cause a lot of confusion in cases like the one I outlined
 above.
 2. Is there a way to avoid this by either controlling how the indices are
 created in StringIndexer or bypassing StringIndexer altogether?
 3. If 2 is not possible, is there an easy way to see how my original labels
 mapped onto the indices so that I can revert the predictions back to the
 original labels rather than the transformed labels? I suppose I could do
 this by counting the original labels and mapping by frequency, but it seems
 like there should be a more straightforward way to get it out of the
 StringIndexer.

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-and-StringIndexer-in-pyspark-ML-Pipeline-tp24200.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.

2015-08-21 Thread Eugene Morozov
Hi,

I'm using spark 1.3.1 built against hadoop 1.0.4 and java 1.7 and I'm
trying to save my data frame to Parquet.
The issue I'm stuck on looks like serialization tries to do a pretty weird
thing: it tries to write into an empty array.

The last (through stack trace) line of spark code that leads to exception
is in method SerializationDebugger.visitSerializable(o: Object, stack:
List[String]): List[String].
desc.getObjFieldValues(finalObj, objFieldValues)

The reason it does so, is because finalObj is
org.apache.spark.sql.execution.Project and objFieldValues is an empty
array! As a result there are two fields to read from the Project instance
object (happens in java.io.ObjectStreamClass), but there is an empty array
to read into.

A little bit of code with debug info:
private def visitSerializable(o: Object, stack: List[String]): List[String]
= {
val (finalObj, desc) = findObjectAndDescriptor(o) //finalObj: Project,
desc: org.apache.spark.sql.execution.Project
  val slotDescs = desc.getSlotDescs //java.io.ObjectStreamClass[2] [0:
SparkPlan, 1: Project]
  var i = 0 //i: 0
  while (i < slotDescs.length) {
val slotDesc = slotDescs(i) //slotDesc:
org.apache.spark.sql.execution.SparkPlan
if (slotDesc.hasWriteObjectMethod) {
  // TODO: Handle classes that specify writeObject method.
} else {
  val fields: Array[ObjectStreamField] = slotDesc.getFields
//fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled]
  val objFieldValues: Array[Object] = new
Array[Object](slotDesc.getNumObjFields) //objFieldValues:
java.lang.Object[0]
  val numPrims = fields.length - objFieldValues.length //numPrims: 1
  desc.getObjFieldValues(finalObj, objFieldValues) //leads to
exception

So it looks like it gets objFieldValues array from the SparkPlan object,
but uses it to receive values from Project object.

Here is the schema of my data frame
root
 |-- Id: long (nullable = true)
 |-- explodes: struct (nullable = true)
 ||-- Identifiers: array (nullable = true)
 |||-- element: struct (containsNull = true)
 ||||-- Type: array (nullable = true)
 |||||-- element: string (containsNull = true)
 |-- Identifiers: struct (nullable = true)
 ||-- Type: array (nullable = true)
 |||-- element: string (containsNull = true)
 |-- Type2: string (nullable = true)
 |-- Type: string (nullable = true)

Actual stack trace is:
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
at
com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at

Is long running Spark batch job in fine grained mode is Deprecated?

2015-08-21 Thread Akash Mishra
Hi *,

We are trying to run Spark on top of Mesos using fine-grained mode. While
talking to a few people, I came to know that running Spark jobs using
fine-grained mode on Mesos is not a good idea.

I could not find anything about fine-grained mode being deprecated, nor
about whether coarse-grained mode is the default choice for running long-running
Spark batch jobs.
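
(For what it's worth, the mode is controlled by a single flag; a sketch, with the
value purely illustrative:)

  // true = coarse-grained Mesos mode; false (the 1.x default) = fine-grained
  val conf = new org.apache.spark.SparkConf()
    .set("spark.mesos.coarse", "true")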

Thanks,


-- 

Regards,
Akash Mishra.


Its not our abilities that make us, but our decisions.--Albus Dumbledore


Spark streaming multi-tasking during I/O

2015-08-21 Thread Sateesh Kavuri
Hi,

My scenario goes like this:
I have an algorithm running in Spark streaming mode on a 4 core virtual
machine. Majority of the time, the algorithm does disk I/O and database
I/O. Question is, during the I/O, where the CPU is not considerably loaded,
is it possible to run any other task/thread so as to efficiently utilize
the CPU?

Note that one DStream of the algorithm runs completely on a single CPU

Thank you,
Sateesh


Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-21 Thread Michael Albert
This is something of a wild guess, but I find that when executors start
disappearing for no obvious reason, this is usually because the YARN
node managers have decided that the containers are using too much memory and
then terminate the executors.
Unfortunately, to see evidence of this, one needs to carefully review the YARN
node-manager logs on the workers -- it doesn't seem to show up in the UI.
What I generally do is some combination of (for example, via settings like the
sketch below):
  1) increasing executor memory (often also decreasing the number of executors)
  2) decreasing the number of cores per executor
  3) increasing the executor memory overhead
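
A sketch of those knobs as Spark configuration (values are illustrative only, not
recommendations):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.executor.memory", "8g")                 // 1) more memory per executor
    .set("spark.executor.cores", "2")                   // 2) fewer cores per executor
    .set("spark.yarn.executor.memoryOverhead", "2048")  // 3) more off-heap overhead, in MB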
Good luck.
-Mike
  From: Sandy Ryza sandy.r...@cloudera.com
 To: Umesh Kacha umesh.ka...@gmail.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Thursday, August 20, 2015 5:21 PM
 Subject: Re: How to avoid executor time out on yarn spark while dealing with 
large shuffle skewed data?
   
GC wouldn't necessarily result in errors - it could just be slowing down your 
job and causing the executor JVMs to stall.  If you click on a stage in the UI, 
you should end up on a page with all the metrics concerning the tasks that ran 
in that stage.  GC Time is one of these task metrics.
-Sandy


On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi, where do I see GC time in the UI? I have set spark.yarn.executor.memoryOverhead
to 3500, which seems to be good enough, I believe. So you mean only GC could be
the reason behind the timeout? I checked the YARN logs and I did not see any GC error
there. Please guide. Thanks much.
On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

Moving this back onto user@
Regarding GC, can you look in the web UI and see whether the GC time metric 
dominates the amount of time spent on each task (or at least the tasks that 
aren't completing)?
Also, have you tried bumping your spark.yarn.executor.memoryOverhead?  YARN may 
be killing your executors for using too much off-heap space.  You can see 
whether this is happening by looking in the Spark AM or YARN NodeManager logs.
-Sandy
On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi, thanks much for the response. Yes, I tried the default setting of 0.2 too; it was
also going into timeout. If it is spending time in GC, then why is it not
throwing a GC error? I don't see any such error. The YARN logs are not helpful at all.
What is Tungsten and how do I use it? Spark is doing great, I believe: my job runs
successfully and 60% of tasks complete; only after the first executor gets lost do
things start going wrong. On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com
wrote:

What sounds most likely is that you're hitting heavy garbage collection.  Did 
you hit issues when the shuffle memory fraction was at its default of 0.2?  A 
potential danger with setting the shuffle storage to 0.7 is that it allows 
shuffle objects to get into the GC old generation, which triggers more 
stop-the-world garbage collections.
Have you tried enabling Tungsten / unsafe?
Unfortunately, Spark is still not that great at dealing with heavily-skewed 
shuffle data, because its reduce-side aggregation still operates on Java 
objects instead of binary data.
-Sandy
On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi Sandy, thanks much for the response. I am using Spark 1.4.1 and I have set
spark.shuffle.storage to 0.7, as my Spark job involves 4 group-by queries
executed using hiveContext.sql. My data set is skewed, so there will be more shuffling,
I believe. I don't know what's wrong: the Spark job runs fine for almost an hour, and
when the shuffle read / shuffle write column in the UI starts to show more than 10 GB,
an executor starts getting lost because of timeout, and slowly other executors start
getting lost. Please guide. On Aug 20, 2015 7:38 PM, Sandy Ryza
sandy.r...@cloudera.com wrote:

What version of Spark are you using?  Have you set any shuffle configs?
On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote:

I have one Spark job which seems to run fine, but after an hour or so
executors start getting lost because of a timeout, with something like the following
error

cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
60 seconds

and because of the above error, a couple of chained errors start to appear, like
FetchFailedException, RPC client disassociated, connection reset by peer,
IOException, etc.

Please see the following UI page. I have noticed that when shuffle read/write
starts to increase beyond 10 GB, executors start getting lost because of
timeout. How do I clear this stacked-up 10 GB of memory in the shuffle read/write
section? I don't cache anything, so why is Spark not clearing that memory? Please
guide.

IMG_20150819_231418358.jpg
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg




--
View this message in context: 

RE: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread java8964
I believe spark-shell -i scriptFile is there. We also use it, at least in
Spark 1.3.1.
dse spark just wraps the spark-shell command; underneath, it is just invoking
spark-shell.
I don't know too much about the original problem though.
Yong
Date: Fri, 21 Aug 2015 18:19:49 +0800
Subject: Re: Transformation not happening for reduceByKey or GroupByKey
From: zjf...@gmail.com
To: jsatishchan...@gmail.com
CC: robin.e...@xense.co.uk; user@spark.apache.org

Hi Satish,
I don't see where Spark supports -i, so I suspect it is provided by DSE. In that
case, it might be a bug in DSE.


On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com 
wrote:
HI Robin,
Yes, it is DSE but the issue is related to Spark only.
Regards,
Satish Chandra
On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:
Not sure, never used dse - it’s part of DataStax Enterprise right?
On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote:
HI Robin,
Yes, the below-mentioned piece of code works fine in the Spark shell, but when the
same is placed in a script file and executed with -i <file name> it creates an
empty RDD.
scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28

scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))
Command:
dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile

I understand I am missing something here, due to which my final RDD does not
have the required output.
Regards,
Satish Chandra
On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:
This works for me:
scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28

scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))
On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote:
HI All,
I have data in RDD as mentioned below:
RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))

I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
Values for each key
Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)
Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()
Command as mentioned

dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile

Please let me know what is missing in my code, as my resultant Array is empty



Regards,
Satish









-- 
Best Regards

Jeff Zhang
  

Re: SPARK sql: Need JSON back instead of row

2015-08-21 Thread Roberto Congiu
2015-08-21 3:17 GMT-07:00 smagadi sudhindramag...@fico.com:

 teenagers.toJSON gives the JSON but it does not preserve the parent ids

 meaning if the input was {"name":"Yin",
 "address":{"city":"Columbus","state":"Ohio"},"age":20}

 val x = sqlContext.sql("SELECT name, address.city, address.state, age FROM
 people where age > 19 and age <= 30").toJSON

  x.collect().foreach(println)

 This returns back, missing address.
 {"name":"Yin","city":"Columbus","state":"Ohio","age":20}
 Is this a bug ?


You're not including it in the query, so of course it's not there.
Try


sqlContext.sql("Select * from ppl").toJSON.collect().foreach(println)

instead.
I get

{"address":{"city":"Columbus","state":"Ohio"},"name":"Yin"}

R.


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI All,
Any inputs on the actual problem statement?

Regards,
Satish


On Fri, Aug 21, 2015 at 5:57 PM, Jeff Zhang zjf...@gmail.com wrote:

 Yong, Thanks for your reply.

 I tried spark-shell -i script-file, and it works fine for me. Not sure what the
 difference is with
 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile

 On Fri, Aug 21, 2015 at 7:01 PM, java8964 java8...@hotmail.com wrote:

 I believe spark-shell -i scriptFile is there. We also use it, at least
 in Spark 1.3.1.

 dse spark just wraps the spark-shell command; underneath, it is just
 invoking spark-shell.

 I don't know too much about the original problem though.

 Yong

 --
 Date: Fri, 21 Aug 2015 18:19:49 +0800
 Subject: Re: Transformation not happening for reduceByKey or GroupByKey
 From: zjf...@gmail.com
 To: jsatishchan...@gmail.com
 CC: robin.e...@xense.co.uk; user@spark.apache.org


 Hi Satish,

 I don't see where Spark supports -i, so I suspect it is provided by DSE.
 In that case, it might be a bug in DSE.



 On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j 
 jsatishchan...@gmail.com wrote:

 HI Robin,
 Yes, it is DSE but the issue is related to Spark only

 Regards,
 Satish Chandra

 On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk
 wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below-mentioned piece of code works fine in the Spark shell, but when the
 same is placed in a script file and executed with -i <file name> it creates an
 empty RDD

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand I am missing something here, due to which my final RDD does
 not have the required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res: Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish








 --
 Best Regards

 Jeff Zhang




 --
 Best Regards

 Jeff Zhang



Re: Failed stages and dropped executors when running implicit matrix factorization/ALS

2015-08-21 Thread Ravi Mody
I've been able to almost halve my memory usage with no instability issues.

I lowered my storage.memoryFraction and increased my shuffle.memoryFraction
(essentially swapping them). I set spark.yarn.executor.memoryOverhead to
6GB. And I lowered executor-cores in case other jobs are using the
available cores. I'm not sure which of these fixed the issue, but things
are much more stable now.

On Fri, Jun 26, 2015 at 11:26 AM, Xiangrui Meng men...@gmail.com wrote:

 Please see my comments inline. It would be helpful if you can attach
 the full stack trace. -Xiangrui

 On Fri, Jun 26, 2015 at 7:18 AM, Ravi Mody rmody...@gmail.com wrote:
  1. These are my settings:
  rank = 100
  iterations = 12
  users = ~20M
  items = ~2M
  training examples = ~500M-1B (I'm running into the issue even with 500M
  training examples)
 

 Did you set number of blocks? If you didn't, could you check how many
 partitions you have in the ratings RDD? Setting a large number of
 blocks would increase shuffle size. If you have enough RAM, try to set
 number of blocks to the number of CPU cores or less.

  2. The memory storage never seems to go too high. The user blocks may go
 up
  to ~10Gb, and each executor will have a few GB used out of 30 free GB.
  Everything seems small compared to the amount of memory I'm using.
 

 This looks correct.

  3. I think I have a lot of disk space - is this on the executors or the
  driver? Is there a way to know if the error is coming from disk space.
 

 You can see the shuffle data size for each iteration from the WebUI.
 Usually, it should throw an out of disk space exception instead of the
 message you posted. But it is worth checking.

  4. I'm not changing checkpointing settings, but I think checkpointing
  defaults to every 10 iterations? One notable thing is the crashes often
  start on or after the 9th iteration, so it may be related to
 checkpointing.
  But this could just be a coincidence.
 

 If you didn't set checkpointDir in SparkContext, the
 checkpointInterval setting in ALS has no effect.

  Thanks!
 
 
 
 
 
  On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat ayman.fara...@yahoo.com
  wrote:
 
  was there any resolution to that problem?
  I am also having that with Pyspark 1.4
  380 Million observations
  100 factors and 5 iterations
  Thanks
  Ayman
 
  On Jun 23, 2015, at 6:20 PM, Xiangrui Meng men...@gmail.com wrote:
 
   It shouldn't be hard to handle 1 billion ratings in 1.3. Just need
   more information to guess what happened:
  
   1. Could you share the ALS settings, e.g., number of blocks, rank and
   number of iterations, as well as number of users/items in your
   dataset?
   2. If you monitor the progress in the WebUI, how much data is stored
   in memory and how much data is shuffled per iteration?
   3. Do you have enough disk space for the shuffle files?
   4. Did you set checkpointDir in SparkContext and checkpointInterval in
   ALS?
  
   Best,
   Xiangrui
  
   On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody rmody...@gmail.com
 wrote:
   Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on
   fairly
   large datasets (1+ billion input records). As I grow my dataset I
 often
   run
   into issues with a lot of failed stages and dropped executors,
   ultimately
   leading to the whole application failing. The errors are like
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
   output
   location for shuffle 19 and
   org.apache.spark.shuffle.FetchFailedException:
   Failed to connect to These occur during flatMap, mapPartitions,
   and
   aggregate stages. I know that increasing memory fixes this issue, but
   most
   of the time my executors are only using a tiny portion of the their
   allocated memory (10%). Often, the stages run fine until the last
   iteration
   or two of ALS, but this could just be a coincidence.
  
   I've tried tweaking a lot of settings, but it's time-consuming to do
   this
   through guess-and-check. Right now I have these set:
   spark.shuffle.memoryFraction = 0.3
   spark.storage.memoryFraction = 0.65
   spark.executor.heartbeatInterval = 60
  
   I'm sure these settings aren't optimal - any idea of what could be
   causing
   my errors, and what direction I can push these settings in to get
 more
   out
   of my memory? I'm currently using 240 GB of memory (on 7 executors)
 for
   a 1
   billion record dataset, which seems like too much. Thanks!
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 



Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread Abhishek R. Singh
You had:

 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

Maybe try:

 rdd2 = RDD.reduceByKey((x,y) => x+y)
 rdd2.take(3)

-Abhishek-

On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote:

 HI All,
 I have data in RDD as mentioned below:
 
 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
 Values for each key
 
 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)
 
 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at
 <console>:73
 res: Array[(Int,Int)] = Array()
 
 Command as mentioned
 
 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
 Please let me know what is missing in my code, as my resultant Array is empty
 
 
 
 Regards,
 Satish
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark streaming 1.3 kafka error

2015-08-21 Thread Cody Koeninger
Sounds like that's happening consistently, not an occasional network
problem?

Look at the Kafka broker logs

Make sure you've configured the correct kafka broker hosts / ports (note
that direct stream does not use zookeeper host / port).

Make sure that host / port is reachable from your driver and worker nodes,
ie telnet or netcat to it.  It looks like your driver can reach it (since
there's partition info in the logs), but that doesn't mean the worker can.

Use lsof / netstat to see what's going on with those ports while the job is
running, or tcpdump if you need to.

If you can't figure out what's going on from a networking point of view,
post a minimal reproducible code sample that demonstrates the issue, so it
can be tested in a different environment.





On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka using 
 directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 
 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
   at kafka.utils.Utils$.read(Utils.scala:376)
   at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
   at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
   at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
   at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
   at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
   at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
 task 16348
 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 
 (TID 16348)
 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, 
 partition 75 offsets 4701 - 4718
 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties




 Thanks




Want to install lz4 compression

2015-08-21 Thread Saif.A.Ellafi
Hi all,

I am using pre-compiled Spark with Hadoop 2.6. The LZ4 codec is not in Hadoop's
native libraries, so I am not able to use it.

Can anyone suggest how to proceed? Hopefully I won't have to recompile
Hadoop. I tried changing the --driver-library-path to point directly at the lz4
standalone package libraries, but of course it didn't work.

Thanks
Saif



Re: Want to install lz4 compression

2015-08-21 Thread Ted Yu
Have you read this ?

http://stackoverflow.com/questions/22716346/how-to-use-lz4-compression-in-linux-3-11



 On Aug 21, 2015, at 6:57 AM, saif.a.ell...@wellsfargo.com 
 saif.a.ell...@wellsfargo.com wrote:
 
 Hi all,
  
 I am using pre-compiled spark with hadoop 2.6. LZ4 Codec is not on hadoop’s 
 native libraries, so I am not being able to use it.
  
 Can anyone suggest on how to proceed? Hopefully I wont have to recompile 
 hadoop. I tried changing the --driver-library-path to point directly into lz4 
 stand alone package libraries, but of course it didnt work.
  
 Thanks
 Saif
  


Tungsten and sun.misc.Unsafe

2015-08-21 Thread Marek Kolodziej
Hello,

I attended the Tungsten-related presentations at Spark Summit (by Josh
Rosen) and at Big Data Scala (by Matei Zaharia). Needless to say, this
project holds great promise for major performance improvements.

At Josh's talk, I heard about the use of sun.misc.Unsafe as a way of
achieving some of these optimizations (e.g. slides 11-17 of Josh's
presentation:
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen).
I have no problems with the use of Unsafe in the code itself (I've done it
before myself, too), however I think there is a considerable risk
associated with beginning the use of Unsafe now, because Oracle is
determined to limit access to APIs such as Unsafe starting in Java 9.

JEP 260 http://openjdk.java.net/jeps/260 was filed specifically to limit
access to internal JDK APIs that were never intended for external use,
including sun.misc.* The JEP does say that the functionality of
sun.misc.Unsafe is to remain available even as other internal APIs are
blocked for non-JDK use, however, it also says that the functionality of
many methods of this class is now available via *variable handles (JEP 193
http://openjdk.java.net/jeps/193).* If the direct access to
sun.misc.Unsafe is blocked and only the variable handles access remains,
this may mean more than just a need for code refactoring - functionality
such as doing malloc from Spark core may be restricted.

JEP 260 has evolved quite a bit over time and the wording available now
(after the Aug. 4, 2015) seems more reasonable than before. Nevertheless,
Hazelcast and other companies whose technologies depend on the availability
of Unsafe started a Google doc here
https://docs.google.com/document/d/1GDm_cAxYInmoHMor-AkStzWvwE9pw6tnz_CebJQxuUE/edit#heading=h.brct71tr6e13
.

I doubt that Oracle would want to make life difficult for everyone. In
addition to Spark's code base, projects such as Akka, Cassandra, Hibernate,
Netty, Neo4j and Spring (among many others) depend on Unsafe. Still, there
are tons of posts about this issue in the Java community (e.g. here's a
Hazelcast interview, also from Aug. 3, the day before the latest update to
JEP 260: https://jaxenter.com/hazelcast-on-java-unsafe-class-119286.html).
There are tons of concerned posts on the blogosphere, too (e.g.
http://blog.dripstat.com/removal-of-sun-misc-unsafe-a-disaster-in-the-making/
).

Have the leaders of the Spark community been following these Unsafe-related
developments and if so, what's Spark's plan of handling whatever Oracle
throws our way?

Marek


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Abhishek,

I have even tried that but rdd2 is empty

Regards,
Satish

On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:

 You had:

  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)

 Maybe try:

  rdd2 = RDD.reduceByKey((x,y) => x+y)
  rdd2.take(3)

 -Abhishek-

 On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

  HI All,
  I have data in RDD as mentioned below:
 
  RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
  I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key
 
  Code:
  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)
 
  Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
  res: Array[(Int,Int)] = Array()
 
  Command as mentioned
 
  dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
  Please let me know what is missing in my code, as my resultant Array is
 empty
 
 
 
  Regards,
  Satish
 




RE: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread java8964
What version of Spark are you using, or the one that comes with DSE 4.7?
We just cannot reproduce it in Spark.
yzhang@localhost$ more test.spark
val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs.reduceByKey((x,y) => x + y).collect
yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/
Using Scala version 2.10.4
Spark context available as sc.
SQL context available as sqlContext.
Loading test.spark...
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:21
15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))
Yong

Date: Fri, 21 Aug 2015 19:24:09 +0530
Subject: Re: Transformation not happening for reduceByKey or GroupByKey
From: jsatishchan...@gmail.com
To: abhis...@tetrationanalytics.com
CC: user@spark.apache.org

HI Abhishek,
I have even tried that but rdd2 is empty
Regards,
Satish
On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:
You had:



 RDD.reduceByKey((x,y) => x+y)

 RDD.take(3)



Maybe try:



 rdd2 = RDD.reduceByKey((x,y) => x+y)

 rdd2.take(3)



-Abhishek-



On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote:



 HI All,

 I have data in RDD as mentioned below:



 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))





 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
 Values for each key



 Code:

 RDD.reduceByKey((x,y) => x+y)

 RDD.take(3)



 Result in console:

 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at
 <console>:73

 res: Array[(Int,Int)] = Array()



 Command as mentioned



 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile





 Please let me know what is missing in my code, as my resultant Array is empty







 Regards,

 Satish






  

Spark ec2 launch problem

2015-08-21 Thread Garry Chen
Hi All,
I am trying to launch a Spark EC2 cluster by running  spark-ec2
--key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011
--spark-version=1.4.1 launch spark-cluster but I keep getting the following message
endlessly.  Please help.


Warning: SSH connection error. (This could be temporary.)
Host:
SSH return code: 255
SSH output: ssh: Could not resolve hostname : Name or service not known


Re: build spark 1.4.1 with JDK 1.6

2015-08-21 Thread Chen Song
Thanks Sean.

So how is PySpark supported? I thought PySpark needed JDK 1.6.

Chen

On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote:

 Spark 1.4 requires Java 7.

 On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote:

 I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support
 PySpark, I used JDK 1.6.

 I got the following error,

 [INFO] --- scala-maven-plugin:3.2.0:testCompile
 (scala-test-compile-first) @ spark-streaming_2.10 ---

 java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable
 : Unsupported major.minor version 51.0
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

 I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7.
 Anyone has done this before?

 Thanks,

 --
 Chen Song




-- 
Chen Song


Re: build spark 1.4.1 with JDK 1.6

2015-08-21 Thread Marcelo Vanzin
That was only true until Spark 1.3. Spark 1.4 can be built with JDK7
and pyspark will still work.

On Fri, Aug 21, 2015 at 8:29 AM, Chen Song chen.song...@gmail.com wrote:
 Thanks Sean.

 So how PySpark is supported. I thought PySpark needs jdk 1.6.

 Chen

 On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote:

 Spark 1.4 requires Java 7.


 On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote:

 I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support
 PySpark, I used JDK 1.6.

 I got the following error,

 [INFO] --- scala-maven-plugin:3.2.0:testCompile
 (scala-test-compile-first) @ spark-streaming_2.10 ---

 java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable
 : Unsupported major.minor version 51.0
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
 at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

 I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7.
 Anyone has done this before?

 Thanks,

 --
 Chen Song




 --
 Chen Song

 --

 ---
 You received this message because you are subscribed to the Google Groups
 CDH Users group.
 To unsubscribe from this group and stop receiving emails from it, send an
 email to cdh-user+unsubscr...@cloudera.org.
 For more options, visit https://groups.google.com/a/cloudera.org/d/optout.



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



build spark 1.4.1 with JDK 1.6

2015-08-21 Thread Chen Song
I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support
PySpark, I used JDK 1.6.

I got the following error,

[INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first)
@ spark-streaming_2.10 ---

java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable :
Unsupported major.minor version 51.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7.
Anyone has done this before?

Thanks,

-- 
Chen Song


Finding the number of executors.

2015-08-21 Thread Virgil Palanciuc
Is there any reliable way to find out the number of executors
programmatically - regardless of how the job is run? A method that
preferably works for spark-standalone, yarn, and mesos, regardless of whether the
code runs from the shell or not?

Things that I tried and don't work (spelled out as snippets after this list):
- sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell,
does not work if task submitted via  spark-submit
- sparkContext.getConf.getInt(spark.executor.instances, 1) - doesn't work
unless explicitly configured
- call to http://master:8080/json (this used to work, but doesn't anymore?)
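
(The first two attempts as snippets; results vary by cluster manager and by whether
executors have finished registering:)

  // Executors that have registered a block manager; subtract 1 for the driver itself
  val byMemoryStatus = sc.getExecutorMemoryStatus.size - 1

  // Only meaningful when spark.executor.instances was set explicitly (e.g. on YARN)
  val byConf = sc.getConf.getInt("spark.executor.instances", 1)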

I guess I could parse the output HTML from the Spark UI... but that seems
dumb. Is there really no better way?

Thanks,
Virgil.


RE: Spark ec2 launch problem

2015-08-21 Thread Garry Chen
No, the message never ends.  I have to Ctrl-C out of it.

Garry

From: shahid ashraf [mailto:sha...@trialx.com]
Sent: Friday, August 21, 2015 11:13 AM
To: Garry Chen g...@cornell.edu
Cc: user@spark.apache.org
Subject: Re: Spark ec2 launch problem

Does the cluster work at the end ?

On Fri, Aug 21, 2015 at 8:25 PM, Garry Chen 
g...@cornell.edu wrote:
Hi All,
I am trying to launch a Spark EC2 cluster by running  spark-ec2
--key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011
--spark-version=1.4.1 launch spark-cluster but I keep getting the following message
endlessly.  Please help.


Warning: SSH connection error. (This could be temporary.)
Host:
SSH return code: 255
SSH output: ssh: Could not resolve hostname : Name or service not known



--
with Regards
Shahid Ashraf


Re: build spark 1.4.1 with JDK 1.6

2015-08-21 Thread Sean Owen
Spark 1.4 requires Java 7.

On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote:

 I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support
 PySpark, I used JDK 1.6.

 I got the following error,

 [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first)
 @ spark-streaming_2.10 ---

 java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable
 : Unsupported major.minor version 51.0
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

 I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7.
 Anyone has done this before?

 Thanks,

 --
 Chen Song




Re: SparkSQL concerning materials

2015-08-21 Thread Sameer Farooqui
Have you seen the Spark SQL paper?:
https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf

On Thu, Aug 20, 2015 at 11:35 PM, Dawid Wysakowicz 
wysakowicz.da...@gmail.com wrote:

 Hi,

 Thanks for the answers. I have read the answers you provided, but I am rather
 looking for some materials on the internals, e.g. how the optimizer works, how a
 query is translated into RDD operations, etc. The API I am quite familiar
 with.
 A good starting point for me was: Spark DataFrames: Simple and Fast
 Analysis of Structured Data
 https://www.brighttalk.com/webcast/12891/166495?utm_campaign=child-community-webcasts-feedutm_content=Big+Data+and+Data+Managementutm_source=brighttalk-portalutm_medium=webutm_term=

 2015-08-20 18:29 GMT+02:00 Dhaval Patel dhaval1...@gmail.com:

 Or if you're a python lover then this is a good place -
 https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#



 On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote:

 See also
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package

 Cheers

 On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif 
 muhammadatif...@gmail.com wrote:

 Hi Dawid

 The best place to get started is the Spark SQL Guide from Apache
 http://spark.apache.org/docs/latest/sql-programming-guide.html

 Regards
 Muhammad

 On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz 
 wysakowicz.da...@gmail.com wrote:

 Hi,

 I would like to dip into SparkSQL. Get to know better the
 architecture, good practices, some internals. Could you advise me some
 materials on this matter?

 Regards
 Dawid








Re: Spark ec2 launch problem

2015-08-21 Thread shahid ashraf
Does the cluster work at the end ?

On Fri, Aug 21, 2015 at 8:25 PM, Garry Chen g...@cornell.edu wrote:

 Hi All,

 I am trying to launch a Spark EC2 cluster by running
  spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc
 --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster but I keep
 getting the following message endlessly.  Please help.





 Warning: SSH connection error. (This could be temporary.)

 Host:

 SSH return code: 255

 SSH output: ssh: Could not resolve hostname : Name or service not known




-- 
with Regards
Shahid Ashraf


Re: Data locality with HDFS not being seen

2015-08-21 Thread Sameer Farooqui
Hi Sunil,

Have you seen this fix in Spark 1.5 that may fix the locality issue?:
https://issues.apache.org/jira/browse/SPARK-4352

On Thu, Aug 20, 2015 at 4:09 AM, Sunil sdhe...@gmail.com wrote:

 Hello. I am seeing some unexpected issues with achieving HDFS data
 locality. I expect the tasks to be executed only on the node which has the
 data, but this is not happening (of course, unless the node is busy, in which
 case I understand tasks can go to some other node). Could anyone clarify
 what's wrong with the way I am trying, or what I should do instead? Below are
 the cluster configuration and experiments that I have tried. Any help will
 be appreciated. If you would like to recreate the below scenario, then you
 may use the JavaWordCount.java example given within the spark.

 *Cluster configuration:*

 1. spark-1.4.0 and hadoop-2.7.1
 2. Machines -- Master node (master) and 6 worker nodes (node1 to node6)
 3. master acts as -- spark master, HDFS name node  sec name node, Yarn
 resource manager
 4. Each of the 6 worker nodes act as -- spark worker node, HDFS data node,
 node manager

 *Data on HDFS:*

 20Mb text file is stored in single block. With the replication factor of 3,
 the text file is stored on nodes 2, 3  4.

 *Test-1 (Spark stand alone mode):*

 Application being run is the standard Java word count count example with
 the
 above text file in HDFS, as input. On job submission, I see in the spark
 web-UI that, stage-0(i.e mapToPair) is being run on random nodes (i.e.
 node1, node 2, node 6, etc.). By random I mean that, stage 0 executes on
 the
 very first worker node that gets registered to the application (this can be
 looked from the event timeline graph). Rather, I am expecting the stage-0
 to
 be run only on any one of the three nodes 2, 3, or 4.

 * Test-2 (Yarn cluster mode): *
 Same as above. No data locality seen.

 * Additional info: *
 No other spark applications are running and I have even tried by setting
 the
 /spark.locality.wait/ to 10s, but still no difference.

 Thanks and regards,
 Sunil



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Data-locality-with-HDFS-not-being-seen-tp24361.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Convert mllib.linalg.Matrix to Breeze

2015-08-21 Thread Burak Yavuz
Hi Naveen,
As I mentioned before, the code is private therefore not accessible. Just
copy and use the snippet that I sent. Copying it here again:
https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270
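
For the dense case, that snippet boils down to something like the sketch below (it
ignores the isTransposed flag and sparse matrices, which the linked code also handles):

  import breeze.linalg.{DenseMatrix => BDM}
  import org.apache.spark.mllib.linalg.{DenseMatrix => SparkDM}

  // MLlib dense matrices store their values column-major, the same layout Breeze
  // expects, so the conversion is just a constructor call over the same array.
  def denseToBreeze(m: SparkDM): BDM[Double] =
    new BDM[Double](m.numRows, m.numCols, m.values)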

Best,
Burak

On Thu, Aug 20, 2015 at 9:08 PM, Naveen nav...@formcept.com wrote:

 Hi,

 Thanks for the reply. I tried Matrix.toBreeze() which returns the
 following error:

 *method toBreeze in trait Matrix cannot be accessed in
 org.apache.spark.mllib.linalg.Matrix*



 On Thursday 20 August 2015 07:50 PM, Burak Yavuz wrote:

 Matrix.toBreeze is a private method. MLlib matrices have the same
 structure as Breeze Matrices. Just create a new Breeze matrix like this
 https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270.


 Best,
 Burak


 On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang yblia...@gmail.com wrote:

 You can use Matrix.toBreeze()
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56
  .

 2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com:

 Hi All,

 Is there anyway to convert a mllib matrix to a Dense Matrix of Breeze?
 Any leads are appreciated.


 Thanks,
 Naveen

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Spark streaming multi-tasking during I/O

2015-08-21 Thread Rishitesh Mishra
Hi Sateesh,
It is interesting to know how you determined that the DStream runs on
a single core. Did you mean receivers?

Coming back to your question, could you not start the disk I/O in a separate
thread, so that the scheduler can go ahead and assign other tasks?
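
One way to read that suggestion, as a sketch (writeToDb and cpuWork are placeholder
helpers, not real APIs; inside a streaming job this would run per partition, e.g. in
rdd.foreachPartition):

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Placeholder helpers standing in for the blocking I/O and the CPU-bound work
  def writeToDb(batch: Seq[String]): Unit = Thread.sleep(1000)     // pretend blocking disk/DB I/O
  def cpuWork(batch: Seq[String]): Long = batch.map(_.hashCode.toLong).sum

  def processPartition(records: Iterator[String]): Unit = {
    val batch = records.toSeq               // materialize once so both steps can reuse it
    val io = Future { writeToDb(batch) }    // kick the blocking I/O onto a separate thread
    cpuWork(batch)                          // CPU-bound work overlaps with the in-flight I/O
    Await.result(io, 10.minutes)            // keep the task alive until the I/O completes
  }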
On 21 Aug 2015 16:06, Sateesh Kavuri sateesh.kav...@gmail.com wrote:

 Hi,

 My scenario goes like this:
 I have an algorithm running in Spark streaming mode on a 4 core virtual
 machine. Majority of the time, the algorithm does disk I/O and database
 I/O. Question is, during the I/O, where the CPU is not considerably loaded,
 is it possible to run any other task/thread so as to efficiently utilize
 the CPU?

 Note that one DStream of the algorithm runs completely on a single CPU

 Thank you,
 Sateesh



RE: SparkR csv without headers

2015-08-21 Thread Felix Cheung
You could also rename them with names().
Unfortunately the API doc doesn't show an example of that:
https://spark.apache.org/docs/latest/api/R/index.html





On Thu, Aug 20, 2015 at 7:43 PM -0700, Sun, Rui rui@intel.com wrote:
Hi,

You can create a DataFrame using load.df() with a specified schema.

Something like:
schema <- structType(structField("a", "string"), structField("b", "integer"), ...)
read.df(..., schema = schema)

From: Franc Carter [mailto:franc.car...@rozettatech.com]
Sent: Wednesday, August 19, 2015 1:48 PM
To: user@spark.apache.org
Subject: SparkR csv without headers


Hi,

Does anyone have an example of how to create a DataFrame in SparkR  which 
specifies the column names - the csv files I have do not have column names in 
the first row. I can get read a csv nicely with 
com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2, C3 
etc


thanks

--

Franc Carter | Systems Architect | RoZetta Technology






L4. 55 Harrington Street,  THE ROCKS,  NSW, 2000

PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA

T +61 2 8355 2515 |
www.rozettatechnology.com


DISCLAIMER: The contents of this email, inclusive of attachments, may be legally

privileged and confidential. Any unauthorised use of the contents is expressly 
prohibited.




Re: Spark streaming multi-tasking during I/O

2015-08-21 Thread Akhil Das
You can look at spark.streaming.concurrentJobs; by default it runs a
single job. If you set it to 2 then it can run 2 jobs in parallel. It's an
experimental flag, but go ahead and give it a try.
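
For example, a minimal sketch of setting the flag when building the streaming
context (the app name and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Experimental: allow two streaming jobs of the same application to run concurrently.
val conf = new SparkConf()
  .setAppName("concurrent-jobs-example")
  .set("spark.streaming.concurrentJobs", "2")
val ssc = new StreamingContext(conf, Seconds(10))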
On Aug 21, 2015 3:36 AM, Sateesh Kavuri sateesh.kav...@gmail.com wrote:

 Hi,

 My scenario goes like this:
 I have an algorithm running in Spark streaming mode on a 4 core virtual
 machine. Majority of the time, the algorithm does disk I/O and database
 I/O. Question is, during the I/O, where the CPU is not considerably loaded,
 is it possible to run any other task/thread so as to efficiently utilize
 the CPU?

 Note that one DStream of the algorithm runs completely on a single CPU

 Thank you,
 Sateesh



Re: Set custom worker id?

2015-08-21 Thread Akhil Das
You can try adding a human-readable entry in the /etc/hosts file of the
worker machine and then set SPARK_LOCAL_IP to point to this
hostname in that machine's spark-env.sh file.
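
For example, a sketch of that setup (the address and hostname are hypothetical,
and whether the worker ID picks this up may depend on the Spark version):

# /etc/hosts on the worker machine
10.0.0.12   analytics-worker-01

# conf/spark-env.sh on the same machine
export SPARK_LOCAL_IP=analytics-worker-01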
On Aug 21, 2015 11:57 AM, saif.a.ell...@wellsfargo.com wrote:

 Hi,

 Is it possible in standalone to set up worker ID names? to avoid the
 worker-19248891237482379-ip..-port ??

 Thanks,
 Saif




Re: DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.

2015-08-21 Thread Reynold Xin
You've probably hit this bug:
https://issues.apache.org/jira/browse/SPARK-7180

It's fixed in Spark 1.4.1+. Try setting spark.serializer.extraDebugInfo to
false and see if it goes away.
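
A minimal sketch of that workaround on 1.3.x (the conf must be set before the
SparkContext is created):

import org.apache.spark.{SparkConf, SparkContext}

// Disable the extra serialization debug info that triggers the faulty
// SerializationDebugger path (workaround for SPARK-7180).
val conf = new SparkConf()
  .setAppName("parquet-save")
  .set("spark.serializer.extraDebugInfo", "false")
val sc = new SparkContext(conf)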


On Fri, Aug 21, 2015 at 3:37 AM, Eugene Morozov evgeny.a.moro...@gmail.com
wrote:

 Hi,

 I'm using spark 1.3.1 built against hadoop 1.0.4 and java 1.7 and I'm
 trying to save my data frame to parquet.
 The issue I'm stuck on looks like serialization trying to do a pretty weird
 thing: it tries to write to an empty array.

 The last (through stack trace) line of spark code that leads to exception
 is in method SerializationDebugger.visitSerializable(o: Object, stack:
 List[String]): List[String].
 desc.getObjFieldValues(finalObj, objFieldValues)

 The reason it does so, is because finalObj is
 org.apache.spark.sql.execution.Project and objFieldValues is an empty
 array! As a result there are two fields to read from the Project instance
 object (happens in java.io.ObjectStreamClass), but there is an empty array
 to read into.

 A little bit of code with debug info:
 private def visitSerializable(o: Object, stack: List[String]):
 List[String] = {
 val (finalObj, desc) = findObjectAndDescriptor(o) //finalObj: Project,
 desc: org.apache.spark.sql.execution.Project
   val slotDescs = desc.getSlotDescs //java.io.ObjectStreamClass[2] [0:
 SparkPlan, 1: Project]
   var i = 0 //i: 0
   while (i < slotDescs.length) {
 val slotDesc = slotDescs(i) //slotDesc:
 org.apache.spark.sql.execution.SparkPlan
 if (slotDesc.hasWriteObjectMethod) {
   // TODO: Handle classes that specify writeObject method.
 } else {
   val fields: Array[ObjectStreamField] = slotDesc.getFields
 //fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled]
   val objFieldValues: Array[Object] = new
 Array[Object](slotDesc.getNumObjFields) //objFieldValues:
 java.lang.Object[0]
   val numPrims = fields.length - objFieldValues.length //numPrims:
 1
   desc.getObjFieldValues(finalObj, objFieldValues) //leads to
 exception

 So it looks like it gets objFieldValues array from the SparkPlan object,
 but uses it to receive values from Project object.

 Here is the schema of my data frame
 root
  |-- Id: long (nullable = true)
  |-- explodes: struct (nullable = true)
  ||-- Identifiers: array (nullable = true)
  |||-- element: struct (containsNull = true)
  ||||-- Type: array (nullable = true)
  |||||-- element: string (containsNull = true)
  |-- Identifiers: struct (nullable = true)
  ||-- Type: array (nullable = true)
  |||-- element: string (containsNull = true)
  |-- Type2: string (nullable = true)
  |-- Type: string (nullable = true)

 Actual stack trace is:
 org.apache.spark.SparkException: Task not serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
 at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
 at
 com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
 at
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
 at
 org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at
 org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at
 org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
 at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
 at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
 at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
 at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
 Caused by: 

Re: Spark ec2 launch problem

2015-08-21 Thread Akhil Das
It may happen that the version of the spark-ec2 script you are using is buggy,
or sometimes AWS has problems provisioning machines.
On Aug 21, 2015 7:56 AM, Garry Chen g...@cornell.edu wrote:

 Hi All,

 I am trying to launch a Spark EC2 cluster by running
  spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc
 --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster, but I keep
 getting the following message endlessly.  Please help.





 Warning: SSH connection error. (This could be temporary.)

 Host:

 SSH return code: 255

 SSH output: ssh: Could not resolve hostname : Name or service not known



Re: spark streaming 1.3 kafka error

2015-08-21 Thread Akhil Das
That looks like you are choking your kafka machine. Do a top on the kafka
machines and see the workload, it may happen that you are spending too much
time on disk io etc.
On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports (note
 that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker nodes,
 ie telnet or netcat to it.  It looks like your driver can reach it (since
 there's partition info in the logs), but that doesn't mean the worker can.

 Use lsof / netstat to see what's going on with those ports while the job
 is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of view,
 post a minimal reproducible code sample that demonstrates the issue, so it
 can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka using 
 directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 
 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
  at kafka.utils.Utils$.read(Utils.scala:376)
  at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
  at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
  at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
  at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
  at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
  at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
  at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
  at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
 task 16348
 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 
 (TID 16348)
 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic 
 test_hbrealtimeevents, partition 75 offsets 4701 - 4718
 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties




 Thanks





How frequently should we expect full GC

2015-08-21 Thread java8964
In the test job I am running on Spark 1.3.1 in our staging cluster, I can see the
following information on the application stage page:

    Metric      Min      25th percentile   Median    75th percentile   Max
    Duration    0 ms     1.1 min           1.5 min   1.7 min           3.4 min
    GC Time     11 s     16 s              21 s      25 s              54 s

From the GC output log, I can see a full GC in the executor about every
minute, like below.
My question is: the committed heap is more than 14G and -XX:MaxPermSize=128m;
in this case the max heap usage is about 10G, so why does a full GC happen
every minute?
The job runs fine; I just want to know what expectations you guys normally have
for full GC in Spark jobs.
Thanks
Yong
2015-08-21T16:53:59.561-0400: [Full GC [PSYoungGen: 328038K-0K(3728384K)] 
[ParOldGen: 10359817K-5856671K(11185152K)] 10687855K-5856671K(14913536K) 
[PSPermGen: 57214K-57214K(57856K)], 8.6951450 secs] [Times: user=140.72 
sys=0.18, real=8.69 secs] 
2015-08-21T16:54:09.605-0400: [GC [PSYoungGen: 1864192K-251539K(3728384K)] 
7720863K-6108211K(14913536K), 0.1217750 secs] [Times: user=2.12 sys=0.01, 
real=0.12 secs] 
2015-08-21T16:54:11.131-0400: [GC [PSYoungGen: 2115731K-163448K(3728384K)] 
7972404K-6197142K(14913536K), 0.1802910 secs] [Times: user=3.19 sys=0.01, 
real=0.18 secs] 
2015-08-21T16:54:12.832-0400: [GC [PSYoungGen: 2027640K-144369K(3728384K)] 
8061339K-6314232K(14913536K), 0.1816010 secs] [Times: user=3.03 sys=0.00, 
real=0.19 secs] 
2015-08-21T16:54:14.547-0400: [GC [PSYoungGen: 2008561K-121478K(3728384K)] 
8178424K-6435609K(14913536K), 0.1411160 secs] [Times: user=2.50 sys=0.00, 
real=0.14 secs] 
2015-08-21T16:54:15.931-0400: [GC [PSYoungGen: 1985670K-114489K(3728384K)] 
8299801K-6550508K(14913536K), 0.1285300 secs] [Times: user=2.13 sys=0.00, 
real=0.13 secs] 
2015-08-21T16:54:17.323-0400: [GC [PSYoungGen: 1978681K-219811K(3792896K)] 
8414700K-6769504K(14978048K), 0.1649230 secs] [Times: user=2.89 sys=0.01, 
real=0.17 secs] 
2015-08-21T16:54:18.878-0400: [GC [PSYoungGen: 2148515K-425173K(3728384K)] 
8698218K-6974876K(14913536K), 0.3130360 secs] [Times: user=5.56 sys=0.00, 
real=0.31 secs] 
2015-08-21T16:54:20.596-0400: [GC [PSYoungGen: 2353877K-313071K(3985408K)] 
8903582K-7071556K(15170560K), 0.2423240 secs] [Times: user=4.30 sys=0.00, 
real=0.24 secs] 
2015-08-21T16:54:22.695-0400: [GC [PSYoungGen: 2608367K-371370K(3902464K)] 
9366852K-7338548K(15087616K), 0.2647510 secs] [Times: user=4.48 sys=0.00, 
real=0.26 secs] 
2015-08-21T16:54:24.747-0400: [GC [PSYoungGen: 266K-459392K(4174336K)] 
9633844K-7528652K(15359488K), 0.3564370 secs] [Times: user=6.36 sys=0.00, 
real=0.35 secs] 
2015-08-21T16:54:26.951-0400: [GC [PSYoungGen: 3116160K-445880K(4075008K)] 
10185420K-7746897K(15260160K), 0.2853880 secs] [Times: user=5.07 sys=0.00, 
real=0.29 secs] 
2015-08-21T16:54:29.340-0400: [GC [PSYoungGen: 3102648K-286176K(4314112K)] 
10403665K-7809242K(15499264K), 0.2534940 secs] [Times: user=4.48 sys=0.01, 
real=0.25 secs] 
2015-08-21T16:54:31.979-0400: [GC [PSYoungGen: 3269600K-122064K(4261888K)] 
10792666K-7863493K(15447040K), 0.2035800 secs] [Times: user=3.41 sys=0.00, 
real=0.20 secs] 
2015-08-21T16:54:34.737-0400: [GC [PSYoungGen: 3105488K-555850K(4373504K)] 
10846917K-8297279K(15558656K), 0.2401510 secs] [Times: user=4.14 sys=0.00, 
real=0.24 secs] 
2015-08-21T16:54:38.015-0400: [GC [PSYoungGen: 3675978K-1146062K(4266496K)] 
11417409K-8887493K(15451648K), 0.4298600 secs] [Times: user=7.65 sys=0.00, 
real=0.43 secs] 
2015-08-21T16:54:41.492-0400: [GC [PSYoungGen: 4266190K-1326063K(3565056K)] 
12007627K-9231644K(14750208K), 0.5542100 secs] [Times: user=9.90 sys=0.01, 
real=0.55 secs] 
2015-08-21T16:54:43.797-0400: [GC [PSYoungGen: 3565039K-1587981K(3827200K)] 
11470620K-9612725K(15012352K), 0.5359080 secs] [Times: user=9.57 sys=0.00, 
real=0.54 secs] 
2015-08-21T16:54:45.856-0400: [GC [PSYoungGen: 3826957K-1047737K(3629568K)] 
11851701K-9914434K(14814720K), 0.7787060 secs] [Times: user=13.91 sys=0.00, 
real=0.78 secs] 
2015-08-21T16:54:48.174-0400: [GC [PSYoungGen: 2911929K-459808K(3728384K)] 
11778626K-10058483K(14913536K), 0.5953360 secs] [Times: user=10.62 sys=0.03, 
real=0.60 secs] 
2015-08-21T16:54:50.217-0400: [GC [PSYoungGen: 2324000K-102928K(3740160K)] 
11922675K-10159967K(14925312K), 0.3191560 secs] [Times: user=5.68 sys=0.01, 
real=0.32 secs] 
2015-08-21T16:54:51.951-0400: [GC [PSYoungGen: 1978896K-296227K(3728384K)] 
12035935K-10456136K(14913536K), 0.1809970 secs] [Times: user=3.02 sys=0.00, 
real=0.18 secs] 
2015-08-21T16:54:53.550-0400: [GC [PSYoungGen: 2172195K-316636K(3866624K)] 
12332104K-10720591K(15051776K), 0.2545970 secs] [Times: user=4.43 sys=0.00, 
real=0.25 secs] 
2015-08-21T16:54:55.340-0400: [GC [PSYoungGen: 2390748K-336907K(3800064K)] 
12794703K-11043658K(14985216K), 0.3550330 secs] [Times: user=6.28 sys=0.00, 
real=0.35 secs] 
2015-08-21T16:54:55.695-0400: [Full GC [PSYoungGen: 336907K-0K(3800064K)] 
[ParOldGen: 10706750K-5725402K(11185152K)] 11043658K-5725402K(14985216K) 
[PSPermGen: 57214K-57214K(57856K)], 9.5623960 secs] [Times: user=150.15 

Re: Aggregate to array (or 'slice by key') with DataFrames

2015-08-21 Thread Nathan Skone
Raghavendra,

Thanks for the quick reply! I don’t think I included enough information in my 
question. I am hoping to get fields that are not directly part of the 
aggregation. Imagine a dataframe representing website views with a userID, 
datetime, and a webpage address. How could I find the oldest or newest webpage 
address that a user visited? As I understand it, you can only access fields 
that are part of the aggregation itself.

Thanks,
Impact


 On Aug 21, 2015, at 11:11 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:
 
 Impact,
 You can group by the data and then sort it by timestamp and take max to 
 select the oldest value.
 
 On Aug 21, 2015 11:15 PM, Impact nat...@skone.org 
 mailto:nat...@skone.org wrote:
 I am also looking for a way to achieve the reducebykey functionality on data
 frames. In my case I need to select one particular row (the oldest, based on
 a timestamp column value) by key.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org
 



Re: Aggregate to array (or 'slice by key') with DataFrames

2015-08-21 Thread Akhil Das
Did you try sorting it by datetime and doing a groupBy on the userID?
On Aug 21, 2015 12:47 PM, Nathan Skone nat...@skone.org wrote:

 Raghavendra,

 Thanks for the quick reply! I don’t think I included enough information in
 my question. I am hoping to get fields that are not directly part of the
 aggregation. Imagine a dataframe representing website views with a userID,
 datetime, and a webpage address. How could I find the oldest or newest
 webpage address that an user visited? As I understand it you can only
 access fields that are part of the aggregation itself.

 Thanks,
 Impact


 On Aug 21, 2015, at 11:11 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Impact,
 You can group by the data and then sort it by timestamp and take max to
 select the oldest value.
 On Aug 21, 2015 11:15 PM, Impact nat...@skone.org wrote:

 I am also looking for a way to achieve the reducebykey functionality on
 data
 frames. In my case I need to select one particular row (the oldest, based
 on
 a timestamp column value) by key.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Finding the number of executors.

2015-08-21 Thread Virgil Palanciuc
Hi Akhil,

I'm using spark 1.4.1.
Number of executors is not in the command line, not in the
getExecutorMemoryStatus
(I already mentioned that I tried that, works in spark-shell but not when
executed via spark-submit). I tried looking at defaultParallelism too,
it's 112 (7 executors * 16 cores) when ran via spark-shell, but just 2 when
ran via spark-submit.

But the scheduler obviously knows this information. It *must* know it. How
can I access it? Other that parsing the HTML of the WebUI, that is...
that's pretty much guaranteed to work, and maybe I'll do that, but it's
extremely convoluted.

Regards,
Virgil.

On Fri, Aug 21, 2015 at 11:35 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Which version spark are you using? There was a discussion happened over
 here

 http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html


 http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E
 On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote:

 Is there any reliable way to find out the number of executors
 programatically - regardless of how the job  is run? A method that
 preferably works for spark-standalone, yarn, mesos, regardless whether the
 code runs from the shell or not?

 Things that I tried and don't work:
 - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell,
 does not work if task submitted via  spark-submit
 - sparkContext.getConf.getInt(spark.executor.instances, 1) - doesn't
 work unless explicitly configured
 - call to http://master:8080/json (this used to work, but doesn't
 anymore?)

 I guess I could parse the output html from the Spark UI... but that seems
 dumb. is there really no better way?

 Thanks,
 Virgil.





Re: Finding the number of executors.

2015-08-21 Thread Akhil Das
Which version of Spark are you using? There was a discussion about this over
here:
http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html

http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E
On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote:

 Is there any reliable way to find out the number of executors
 programatically - regardless of how the job  is run? A method that
 preferably works for spark-standalone, yarn, mesos, regardless whether the
 code runs from the shell or not?

 Things that I tried and don't work:
 - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell,
 does not work if task submitted via  spark-submit
 - sparkContext.getConf.getInt(spark.executor.instances, 1) - doesn't
 work unless explicitly configured
 - call to http://master:8080/json (this used to work, but doesn't
 anymore?)

 I guess I could parse the output html from the Spark UI... but that seems
 dumb. is there really no better way?

 Thanks,
 Virgil.





Re: Worker Machine running out of disk for Long running Streaming process

2015-08-21 Thread Tathagata Das
Could you periodically (say every 10 mins) run System.gc() on the driver.
The cleaning up shuffles is tied to the garbage collection.
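
A minimal sketch of doing that on the driver (the 10-minute interval is
illustrative):

import java.util.concurrent.{Executors, TimeUnit}

// Trigger a GC on the driver periodically so that unreferenced shuffles
// become eligible for cleanup, as suggested above.
val gcScheduler = Executors.newSingleThreadScheduledExecutor()
gcScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()
}, 10, 10, TimeUnit.MINUTES)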


On Fri, Aug 21, 2015 at 2:59 AM, gaurav sharma sharmagaura...@gmail.com
wrote:

 Hi All,


 I have a 24x7 running Streaming Process, which runs on 2 hour windowed data

 The issue I am facing is that my worker machines are running OUT OF DISK space.

 I checked that the SHUFFLE FILES are not getting cleaned up.


 /log/spark-2b875d98-1101-4e61-86b4-67c9e71954cc/executor-5bbb53c1-cee9-4438-87a2-b0f2becfac6f/blockmgr-c905b93b-c817-4124-a774-be1e706768c1//00/shuffle_2739_5_0.data

 Ultimately the machines run out of disk space.


 I read about the *spark.cleaner.ttl* config param which, from what I can
 understand from the documentation, cleans up all the metadata beyond the time
 limit.

 I went through https://issues.apache.org/jira/browse/SPARK-5836;
 it says resolved, but there is no code commit.

 Can anyone please throw some light on the issue.





Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-21 Thread Jerrick Hoang
@Cheng, Hao : Physical plans show that it got stuck on scanning S3!

(table is partitioned by date_prefix and hour)
explain select count(*) from test_table where date_prefix='20150819' and
hour='00';

TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)]
   Scan ParquetRelation[ .. about 1000 partition paths go here ]

Why does Spark have to scan all partitions when the query only concerns
1 partition? Doesn't it defeat the purpose of partitioning?

Thanks!

On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and
 I couldn't find much information about it online. What does it mean exactly
 to disable it? Are there any negative consequences to disabling it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Can you make some more profiling? I am wondering if the driver is busy
 with scanning the HDFS / S3.

 Like jstack pid of driver process



 And also, it’s will be great if you can paste the physical plan for the
 simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
 CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to
 false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I guess the question is why does spark have to do partition discovery
 with all partitions when the query only needs to look at one partition? Is
 there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I've had the same problem. It turns out that Spark (specifically parquet)
 is very slow at partition discovery. It got better in 1.5 (not yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned parquet
 table with only one partition (date=20140701). A simple `select count(*)
 from table where date=20140701` would run very fast (0.1 seconds). However,
 as I added more partitions the query takes longer and longer. When I added
 about 10,000 partitions, the query took way too long. I feel like querying
 for a single partition should not be affected by having more partitions. Is
 this a known behaviour? What does spark try to do here?



 Thanks,

 Jerrick











Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-21 Thread Raghavendra Pandey
Did you try with Hadoop version 2.7.1? It is known that s3a, which is available
in 2.7, works really well with Parquet. They fixed a lot of issues related to
metadata reading there...
On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819' and
 hour='00';

 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ .. about 1000 partition paths go here ]

 Why does spark have to scan all partitions when the query only concerns
 with 1 partitions? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
 and I couldn't find much information about it online. What does it mean
 exactly to disable it? Are there any negative consequences to disabling it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Can you make some more profiling? I am wondering if the driver is busy
 with scanning the HDFS / S3.

 Like jstack pid of driver process



 And also, it’s will be great if you can paste the physical plan for the
 simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
 CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to
 false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I guess the question is why does spark have to do partition discovery
 with all partitions when the query only needs to look at one partition? Is
 there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I've had the same problem. It turns out that Spark (specifically
 parquet) is very slow at partition discovery. It got better in 1.5 (not yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned
 parquet table with only one partition (date=20140701). A simple `select
 count(*) from table where date=20140701` would run very fast (0.1 seconds).
 However, as I added more partitions the query takes longer and longer. When
 I added about 10,000 partitions, the query took way too long. I feel like
 querying for a single partition should not be affected by having more
 partitions. Is this a known behaviour? What does spark try to do here?



 Thanks,

 Jerrick












Re: Tungsten and sun.misc.Unsafe

2015-08-21 Thread Marek Kolodziej
Thanks Reynold, that helps a lot. I'm glad you're involved with that Google
Doc community effort. I think it's because of that doc that the JEP's
wording and scope changed for the better since it originally got
introduced.

Marek

On Fri, Aug 21, 2015 at 11:18 AM, Reynold Xin r...@databricks.com wrote:

 I'm actually somewhat involved with the Google Docs you linked to.

 I don't think Oracle will remove Unsafe in JVM 9. As you said, JEP 260
 already proposes making Unsafe available. Given the widespread use of
 Unsafe for performance and advanced functionalities, I don't think Oracle
 can just remove it in one release. If they do, there will be strong
 backlash and the act would significantly undermine the credibility of the
 JVM as a long-term platform.

 Note that for Spark itself, we move pretty fast and can replace all the
 use of Unsafe with a newer alternative in one release if absolutely
 necessary (the actual coding takes only a day or two).



 On Fri, Aug 21, 2015 at 5:29 AM, Marek Kolodziej mkolod@gmail.com
 wrote:

 Hello,

 I attended the Tungsten-related presentations at Spark Summit (by Josh
 Rosen) and at Big Data Scala (by Matei Zaharia). Needless to say, this
 project holds great promise for major performance improvements.

 At Josh's talk, I heard about the use of sun.misc.Unsafe as a way of
 achieving some of these optimizations (e.g. slides 11-17 of Josh's
 presentation:
 http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen).
 I have no problems with the use of Unsafe in the code itself (I've done it
 before myself, too), however I think there is a considerable risk
 associated with beginning the use of Unsafe now, because Oracle is
 determined to limit access to APIs such as Unsafe starting in Java 9.

 JEP 260 http://openjdk.java.net/jeps/260 was filed specifically to
 limit access to internal JDK APIs that were never intended for external
 use, including sun.misc.* The JEP does say that the functionality of
 sun.misc.Unsafe is to remain available even as other internal APIs are
 blocked for non-JDK use, however, it also says that the functionality of
 many methods of this class is now available via *variable handles (JEP
 193 http://openjdk.java.net/jeps/193).* If the direct access to
 sun.misc.Unsafe is blocked and only the variable handles access remains,
 this may mean more than just a need for code refactoring - functionality
 such as doing malloc from Spark core may be restricted.

 JEP 260 has evolved quite a bit over time and the wording available now
 (after the Aug. 4, 2015) seems more reasonable than before. Nevertheless,
 Hazelcast and other companies whose technologies depend on the availability
 of Unsafe started a Google doc here
 https://docs.google.com/document/d/1GDm_cAxYInmoHMor-AkStzWvwE9pw6tnz_CebJQxuUE/edit#heading=h.brct71tr6e13
 .

 I doubt that Oracle would want to make life difficult for everyone. In
 addition to Spark's code base, projects such as Akka, Cassandra, Hibernate,
 Netty, Neo4j and Spring (among many others) depend on Unsafe. Still, there
 are tons of posts about this issue in the Java community (e.g. here
 https://jaxenter.com/hazelcast-on-java-unsafe-class-119286.html's a
 Hazelcast interview, also from Aug. 3, the day before the latest update to
 JEP 260). There are tons of concerned posts on the blogosphere, too (e.g.
 here
 http://blog.dripstat.com/removal-of-sun-misc-unsafe-a-disaster-in-the-making/
 ).

 Have the leaders of the Spark community been following these
 Unsafe-related developments and if so, what's Spark's plan of handling
 whatever Oracle throws our way?

 Marek





Re: Aggregate to array (or 'slice by key') with DataFrames

2015-08-21 Thread Raghavendra Pandey
Impact,
You can group by the data and then sort it by timestamp and take max to
select the oldest value.
On Aug 21, 2015 11:15 PM, Impact nat...@skone.org wrote:

 I am also looking for a way to achieve the reducebykey functionality on
 data
 frames. In my case I need to select one particular row (the oldest, based
 on
 a timestamp column value) by key.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: Remoting warning when submitting to cluster

2015-08-21 Thread javidelgadillo
I believe this was caused by some network configuration on my machines. After 
installing VirtualBox, some new network interfaces were installed on the 
machines and the Akka software was binding to one of the VirtualBox interfaces 
and not the interface that belonged to my Ethernet card.  Once I disabled the 
VirtualBox internet interfaces, things worked more reliably.

-Javier

From: sumit.anvekar [via Apache Spark User List] 
[mailto:ml-node+s1001560n24377...@n3.nabble.com]
Sent: Thursday, August 20, 2015 9:32 PM
To: Javier Delgadillo
Subject: Re: Remoting warning when submitting to cluster

Were you able to figure out what the issue is? I am also facing the same issue 
but with an EC2 (1 master, 2 workers) setup. Also, I am trying to create an RDD 
with data from a remote Cassandra. My program jar has all the dependencies needed.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Remoting-warning-when-submitting-to-cluster-tp22733p24397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Set custom worker id?

2015-08-21 Thread Saif.A.Ellafi
Hi,

Is it possible in standalone mode to set up worker ID names, to avoid the 
worker-19248891237482379-ip..-port format?

Thanks,
Saif



Re: Aggregate to array (or 'slice by key') with DataFrames

2015-08-21 Thread Impact
I am also looking for a way to achieve the reducebykey functionality on data
frames. In my case I need to select one particular row (the oldest, based on
a timestamp column value) by key.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Aggregate to array (or 'slice by key') with DataFrames

2015-08-21 Thread Dan LaBar
Nathan,

I achieve this using rowNumber.  Here is a Python DataFrame example:

from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rowNumber

yourOutputDF = (
    yourInputDF
    .withColumn("first", rowNumber()
                .over(Window.partitionBy("userID").orderBy("datetime")))
    .withColumn("last", rowNumber()
                .over(Window.partitionBy("userID").orderBy(desc("datetime"))))
)

You can get the first url like this:
yourOutputDF.filter("first = 1").select("userID", "url")

...and the last like this:
yourOutputDF.filter("last = 1").select("userID", "url")

If you wanted the first and last url as columns with one row per userID,
you could do a groupBy and take the max of a when column that returns the
url if last is 1, or null otherwise.  (You would need a similar column
where first is 1.)  Not sure if this makes sense, but I don't have time now
to provide a code example.

Regards,
Dan


On Fri, Aug 21, 2015 at 4:09 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try sorting it by datetime and doing a groupBy on the userID?
 On Aug 21, 2015 12:47 PM, Nathan Skone nat...@skone.org wrote:

 Raghavendra,

 Thanks for the quick reply! I don’t think I included enough information
 in my question. I am hoping to get fields that are not directly part of the
 aggregation. Imagine a dataframe representing website views with a userID,
 datetime, and a webpage address. How could I find the oldest or newest
 webpage address that an user visited? As I understand it you can only
 access fields that are part of the aggregation itself.

 Thanks,
 Impact


 On Aug 21, 2015, at 11:11 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Impact,
 You can group by the data and then sort it by timestamp and take max to
 select the oldest value.
 On Aug 21, 2015 11:15 PM, Impact nat...@skone.org wrote:

 I am also looking for a way to achieve the reducebykey functionality on
 data
 frames. In my case I need to select one particular row (the oldest,
 based on
 a timestamp column value) by key.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-21 Thread Jerrick Hoang
Is there a workaround without updating Hadoop? I would really appreciate it if
someone could explain what Spark is trying to do here and what an easy way to
turn this off is. Thanks all!
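
For reference, a minimal sketch of the flag mentioned earlier in this thread
(assuming the sqlContext already in use; the flag's exact behavior may differ
between builds):

// Turn off automatic partition discovery for partitioned data sources.
sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")

sqlContext.sql(
  "select count(*) from test_table where date_prefix = '20150819' and hour = '00'")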

On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 Did you try with hadoop version 2.7.1 .. It is known that s3a works really
 well with parquet which is available in 2.7. They fixed lot of issues
 related to metadata reading there...
 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819' and
 hour='00';

 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ .. about 1000 partition paths go here ]

 Why does spark have to scan all partitions when the query only concerns
 with 1 partitions? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
 and I couldn't find much information about it online. What does it mean
 exactly to disable it? Are there any negative consequences to disabling it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Can you make some more profiling? I am wondering if the driver is busy
 with scanning the HDFS / S3.

 Like jstack pid of driver process



 And also, it’s will be great if you can paste the physical plan for the
 simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
 CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled
 to false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I guess the question is why does spark have to do partition discovery
 with all partitions when the query only needs to look at one partition? Is
 there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I've had the same problem. It turns out that Spark (specifically
 parquet) is very slow at partition discovery. It got better in 1.5 (not yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned
 parquet table with only one partition (date=20140701). A simple `select
 count(*) from table where date=20140701` would run very fast (0.1 seconds).
 However, as I added more partitions the query takes longer and longer. When
 I added about 10,000 partitions, the query took way too long. I feel like
 querying for a single partition should not be affected by having more
 partitions. Is this a known behaviour? What does spark try to do here?



 Thanks,

 Jerrick












Re: Finding the number of executors.

2015-08-21 Thread Du Li
Following is a method that retrieves the list of executors registered to a 
spark context. It worked perfectly with spark-submit in standalone mode for my 
project.
/**
 * A simplified method that just returns the current active/registered executors
 * excluding the driver.
 * @param sc
 *          The spark context to retrieve registered executors.
 * @return
 *          A list of executors each in the form of host:port.
 */
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
  val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
  val driverHost: String = sc.getConf.get("spark.driver.host")
  allExecutors.filter(!_.split(":")(0).equals(driverHost)).toList
}
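
A usage sketch, assuming an existing SparkContext `sc`:

val executors = currentActiveExecutors(sc)
println(s"Registered executors (${executors.size}): ${executors.mkString(", ")}")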
 


 On Friday, August 21, 2015 1:53 PM, Virgil Palanciuc virg...@gmail.com 
wrote:
   

 Hi Akhil,
I'm using spark 1.4.1. Number of executors is not in the command line, not in 
the getExecutorMemoryStatus (I already mentioned that I tried that, works in 
spark-shell but not when executed via spark-submit). I tried looking at 
defaultParallelism too; it's 112 (7 executors * 16 cores) when run via 
spark-shell, but just 2 when run via spark-submit.

But the scheduler obviously knows this information. It *must* know it. How can 
I access it? Other than parsing the HTML of the web UI, that is... that's pretty 
much guaranteed to work, and maybe I'll do that, but it's extremely convoluted.

Regards,
Virgil.
On Fri, Aug 21, 2015 at 11:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Which version of Spark are you using? There was a discussion about this over here:
http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html
http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E
On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote:

Is there any reliable way to find out the number of executors programatically -
regardless of how the job is run? A method that preferably works for
spark-standalone, yarn, mesos, regardless whether the code runs from the shell
or not?

Things that I tried and don't work:
- sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell, does not work if task submitted via spark-submit
- sparkContext.getConf.getInt("spark.executor.instances", 1) - doesn't work unless explicitly configured
- call to http://master:8080/json (this used to work, but doesn't anymore?)

I guess I could parse the output html from the Spark UI... but that seems dumb.
Is there really no better way?

Thanks,
Virgil.






  

Having Clause with variation and stddev

2015-08-21 Thread Ravisankar Mani
Hi,


An exception is thrown when using a HAVING clause with variance or stddev. It
works perfectly when using other aggregate functions (like
sum, count, min, max, ...).

SELECT SUM(1) AS `sum_number_of_records_ok` FROM
`some_db`.`some_table` `some_table`
GROUP BY 1 HAVING (STDDEV(1) > 0)

SELECT SUM(1) AS `sum_number_of_records_ok` FROM
`some_db`.`some_table` `some_table`
GROUP BY 1 HAVING (VARIANCE(1) > 0)

Could you please share any other way of writing this kind of query?

Regards,

Ravi


Re: spark kafka partitioning

2015-08-21 Thread Gaurav Agarwal
When I send messages to a Kafka topic having three partitions,
Spark will receive them when I use kafkautils.createStream or
createDirectStream with local[4].
Now I want to see whether Spark will create partitions when it receives
messages from Kafka using a DStream - how, where, and which method of the
Spark API I have to look at to find out.
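
To see this directly, a minimal sketch with the direct stream (hypothetical
broker and topic names; assumes an existing SparkContext `sc` and the
spark-streaming-kafka dependency). With createDirectStream, each Kafka partition
maps to one Spark partition, so a 3-partition topic yields 3-partition RDDs per
batch:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))

// Print how many Spark partitions each batch's RDD has.
stream.foreachRDD(rdd => println(s"partitions in this batch: ${rdd.partitions.size}"))
ssc.start()
ssc.awaitTermination()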

On 8/21/15, Gaurav Agarwal gaurav130...@gmail.com wrote:
 Hello

 Regarding Spark Streaming and Kafka Partitioning

 When i send message on kafka topic with 3 partitions and listens on
 kafkareceiver with local value[4] . how will i come to know in Spark
 Streaming that different Dstreams are created according to partitions of
 kafka messages .

 Thanks


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL concerning materials

2015-08-21 Thread Dawid Wysakowicz
Hi,

thanks for the answers. I have read what you provided, but I am rather looking
for some materials on the internals, e.g. how the optimizer works, how a query
is translated into RDD operations, etc. I am quite familiar with the API.
A good starting point for me was: Spark DataFrames: Simple and Fast
Analysis of Structured Data
https://www.brighttalk.com/webcast/12891/166495?utm_campaign=child-community-webcasts-feedutm_content=Big+Data+and+Data+Managementutm_source=brighttalk-portalutm_medium=webutm_term=

2015-08-20 18:29 GMT+02:00 Dhaval Patel dhaval1...@gmail.com:

 Or if you're a python lover then this is a good place -
 https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#



 On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote:

 See also
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package

 Cheers

 On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com
  wrote:

 Hi Dawid

 The best pace to get started is the Spark SQL Guide from Apache
 http://spark.apache.org/docs/latest/sql-programming-guide.html

 Regards
 Muhammad

 On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz 
 wysakowicz.da...@gmail.com wrote:

 Hi,

 I would like to dip into SparkSQL. Get to know better the architecture,
 good practices, some internals. Could you advise me some materials on this
 matter?

 Regards
 Dawid







ClassCastException when saving a DataFrame to parquet file (saveAsParquetFile, Spark 1.3.1) using Scala

2015-08-21 Thread Emma Boya Peng
Hi,

I was trying to programmatically specify a schema and apply it to an RDD of
Rows and save the resulting DataFrame as a parquet file.
Here's what I did:

1. Created an RDD of Rows from RDD[Array[String]]:
val gameId= Long.valueOf(line(0))
  val accountType = Long.valueOf(line(1))
  val worldId = Long.valueOf(line(2))
  val dtEventTime = line(3)
  val iEventId = line(4)
  val vVersionId = line(5)
  val vUin = line(6)
  val vClientIp = line(7)
  val vZoneId = line(8)
  val dtCreateTime = line(9)
  val iFeeFlag = Long.valueOf(line(10))
  val vLoginWay = line(11)

  return Row(gameId, accountType, worldId, dtEventTime, iEventId,
vVersionId, vUin, vClientIp,
 vZoneId, dtCreateTime, vZoneId, dtCreateTime, iFeeFlag,
vLoginWay)


Re: what determines the task size?

2015-08-21 Thread Robineast
The OP wants to understand what determines the size of the task code that is 
shipped to each executor so it can run the task. I don't know the answer, but I 
would be interested to know too.

Sent from my iPhone

 On 21 Aug 2015, at 08:26, oubrik [via Apache Spark User List] 
 ml-node+s1001560n24380...@n3.nabble.com wrote:
 
 Hi 
 You mean user code ? 
 
 
 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-determine-the-task-size-tp24363p24384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: PySpark concurrent jobs using single SparkContext

2015-08-21 Thread Hemant Bhanawat
It seems like you want simultaneous processing of multiple jobs but, at the
same time, serialization of a few tasks within those jobs. I don't know how to
achieve that in Spark.

But, why would you bother about the inter-weaved processing when the data
that is being aggregated in different jobs is per customer per day? Is it
that save_aggregate depends on results of other customers and/or other
days?

I also don't understand how you would achieve that with yarn because
interweaving of tasks of separately submitted jobs may happen with dynamic
executor allocation as well.

Hemant


On Thu, Aug 20, 2015 at 7:04 PM, Mike Sukmanowsky 
mike.sukmanow...@gmail.com wrote:

 Hi all,

 We're using Spark 1.3.0 via a small YARN cluster to do some log
 processing. The jobs are pretty simple, for a number of customers and a
 number of days, fetch some event log data, build aggregates and store those
 aggregates into a data store.

 The way our script is written right now does something akin to:

 with SparkContext() as sc:
     for customer in customers:
         for day in days:
             logs = sc.textFile(get_logs(customer, day))
             aggregate = make_aggregate(logs)
             # This function contains the action saveAsNewAPIHadoopFile which
             # triggers a save
             save_aggregate(aggregate)

 ​
 So we have a Spark job per customer, per day.

 I tried doing some parallel job submission with something similar to:

 def make_and_save_aggregate(customer, day, spark_context):
     # Without a separate threading.Lock() here or better yet, one guarding the
     # Spark context, multiple customer/day transformations and actions could
     # be interweaved
     sc = spark_context
     logs = sc.textFile(get_logs(customer, day))
     aggregate = make_aggregate(logs)
     save_aggregate(aggregate)

 with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
     for customer in customers:
         for day in days:
             executor.submit(make_and_save_aggregate, customer, day, sc)

 ​
 The problem is, with no locks on a SparkContext except during
 initialization
 https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241
  and
 shutdown
 https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307,
 operations on the context could (if I understand correctly) be interweaved
 leading to DAG which contains transformations out of order and from
 different customer, day periods.

 One solution is instead to launch multiple Spark jobs via spark-submit and
 let YARN/Spark's dynamic executor allocation take care of fair scheduling.
 In practice, this doesn't seem to yield very fast computation perhaps due
 to some additional overhead with YARN.

 Is there any safe way to launch concurrent jobs like this using a single
 PySpark context?

 --
 Mike Sukmanowsky
 Aspiring Digital Carpenter

 *e*: mike.sukmanow...@gmail.com

 LinkedIn http://www.linkedin.com/profile/view?id=10897143 | github
 https://github.com/msukmanowsky




Re: SPARK sql: Need JSON back instead of row

2015-08-21 Thread Todd
Please try DataFrame.toJSON; it will give you an RDD of JSON strings.
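
A minimal sketch using the query from the original question (the sqlContext and
the registered "people" table are assumed from that context):

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// toJSON returns an RDD[String] with one JSON document per row.
val teenagersJson = teenagers.toJSON
teenagersJson.collect().foreach(println)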








At 2015-08-21 15:59:43, smagadi sudhindramag...@fico.com wrote:
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND
age <= 19")

I need teenagers to be JSON objects rather than simple rows. How can we get
that done?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-sql-Need-JSON-back-isntead-of-roq-tp24381.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
Yes, DSE 4.7

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below-mentioned piece of code works fine in the Spark shell, but when
 the same is placed in a script file and executed with -i <file name>, it
 creates an empty RDD

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i <ScriptFile>

 I understand I am missing something here, due to which my final RDD does
 not have the required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int, Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function
 on values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res: Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish







Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Robin,
Yes, it is DSE, but the issue is related to Spark only

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below-mentioned piece of code works fine in the Spark shell, but when
 the same is placed in a script file and executed with -i <file name>, it
 creates an empty RDD

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i <ScriptFile>

 I understand I am missing something here, due to which my final RDD does
 not have the required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int, Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function
 on values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res: Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish







Re: SPARK SQL: Need JSON back instead of row

2015-08-21 Thread smagadi
teenagers.toJSON gives the JSON, but it does not preserve the parent structure;

meaning if the input was {"name":"Yin",
"address":{"city":"Columbus","state":"Ohio"},"age":20}

val x = sqlContext.sql("SELECT name, address.city, address.state, age FROM
people where age > 19 and age <= 30").toJSON

 x.collect().foreach(println)

This returns the following, with the nested address structure missing:
{"name":"Yin","city":"Columbus","state":"Ohio","age":20}
Is this a bug?
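
This looks like ordinary projection behaviour rather than a bug: selecting address.city
and address.state promotes them to top-level columns, so toJSON emits them flat. Selecting
the address struct column itself keeps the nesting. A minimal sketch (shown in PySpark for
brevity; the input path is illustrative):

# Hedged sketch: selecting the whole "address" struct keeps the nesting in toJSON output.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="nested-json")
sqlContext = SQLContext(sc)

people = sqlContext.jsonFile("people.json")   # illustrative input file
people.registerTempTable("people")

nested = sqlContext.sql(
    "SELECT name, address, age FROM people WHERE age > 19 AND age <= 30")
for j in nested.toJSON().collect():
    print(j)   # e.g. {"name":"Yin","address":{"city":"Columbus","state":"Ohio"},"age":20}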



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-sql-Need-JSON-back-isntead-of-roq-tp24381p24386.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Worker Machine running out of disk for Long running Streaming process

2015-08-21 Thread gaurav sharma
Hi All,


I have a 24x7 running streaming process, which runs on 2-hour windowed data.

The issue I am facing is that my worker machines are running OUT OF DISK space.

I checked that the SHUFFLE FILES are not getting cleaned up.

/log/spark-2b875d98-1101-4e61-86b4-67c9e71954cc/executor-5bbb53c1-cee9-4438-87a2-b0f2becfac6f/blockmgr-c905b93b-c817-4124-a774-be1e706768c1//00/shuffle_2739_5_0.data

Ultimately the machines run out of disk space.


I read about the *spark.cleaner.ttl* config param, which, from what I can
understand of the documentation, cleans up all the metadata beyond the time
limit.

I went through https://issues.apache.org/jira/browse/SPARK-5836
It says resolved, but there is no code commit.

Can anyone please throw some light on the issue.
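
Not a confirmed fix, but a minimal sketch of setting spark.cleaner.ttl for a long-running
streaming app follows. The TTL value, batch interval, and checkpoint directory are
illustrative; the TTL must comfortably exceed the window duration (2 hours here), or data
that is still needed may be removed.

# Hedged sketch: periodic metadata/shuffle cleanup for a long-running streaming app.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("windowed-stream")
        .set("spark.cleaner.ttl", str(4 * 60 * 60)))   # seconds; illustrative 4-hour TTL

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 60)                         # illustrative 60s batch interval
ssc.checkpoint("/tmp/checkpoints")                     # illustrative checkpoint dir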


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread Jeff Zhang
Hi Satish,

I don't see where Spark supports -i, so I suspect it is provided by DSE. In
that case, it might be a bug in DSE.



On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com
wrote:

 HI Robin,
 Yes, it is DSE, but the issue is related to Spark only

 Regards,
 Satish Chandra

 On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk
 wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below-mentioned piece of code works fine in the Spark shell, but when
 the same is placed in a script file and executed with -i <file name>, it
 creates an empty RDD

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i <ScriptFile>

 I understand I am missing something here, due to which my final RDD does
 not have the required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int, Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function
 on values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res: Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish








-- 
Best Regards

Jeff Zhang


spark streaming 1.3 kafka error

2015-08-21 Thread Shushant Arora
Hi


Getting the below error in Spark Streaming 1.3 while consuming from Kafka
using the direct Kafka stream. A few tasks are failing in each run.


What is the reason/solution for this error?


15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in
stage 130.0 (TID 16332)
java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 16348
15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage
130.0 (TID 16348)
15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic
test_hbrealtimeevents, partition 75 offsets 4701 - 4718
15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties




Thanks


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Robin,
Yes, the below-mentioned piece of code works fine in the Spark shell, but when
the same is placed in a script file and executed with -i <file name>, it
creates an empty RDD

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
makeRDD at <console>:28


scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command:

dse spark --master local --jars postgresql-9.4-1201.jar -i <ScriptFile>

I understand I am missing something here, due to which my final RDD does
not have the required output

Regards,
Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int, Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on
 values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at
 <console>:73
 res: Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish





Re: Creating Spark DataFrame from large pandas DataFrame

2015-08-21 Thread ayan guha
The easiest option I found is to put the jars in the SPARK CLASSPATH.
On 21 Aug 2015 06:20, Burak Yavuz brk...@gmail.com wrote:

 If you would like to try using spark-csv, please use
 `pyspark --packages com.databricks:spark-csv_2.11:1.2.0`

 You're missing a dependency.

 Best,
 Burak

 On Thu, Aug 20, 2015 at 1:08 PM, Charlie Hack charles.t.h...@gmail.com
 wrote:

 Hi,

 I'm new to spark and am trying to create a Spark df from a pandas df with
 ~5 million rows. Using Spark 1.4.1.

 When I type:

 df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(didf), None))

 (the df.where is a hack I found on the Spark JIRA to avoid a problem with
 NaN values making mixed column types)

 I get:

 TypeError: cannot create an RDD from type: type 'list'

 Converting a smaller pandas dataframe (~2000 rows) works fine. Anyone had
 this issue?


 This is already a workaround-- ideally I'd like to read the spark
 dataframe from a Hive table. But this is currently not an option for my
 setup.

 I also tried reading the data into spark from a CSV using spark-csv.
 Haven't been able to make this work as yet. I launch

 $ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar

 and when I attempt to read the csv I get:

 Py4JJavaError: An error occurred while calling o22.load. :
 java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ...

 Other options I can think of:

 - Convert my CSV to json (use Pig?) and read into Spark
 - Read in using jdbc connect from postgres

 But want to make sure I'm not misusing Spark or missing something obvious.

 Thanks!

 Charlie
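
For context, the --packages form resolves spark-csv's transitive dependencies (such as
commons-csv), which is what the bare --jars launch was missing, hence the
NoClassDefFoundError. A minimal sketch of the subsequent load on Spark 1.4 (the file path
and options are illustrative):

# Hedged sketch: reading the CSV with spark-csv, assuming the package was supplied
# via --packages so its transitive dependencies are on the classpath.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="csv-load")
sqlContext = SQLContext(sc)

df = (sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")        # first line contains column names
      .option("inferSchema", "true")   # scan once to guess column types
      .load("/path/to/data.csv"))      # illustrative path

df.printSchema()
df.registerTempTable("events")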





ClassCastException when saving a DataFrame to parquet file (saveAsParquetFile, Spark 1.3.1) using Scala

2015-08-21 Thread Emma Boya Peng
Hi,

I was trying to programmatically specify a schema, apply it to an RDD of
Rows, and save the resulting DataFrame as a parquet file, but I got
java.lang.ClassCastException:
java.lang.String cannot be cast to java.lang.Long on the last step.

Here's what I did:

1. Created an RDD of Rows from RDD[Array[String]]:
  val gameId = Long.valueOf(line(0))
  val accountType = Long.valueOf(line(1))
  val worldId = Long.valueOf(line(2))
  val dtEventTime = line(3)
  val iEventId = line(4)

  return Row(gameId, accountType, worldId, dtEventTime, iEventId)

2. Generated the schema:
  return StructType(Array(StructField("GameId", LongType, true),
    StructField("AccountType", LongType, true), StructField("WorldId",
    LongType, true), StructField("dtEventTime", StringType, true),
    StructField("iEventId", StringType, true)))

3. Applied the schema to the RDD of Rows:
  val schemaRdd = sqlContext.createDataFrame(rowRdd, schema)

4. Saved schemaRdd as a parquet file:
  schemaRdd.saveAsParquetFile(dst + "/" + tableName + ".parquet")

However, it gave me a ClassCastException on step 4 (the DataFrame, i.e.
schemaRdd, can be correctly printed out according to the specified schema).

Thank you for your help!

Best,
Emma

Stack trace of the exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage
1.0 (TID 12, 10-4-28-24): java.lang.ClassCastException: java.lang.String
cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at
org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:357)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:338)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:324)
at
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at org.apache.spark.sql.parquet.ParquetRelation2.org
$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
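
The stack trace shows the failure happening while the parquet writer reads a LongType
column and finds a String, i.e. at least one Row carries a string in a position the schema
declares as Long; the schema is only enforced when the write action runs. A minimal
PySpark sketch of the general idea of validating rows against the schema before writing
(field names and sample data are illustrative, not the poster's):

# Hedged sketch: values must match the declared schema types position by position;
# rows that cannot be parsed (e.g. a header or malformed record) are dropped.
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, LongType, StringType

sc = SparkContext(appName="schema-check")
sqlContext = SQLContext(sc)

schema = StructType([StructField("GameId", LongType(), True),
                     StructField("dtEventTime", StringType(), True)])

def parse(line):
    fields = line.split(",")
    try:
        return (int(fields[0]), fields[1])   # position/type must match the schema
    except ValueError:
        return None                          # drop rows that do not parse as Long

lines = sc.parallelize(["GameId,dtEventTime", "42,2015-08-21 08:54:54"])
rows = lines.map(parse).filter(lambda r: r is not None)

df = sqlContext.createDataFrame(rows, schema)
df.saveAsParquetFile("/tmp/example.parquet")   # 1.3-era API; illustrative output path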


SPARK SQL support for XML

2015-08-21 Thread smagadi
Does Spark SQL support XML the same way it supports JSON?
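
There is no built-in XML data source comparable to jsonFile in Spark SQL at this point; a
common workaround is to parse each XML record with a standard parser inside a map and
build a DataFrame from the result. A minimal sketch (element and field names are
illustrative):

# Hedged sketch: parse XML records with ElementTree, then infer a DataFrame schema
# from Row objects. Element/field names and sample data are purely illustrative.
import xml.etree.ElementTree as ET
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="xml-to-df")
sqlContext = SQLContext(sc)

records = sc.parallelize([
    '<person><name>Yin</name><age>20</age></person>',
    '<person><name>Ann</name><age>25</age></person>',
])

def parse(xml_str):
    root = ET.fromstring(xml_str)
    return Row(name=root.findtext("name"), age=int(root.findtext("age")))

df = sqlContext.createDataFrame(records.map(parse))
df.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 19").show()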



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-support-for-XML-tp24382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-Cassandra-connector

2015-08-21 Thread Ted Yu
Have you considered asking this question on
https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user
?

Cheers

On Thu, Aug 20, 2015 at 10:57 PM, Samya samya.ma...@amadeus.com wrote:

 Hi All,

 I need to write an RDD to Cassandra using the sparkCassandraConnector from
 DataStax. My application is using YARN.

 *Some basic Questions:*
 1. Will a call to saveToCassandra(...) use the same connection object across
 all tasks in a given executor? I.e., is there one (1) connection object per
 executor that is shared between tasks?
 2. If the above answer is YES, is there a way to create a connection pool for
 each executor, so that multiple tasks can write data to Cassandra in
 parallel?

 Regards,
 Samya



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-connector-tp24378.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org