Re: What is the point of alpha value in Collaborative Filtering in MLlib ?

2016-02-24 Thread Hiroyuki Yamada
Hi, I've been doing a POC for CF in MLlib.
In my environment, the ratings are all implicit, so I am trying to use the
trainImplicit method (in Python).

The trainImplicit method takes alpha as one of its arguments to specify a
confidence for the ratings, as described in <
http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html>,
but the alpha value is global for all the ratings, so I am not sure why we
need it.
(If it were per rating, it would make sense to me.)

What difference does setting different alpha values make for exactly the
same data set?

I would really appreciate it if someone could give me a reasonable
explanation for this.

Best regards,
Hiro
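
For reference, a minimal Scala sketch of the API in question (the PySpark
trainImplicit call takes the same alpha). In implicit ALS the global alpha turns
each raw count r into a per-rating confidence c = 1 + alpha * r, so different
alpha values change how strongly repeated interactions outweigh unobserved ones,
even on identical data. All argument values below are illustrative.

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// ratings: RDD[Rating] of implicit interaction counts
val model = ALS.trainImplicit(ratings,
  10,    // rank
  10,    // iterations
  0.01,  // lambda (regularization)
  40.0)  // alpha: scales raw counts into confidences, c = 1 + alpha * r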


Re: How could I do this algorithm in Spark?

2016-02-24 Thread James Barney
Guillermo,
I think you're after an associative algorithm where A is ultimately
associated with D, correct? Jakob would be correct if that were a typo -- a
sort would be all that is necessary in that case.

I believe you're looking for something else though, if I understand
correctly.

This seems like a similar algorithm to PageRank, no?
https://github.com/amplab/graphx/blob/master/python/examples/pagerank.py
Except return the "neighbor" itself, not the necessarily the rank of the
page.

If you wanted to, you could use Scala and GraphX for this problem. It might
be a bit of overhead though: construct a node for each member of each tuple
with an edge between them, then traverse the graph for all sets of nodes that
are connected. That result set would quickly explode in size, but you could
restrict results to a minimum of N connections. I'm not super familiar with
GraphX myself, however. My intuition is saying 'graph problem' though; a
rough sketch of that idea is below.

Thoughts?
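
A rough GraphX sketch of that idea (all names are illustrative; it assumes the
distinct labels are few enough to collect into an id map, since GraphX vertex
ids must be Longs):

import org.apache.spark.graphx.Graph

// pairs: RDD[(String, String)] such as ("a","b"), ("b","c"), ("c","d"), ("x","y")
val labelIds = pairs.flatMap { case (s, t) => Seq(s, t) }.distinct().zipWithUniqueId()
val idMap = labelIds.collectAsMap()                           // label -> Long id
val edges = pairs.map { case (s, t) => (idMap(s), idMap(t)) }
val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)
val components = graph.connectedComponents().vertices         // (vertexId, id of its component)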


On Wed, Feb 24, 2016 at 6:43 PM, Jakob Odersky  wrote:

> Hi Guillermo,
> assuming that the first "a,b" is a typo and you actually meant "a,d",
> this is a sorting problem.
>
> You could easily model your data as an RDD or tuples (or as a
> dataframe/set) and use the sortBy (or orderBy for dataframe/sets)
> methods.
>
> best,
> --Jakob
>
> On Wed, Feb 24, 2016 at 2:26 PM, Guillermo Ortiz 
> wrote:
> > I want to do some algorithm in Spark.. I know how to do it in a single
> > machine where all data are together, but I don't know a good way to do
> it in
> > Spark.
> >
> > If someone has an idea..
> > I have some data like this
> > a , b
> > x , y
> > b , c
> > y , y
> > c , d
> >
> > I want something like:
> > a , d
> > b , d
> > c , d
> > x , y
> > y , y
> >
> > I need to know that a->b->c->d, so a->d, b->d and c->d.
> > I don't want the code, just an idea how I could deal with it.
> >
> > Any idea?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: LDA topic Modeling spark + python

2016-02-24 Thread Mishra, Abhishek
Hello All,

If someone has any leads on this please help me.

Sincerely,
Abhishek

From: Mishra, Abhishek
Sent: Wednesday, February 24, 2016 5:11 PM
To: user@spark.apache.org
Subject: LDA topic Modeling spark + python


Hello All,





I am building an LDA model; please guide me.

I have a CSV file which has two columns, "user_id" and "status". I have to
generate a word-topic distribution after aggregating by user_id, meaning I
need to model it for users on their grouped statuses. The topic length is
2000 and the value of k, or number of words, is 3.

If you can provide me with a link or some code based on Spark with Python,
I would be grateful.

Looking forward to a reply,

Sincerely,

Abhishek
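
A Scala sketch of the general MLlib flow, for reference (the PySpark LDA API is
analogous); every name here is illustrative, and the term-count vectors would
come from the statuses grouped by user_id:

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// corpus: RDD[(Long, Vector)] of (document id, term-count vector), one document per user
def topTerms(corpus: RDD[(Long, Vector)]) = {
  val ldaModel = new LDA().setK(3).run(corpus)     // k = 3 as in the question
  ldaModel.describeTopics(maxTermsPerTopic = 10)   // term indices and weights per topic
}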



Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-24 Thread Cody Koeninger
The per-partition offsets are part of the RDD as defined on the driver.
Have you read

https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

and/or watched

https://www.youtube.com/watch?v=fXnNEq1v3VA
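
A minimal Scala sketch of how those driver-defined offsets can be inspected with
the Spark 1.x direct stream API (stream is assumed to come from
KafkaUtils.createDirectStream):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Each OffsetRange carries topic, partition, fromOffset and untilOffset, so a
  // recomputed task re-reads exactly the same slice of Kafka data.
  ranges.foreach(r => println(s"${r.topic} ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
}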

On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen  wrote:

> Hi, as far as I know, there is a 1:1 mapping between Spark partition and
> Kafka partition, and in Spark's fault-tolerance mechanism, if a partition
> failed, another partition will be used to recompute those data. And my
> questions are below:
>
> When a partition (worker node) fails in Spark Streaming,
> 1. Is its computation passed to another partition, or just waits for the
> failed partition to restart?
> 2. How does the restarted partition know the offset range it should
> consume from Kafka? It should consume the some data as the before-failed
> one, right?
>


Re: Error:java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2016-02-24 Thread Yin Yang
See slides starting with slide #25 of
http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

FYI
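
A sketch of the usual workaround (an assumption on my part, not taken from the
slides): keep any single cached or shuffled block under 2 GB by increasing the
number of partitions before caching; the partition count below is only
illustrative.

import org.apache.spark.storage.StorageLevel

val repartitioned = largeRdd.repartition(2000)   // more partitions => smaller blocks
repartitioned.persist(StorageLevel.MEMORY_AND_DISK_SER)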

On Wed, Feb 24, 2016 at 7:25 PM, xiazhuchang  wrote:

> When cache data to memory, the code DiskStore$getBytes will be called. If
> there is a big data, the code "channel.map(MapMode.READ_ONLY, offset,
> length)" will be called, and the "map" function's parameter "length" has a
> type of "long", but the valid range is "Integer".
>
> This results in the error: Size exceeds Integer.MAX_VALUE.
>
> should the valid range of length be changed to Long.MAX_VALUE?
>
>
> Error: org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in
> stage 2.0 (TID 11877, 10.9.*.*): java.lang.RuntimeException:
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> at
>
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at
>
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
> at
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
> at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
> at
>
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
>
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at
>
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> at
>
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at
>
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at
>
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at
>
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
>
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
>
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
>
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
>
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
>
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
> at
>
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
>
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
>
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
>
> 

Error:java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2016-02-24 Thread xiazhuchang
When caching data to memory, the code DiskStore$getBytes will be called. If
the data is big, the code "channel.map(MapMode.READ_ONLY, offset,
length)" will be called; the "map" function's parameter "length" has the
type "long", but the valid range is that of "Integer".

This results in the error: Size exceeds Integer.MAX_VALUE.

Should the valid range of length be changed to Long.MAX_VALUE?


Error: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in
stage 2.0 (TID 11877, 10.9.*.*): java.lang.RuntimeException:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

at
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at

A question about Spark URL Usage: hostname vs IP address

2016-02-24 Thread Yu Song
Dear,

I am hitting a strange issue and I am not sure whether it is a Spark usage
limitation or a configuration issue.

I run Spark 1.5.1 in standalone mode. There is only one node in my cluster
and all services are OK. When I visit the Spark web UI, the Spark
URL is spark://c910f04x12.pok.test.com:7077 and I can successfully submit
jobs and start a remote Spark shell with that URL. I then changed the long
host name to the IP address, spark://10.4.12.1:7077, but that failed. When I
checked the master output, it was caused by a message dropped by Akka:

16/02/22 10:55:06 ERROR ErrorMonitor: dropping message [class
akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://
sparkMaster@10.4.12.1:7077/]] arriving at [akka.tcp://
sparkMaster@10.4.12.1:7077] inbound addresses are [akka.tcp://
sparkmas...@c910f04x12.pok.test.com:7077]

I then stopped the service, set SPARK_MASTER_IP to 10.4.12.1 in
spark-env.sh, and started the service again. The Spark URL changed to
spark://10.4.12.1:7077 and I could successfully submit jobs with the URL in
IP format, but if I change back to the host name, the submission fails
again, with a similar log that just swaps the host name and the IP.

So is this a usage error on my part, or a limitation in Spark that the URL
must be the exact same string and cannot be interchanged between IP and
host name?

BTW, I used nslookup to test my host name and IP, and the result is correct.
I also tried adding a long-name-to-IP mapping in /etc/hosts, but it did not
help.


How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-24 Thread Yuhang Chen
Hi, as far as I know, there is a 1:1 mapping between Spark partitions and
Kafka partitions, and in Spark's fault-tolerance mechanism, if a partition
fails, another partition will be used to recompute that data. My questions
are below:

When a partition (worker node) fails in Spark Streaming,
1. Is its computation passed to another partition, or does it just wait for
the failed partition to restart?
2. How does the restarted partition know the offset range it should consume
from Kafka? It should consume the same data as the one that failed, right?


Re: which master option to view current running job in Spark UI

2016-02-24 Thread Divya Gehlot
Hi Jeff,

The issue was with viewing the logs on EC2.
I had to set up SSH tunnels to view the currently running job.


Thanks,
Divya

On 24 February 2016 at 10:33, Jeff Zhang  wrote:

> View running job in SPARK UI doesn't matter which master you use.  What do
> you mean "I cant see the currently running jobs in Spark WEB UI" ? Do you
> see a blank spark ui or can't open the spark ui ?
>
> On Mon, Feb 15, 2016 at 12:55 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> When running in YARN, you can use the YARN Resource Manager UI to get to
>> the ApplicationMaster url, irrespective of client or cluster mode.
>>
>> Regards
>> Sab
>> On 15-Feb-2016 10:10 am, "Divya Gehlot"  wrote:
>>
>>> Hi,
>>> I have Hortonworks 2.3.4 cluster on EC2 and Have spark jobs as scala
>>> files .
>>> I am bit confused between using *master  *options
>>> I want to execute this spark job in YARN
>>>
>>> Curently running as
>>> spark-shell --properties-file  /TestDivya/Spark/Oracle.properties --jars
>>> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --driver-class-path
>>> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --packages
>>> com.databricks:spark-csv_2.10:1.1.0  *--master yarn-client *  -i
>>> /TestDivya/Spark/Test.scala
>>>
>>> with this option I cant see the currently running jobs in Spark WEB UI
>>> though it later appear in spark history server.
>>>
>>> My question with which --master option should I run my spark jobs so
>>> that I can view the currently running jobs in spark web UI .
>>>
>>> Thanks,
>>> Divya
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


chang hadoop version when import spark

2016-02-24 Thread YouPeng Yang
Hi,
  I am developing an application based on Spark 1.6. My lib dependencies are
just:

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.6.0"
)

 It uses Hadoop 2.2.0 as the default Hadoop version, which is not my
preference. I want to change the Hadoop version when importing Spark. How can
I achieve that? Do I need to recompile the Spark source code with the Hadoop
version I want to have?


Regards.
Best wishes.
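
A sketch of one way to do this in build.sbt without rebuilding Spark: exclude
Spark's transitive Hadoop dependency and add the desired hadoop-client
explicitly (the 2.6.0 version string below is only an example):

libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "1.6.0")
    .exclude("org.apache.hadoop", "hadoop-client"),  // drop the default Hadoop pulled in by Spark
  "org.apache.hadoop" % "hadoop-client" % "2.6.0"    // the Hadoop version you actually want
)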


Re: PySpark : couldn't pickle object of type class T

2016-02-24 Thread Jeff Zhang
Avro Record is not supported by the pickler; you would need to create a custom
pickler for it. But I don't think it is worth doing that. Instead, you can use
the spark-avro package to load Avro data and then convert it to an RDD if
necessary.

https://github.com/databricks/spark-avro
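
A sketch of that route (the version string is illustrative, and the same
read.format call exists in PySpark), assuming the package was added with
--packages com.databricks:spark-avro_2.10:2.0.1:

val df  = sqlContext.read.format("com.databricks.spark.avro").load("/path/to/avro")
val rdd = df.rdd   // only if an RDD[Row] is really needed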


On Thu, Feb 11, 2016 at 10:38 PM, Anoop Shiralige  wrote:

> Hi All,
>
> I am working with Spark 1.6.0 and pySpark shell specifically. I have an
> JavaRDD[org.apache.avro.GenericRecord] which I have converted to pythonRDD
> in the following way.
>
> javaRDD = sc._jvm.java.package.loadJson("path to data", sc._jsc)
> javaPython = sc._jvm.SerDe.javaToPython(javaRDD)
> from pyspark.rdd import RDD
> pythonRDD=RDD(javaPython,sc)
>
> pythonRDD.first()
>
> However everytime I am trying to call collect() or first() method on
> pythonRDD I am getting the following error :
>
> 16/02/11 06:19:19 ERROR python.PythonRunner: Python worker exited
> unexpectedly (crashed)
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File
>
> "/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/worker.py",
> line 98, in main
> command = pickleSer._read_with_length(infile)
>   File
>
> "/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py",
> line 156, in _read_with_length
> length = read_int(stream)
>   File
>
> "/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py",
> line 545, in read_int
> raise EOFError
> EOFError
>
> at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
> at
>
> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
> at
> org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
> at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: net.razorvine.pickle.PickleException: couldn't pickle object of
> type class org.apache.avro.generic.GenericData$Record
> at net.razorvine.pickle.Pickler.save(Pickler.java:142)
> at
> net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:493)
> at net.razorvine.pickle.Pickler.dispatch(Pickler.java:205)
> at net.razorvine.pickle.Pickler.save(Pickler.java:137)
> at net.razorvine.pickle.Pickler.dump(Pickler.java:107)
> at net.razorvine.pickle.Pickler.dumps(Pickler.java:92)
> at
>
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
> at
>
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:110)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
>
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:110)
> at
>
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
> at
>
> org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
> at
>
> org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
>
> Thanks for your time,
> AnoopShiralige
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-couldn-t-pickle-object-of-type-class-T-tp26204.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Spark + Sentry + Kerberos don't add up?

2016-02-24 Thread Ruslan Dautkhanov
It turns out to be a Spark issue:

https://issues.apache.org/jira/browse/SPARK-13478




-- 
Ruslan Dautkhanov

On Mon, Jan 18, 2016 at 4:25 PM, Ruslan Dautkhanov 
wrote:

> Hi Romain,
>
> Thank you for your response.
>
> Adding Kerberos support might be as simple as
> https://issues.cloudera.org/browse/LIVY-44 ? I.e. add Livy --principal
> and --keytab parameters to be passed to spark-submit.
>
> As a workaround I just did kinit (using hues' keytab) and then launched
> Livy Server. It probably will work as long as kerberos ticket doesn't
> expire. That's it would be great to have support for --principal and
> --keytab parameters for spark-submit as explined in
> http://www.cloudera.com/content/www/en-us/documentation/enterprise/latest/topics/cm_sg_yarn_long_jobs.html
>
>
> The only problem I have currently is the above error stack in my previous
> email:
>
> The Spark session could not be created in the cluster:
>> at org.apache.hadoop.security.*UserGroupInformation.doAs*(
>> UserGroupInformation.java:1671)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
>> SparkSubmit.scala:160)
>
>
>
> >> AFAIK Hive impersonation should be turned off when using Sentry
>
> Yep, exactly. That's what I did. It is disabled now. But looks like on
> other hand, Spark or Spark Notebook want to have that enabled?
> It tries to do org.apache.hadoop.security.UserGroupInformation.doAs()
> hence the error.
>
> So Sentry isn't compatible with Spark in kerberized clusters? Is any
> workaround for this problem?
>
>
> --
> Ruslan Dautkhanov
>
> On Mon, Jan 18, 2016 at 3:52 PM, Romain Rigaux 
> wrote:
>
>> Livy does not support any Kerberos yet
>> https://issues.cloudera.org/browse/LIVY-3
>>
>> Are you focusing instead about HS2 + Kerberos with Sentry?
>>
>> AFAIK Hive impersonation should be turned off when using Sentry:
>> http://www.cloudera.com/content/www/en-us/documentation/enterprise/latest/topics/sg_sentry_service_config.html
>>
>> On Sun, Jan 17, 2016 at 10:04 PM, Ruslan Dautkhanov > > wrote:
>>
>>> Getting following error stack
>>>
>>> The Spark session could not be created in the cluster:
 at org.apache.hadoop.security.*UserGroupInformation.doAs*
 (UserGroupInformation.java:1671)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:160)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) )
 at org.*apache.hadoop.hive.metastore.HiveMetaStoreClient*
 .open(HiveMetaStoreClient.java:466)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:234)
 at
 org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.(SessionHiveMetaStoreClient.java:74)
 ... 35 more
>>>
>>>
>>> My understanding that hive.server2.enable.impersonation and
>>> hive.server2.enable.doAs should be enabled to make
>>> UserGroupInformation.doAs() work?
>>>
>>> When I try to enable these parameters, Cloudera Manager shows error
>>>
>>> Hive Impersonation is enabled for Hive Server2 role 'HiveServer2
 (hostname)'.
 Hive Impersonation should be disabled to enable Hive authorization
 using Sentry
>>>
>>>
>>> So Spark-Hive conflicts with Sentry!?
>>>
>>> Environment: Hue 3.9 Spark Notebooks + Livy Server (built from master).
>>> CDH 5.5.
>>>
>>> This is a kerberized cluster with Sentry.
>>>
>>> I was using hue's keytab as hue user is normally (by default in CDH) is
>>> allowed to impersonate to other users.
>>> So very convenient for Spark Notebooks.
>>>
>>> Any information to help solve this will be highly appreciated.
>>>
>>>
>>> --
>>> Ruslan Dautkhanov
>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "Hue-Users" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to hue-user+unsubscr...@cloudera.org.
>>>
>>
>>
>


Re: How could I do this algorithm in Spark?

2016-02-24 Thread Jakob Odersky
Hi Guillermo,
assuming that the first "a,b" is a typo and you actually meant "a,d",
this is a sorting problem.

You could easily model your data as an RDD of tuples (or as a
dataframe/dataset) and use the sortBy (or orderBy for dataframes/datasets)
methods.

best,
--Jakob

On Wed, Feb 24, 2016 at 2:26 PM, Guillermo Ortiz  wrote:
> I want to do some algorithm in Spark.. I know how to do it in a single
> machine where all data are together, but I don't know a good way to do it in
> Spark.
>
> If someone has an idea..
> I have some data like this
> a , b
> x , y
> b , c
> y , y
> c , d
>
> I want something like:
> a , d
> b , d
> c , d
> x , y
> y , y
>
> I need to know that a->b->c->d, so a->d, b->d and c->d.
> I don't want the code, just an idea how I could deal with it.
>
> Any idea?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Filter on a column having multiple values

2016-02-24 Thread Yin Yang
However, when the number of choices gets big, the following notation
becomes cumbersome.


On Wed, Feb 24, 2016 at 3:41 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

> You can use operators here.
>
> t.filter($"column1" === 1 || $"column1" === 2)
>
>
>
>
>
> On 24/02/2016 22:40, Ashok Kumar wrote:
>
> Hi,
>
> I would like to do the following
>
> select count(*) from  where column1 in (1,5))
>
> I define
>
> scala> var t = HiveContext.table("table")
>
> This works
> t.filter($"column1" ===1)
>
> How can I expand this to have column1  for both 1 and 5 please?
>
> thanks
>
>
>
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Cloud Technology Partners 
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is 
> the responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Cloud Technology partners Ltd, its subsidiaries nor their 
> employees accept any responsibility.
>
>
>


Re: Filter on a column having multiple values

2016-02-24 Thread Mich Talebzadeh
 

You can use operators here. 

t.filter($"column1" === 1 || $"column1" === 2) 

On 24/02/2016 22:40, Ashok Kumar wrote: 

> Hi, 
> 
> I would like to do the following 
> 
> select count(*) from  where column1 in (1,5)) 
> 
> I define 
> 
> scala> var t = HiveContext.table("table") 
> 
> This works 
> t.filter($"column1" ===1) 
> 
> How can I expand this to have column1 for both 1 and 5 please? 
> 
> thanks

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential.
This message is for the designated recipient only, if you are not the
intended recipient, you should destroy it immediately. Any information
in this message shall not be understood as given or endorsed by Cloud
Technology Partners Ltd, its subsidiaries or their employees, unless
expressly so stated. It is the responsibility of the recipient to ensure
that this email is virus free, therefore neither Cloud Technology
partners Ltd, its subsidiaries nor their employees accept any
responsibility.

 

Re: How to delete a record from parquet files using dataframes

2016-02-24 Thread Jakob Odersky
You can `filter` (see the scaladoc) your dataframes before saving them to, or
after reading them from, Parquet files.
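
A minimal sketch of that approach, assuming a DataFrame df with an "id" column;
idToDelete is an illustrative value:

val idToDelete = 42                                // example value to drop
val cleaned = df.filter(df("id") !== idToDelete)   // keep everything except the unwanted rows
cleaned.write.parquet("/path/to/new/parquet")      // write a new Parquet file without them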

On Wed, Feb 24, 2016 at 1:28 AM, Cheng Lian  wrote:

> Parquet is a read-only format. So the only way to remove data from a
> written Parquet file is to write a new Parquet file without unwanted rows.
>
> Cheng
>
>
> On 2/17/16 5:11 AM, SRK wrote:
>
>> Hi,
>>
>> I am saving my records in the form of parquet files using dataframes in
>> hdfs. How to delete the records using dataframes?
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-delete-a-record-from-parquet-files-using-dataframes-tp26242.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Filter on a column having multiple values

2016-02-24 Thread Michael Armbrust
You can do this either with expr("... IN ...") or isin.

Here is a full example.
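
A minimal sketch of both options, using the DataFrame t from the question (col
avoids needing the implicits import):

import org.apache.spark.sql.functions.{col, expr}

val viaIsin = t.filter(col("column1").isin(1, 5))
val viaExpr = t.filter(expr("column1 IN (1, 5)"))
viaIsin.count()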

On Wed, Feb 24, 2016 at 2:40 PM, Ashok Kumar 
wrote:

> Hi,
>
> I would like to do the following
>
> select count(*) from  where column1 in (1,5))
>
> I define
>
> scala> var t = HiveContext.table("table")
>
> This works
> t.filter($"column1" ===1)
>
> How can I expand this to have column1  for both 1 and 5 please?
>
> thanks
>


Re: How to Exploding a Map[String,Int] column in a DataFrame (Scala)

2016-02-24 Thread Michael Armbrust
You can do this using the explode function defined in
org.apache.spark.sql.functions.

Here is some example code.
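
A minimal sketch, assuming the DataFrame test from the question with a MapType
column "brand" (exploding a map column yields "key" and "value" columns, which
are then renamed):

import org.apache.spark.sql.functions.explode

val exploded = test
  .select(test("id"), explode(test("brand")))
  .withColumnRenamed("key", "brand_key")
  .withColumnRenamed("value", "brand_count")

exploded.show()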


On Wed, Feb 24, 2016 at 3:06 PM, Anthony Brew  wrote:

> Hi,
>  I have a Dataframe containing a column with a map Map[A,B] with
> multiple values. I want to explode the key,value pairs in the map into a
> new column, actually planing to create 2 new cols.
>
> My plan had been
>
> - explode "input": Map[K,V] to "temp":Iterable[Map[K,V]]
> - new col temp to temp.key
> - new col temp to temp.value
> - drop temp
>
> But I am failing at the first hurdle.
>
> For example my data looks a bit like like
>
> scala> test.show()
> ++--+
> | id   |  brand
>   |
> ++--+
> |a02d1fa5d87dce6a7...|Map(Vans -> 1, Versace ->2,  ...|
>
>
> but I want to get to
>
> scala> test.show()
> +-+--+
> | id|  brand_key| brand_count   |
> +-+--+
> | a02d1fa5d87dce6a7...|   Vans   |   1   |
> | a02d1fa5d87dce6a7...|   Versace  |2  |
>
>
> Any suggestions would be appreciated.
>
> Thanks,
> Anthony
>
>
>


How to Exploding a Map[String,Int] column in a DataFrame (Scala)

2016-02-24 Thread Anthony Brew
Hi,
 I have a DataFrame containing a column with a map Map[A,B] with
multiple values. I want to explode the key/value pairs in the map into
new columns; I am actually planning to create 2 new cols.

My plan had been

- explode "input": Map[K,V] to "temp":Iterable[Map[K,V]]
- new col temp to temp.key
- new col temp to temp.value
- drop temp

But I am failing at the first hurdle.

For example, my data looks a bit like:

scala> test.show()
+--------------------+--------------------------------+
|                  id|                           brand|
+--------------------+--------------------------------+
|a02d1fa5d87dce6a7...|Map(Vans -> 1, Versace -> 2, ...|


but I want to get to

scala> test.show()
+--------------------+-----------+-----------+
|                  id|  brand_key|brand_count|
+--------------------+-----------+-----------+
|a02d1fa5d87dce6a7...|       Vans|          1|
|a02d1fa5d87dce6a7...|    Versace|          2|


Any suggestions would be appreciated.

Thanks,
Anthony


Re: Error reading a CSV

2016-02-24 Thread Imran Akbar
Thanks Suresh, that worked like a charm!
I created the /user/hive/warehouse directory and chmod'd to 777.

regards,
imran

On Wed, Feb 24, 2016 at 2:48 PM, Suresh Thalamati <
suresh.thalam...@gmail.com> wrote:

> Try creating  /user/hive/warehouse/  directory if it does not exists , and
> check it has
>  write permission for the user. Note the lower case ‘user’  in the path.
>
> > On Feb 24, 2016, at 2:42 PM, skunkwerk  wrote:
> >
> > I have downloaded the Spark binary with Hadoop 2.6.
> > When I run the spark-sql program like this with the CSV library:
> > ./bin/spark-sql --packages com.databricks:spark-csv_2.11:1.3.0
> >
> > I get into the console for spark-sql.
> > However, when I try to import a CSV file from my local filesystem:
> >
> > CREATE TABLE customerview USING com.databricks.spark.csv OPTIONS (path
> > "/Users/imran/Downloads/test.csv", header "true", inferSchema "true");
> >
> > I get the following error:
> > org.apache.hadoop.hive.ql.metadata.HiveException:
> > MetaException(message:file:/user/hive/warehouse/test is not a directory
> or
> > unable to create one)
> >
> > http://pastebin.com/BfyVv14U
> >
> > How can I fix this?
> >
> > thanks
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-a-CSV-tp26329.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>


Re: Error reading a CSV

2016-02-24 Thread Suresh Thalamati
Try creating the /user/hive/warehouse/ directory if it does not exist, and
check that it has write permission for the user. Note the lower-case 'user'
in the path.

> On Feb 24, 2016, at 2:42 PM, skunkwerk  wrote:
> 
> I have downloaded the Spark binary with Hadoop 2.6.
> When I run the spark-sql program like this with the CSV library:
> ./bin/spark-sql --packages com.databricks:spark-csv_2.11:1.3.0
> 
> I get into the console for spark-sql.
> However, when I try to import a CSV file from my local filesystem:
> 
> CREATE TABLE customerview USING com.databricks.spark.csv OPTIONS (path
> "/Users/imran/Downloads/test.csv", header "true", inferSchema "true");
> 
> I get the following error:
> org.apache.hadoop.hive.ql.metadata.HiveException:
> MetaException(message:file:/user/hive/warehouse/test is not a directory or
> unable to create one)
> 
> http://pastebin.com/BfyVv14U
> 
> How can I fix this?
> 
> thanks
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-a-CSV-tp26329.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Error reading a CSV

2016-02-24 Thread skunkwerk
I have downloaded the Spark binary with Hadoop 2.6.
When I run the spark-sql program like this with the CSV library:
./bin/spark-sql --packages com.databricks:spark-csv_2.11:1.3.0

I get into the console for spark-sql.
However, when I try to import a CSV file from my local filesystem:

CREATE TABLE customerview USING com.databricks.spark.csv OPTIONS (path
"/Users/imran/Downloads/test.csv", header "true", inferSchema "true");

I get the following error:
org.apache.hadoop.hive.ql.metadata.HiveException:
MetaException(message:file:/user/hive/warehouse/test is not a directory or
unable to create one)

http://pastebin.com/BfyVv14U

How can I fix this?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-a-CSV-tp26329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Filter on a column having multiple values

2016-02-24 Thread Ashok Kumar
 Hi,
I would like to do the following
select count(*) from  where column1 in (1,5))
I define
scala> var t = HiveContext.table("table")
This works
t.filter($"column1" ===1)
How can I expand this to have column1  for both 1 and 5 please?
thanks


How could I do this algorithm in Spark?

2016-02-24 Thread Guillermo Ortiz
I want to implement an algorithm in Spark. I know how to do it on a single
machine where all the data is together, but I don't know a good way to do it
in Spark.

If someone has an idea..
I have some data like this
a , b
x , y
b , c
y , y
c , d

I want something like:
a , d
b , d
c , d
x , y
y , y

I need to know that a->b->c->d, so a->d, b->d and c->d.
I don't want the code, just an idea how I could deal with it.

Any idea?


Spark Summit (San Francisco, June 6-8) call for presentations due in less than a week

2016-02-24 Thread Reynold Xin
Just want to send a reminder in case people don't know about it. If you are
working on (or with, using) Spark, consider submitting your work to Spark
Summit, coming up in June in San Francisco.

https://spark-summit.org/2016/call-for-presentations/

Cheers.


Re: Spark and KafkaUtils

2016-02-24 Thread Cody Koeninger
Looks like conflicting versions of the same dependency.
If you look at the mergeStrategy section of the build file I posted, you
can add additional lines for whatever dependencies are causing issues, e.g.

  case PathList("org", "jboss", "netty", _*) => MergeStrategy.first

On Wed, Feb 24, 2016 at 2:55 PM, Vinti Maheshwari 
wrote:

> Thanks much Cody, I added assembly.sbt and modified build.sbt with ivy bug
> related content.
>
> It's giving lots of errors related to ivy:
>
> *[error]
> /Users/vintim/.ivy2/cache/javax.activation/activation/jars/activation-1.1.jar:javax/activation/ActivationDataFlavor.class*
>
> Here is complete error log:
> https://gist.github.com/Vibhuti/07c24d2893fa6e520d4c
>
>
> Regards,
> ~Vinti
>
> On Wed, Feb 24, 2016 at 12:16 PM, Cody Koeninger 
> wrote:
>
>> Ok, that build file I linked earlier has a minimal example of use.  just
>> running 'sbt assembly' given a similar build file should build a jar with
>> all the dependencies.
>>
>> On Wed, Feb 24, 2016 at 1:50 PM, Vinti Maheshwari 
>> wrote:
>>
>>> I am not using sbt assembly currently. I need to check how to use sbt
>>> assembly.
>>>
>>> Regards,
>>> ~Vinti
>>>
>>> On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger 
>>> wrote:
>>>
 Are you using sbt assembly?  That's what will include all of the
 non-provided dependencies in a single jar along with your code.  Otherwise
 you'd have to specify each separate jar in your spark-submit line, which is
 a pain.

 On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari <
 vinti.u...@gmail.com> wrote:

> Hi Cody,
>
> I tried with the build file you provided, but it's not working for me,
> getting same error:
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/kafka/KafkaUtils$
>
> I am not getting this error while building  (sbt package). I am
> getting this error when i am running my spark-streaming program.
> Do i need to specify kafka jar path manually with spark-submit --jars
> flag?
>
> My build.sbt:
>
> name := "NetworkStreaming"
> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>
> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>
> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
> )
>
>
>
> Regards,
> ~Vinti
>
> On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger 
> wrote:
>
>> spark streaming is provided, kafka is not.
>>
>> This build file
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>>
>> includes some hacks for ivy issues that may no longer be strictly
>> necessary, but try that build and see if it works for you.
>>
>>
>> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari <
>> vinti.u...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have tried multiple different settings in build.sbt but seems like
>>> nothing is working.
>>> Can anyone suggest the right syntax/way to include kafka with spark?
>>>
>>> Error
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>
>>> build.sbt
>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" %
>>> "1.0.0"
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" %
>>> "provided"
>>> )
>>>
>>>
>>> Thanks,
>>> Vinti
>>>
>>>
>>
>

>>>
>>
>


coalesce executor memory explosion

2016-02-24 Thread Christopher Brady
Short: Why does coalesce use huge amounts of memory? How does it work 
internally?


Long version:
I asked a similar question a few weeks ago, but I have a simpler test 
with better numbers now. I have an RDD created from some HDFS files. I 
want to sample it and then coalesce it into fewer partitions. For some 
reason coalesce uses huge amounts of memory. From what I've read, 
coalesce does not require full partitions to be in memory at once, so I 
don't understand what's causing this. Can anyone explain to me why 
coalesce needs so much memory? Are there any rules for determining the 
best number of partitions to coalesce into?


Spark version:
1.5.0

Test data:
241 GB of compressed parquet files

Executors:
27 executors
16 GB memory each
3 cores each

In my tests I'm reading the data from HDFS, sampling it, coalescing into 
fewer partitions, and then doing a count just to have an action.


Without coalesce there is no memory issue. The size of the data makes no 
difference:
hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> 
count()

Per executor memory usage: 0.4 GB

Adding coalesce increases the memory usage substantially and it is still 
using more partitions than I'd like:
hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> 
coalesce (to 668 partitions) -> count()

Per executor memory usage: 3.1 GB

Going down to 201 partitions uses most of the available memory just for 
the coalesce:
hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> 
coalesce (to 201 partitions) -> count()

Per executor memory usage: 9.8 GB

Any number of partitions smaller than this will crash all the executors 
with out of memory. I don't really understand what is happening in 
Spark. That sample size should result in partitions smaller than the 
original partitions.


I've gone through the Spark documentation, youtube videos, and the 
Learning Spark book, but I haven't seen anything about this. Thanks.
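
A sketch of one thing to try (an assumption, not taken from the material cited
above): coalesce without a shuffle folds many upstream partitions into a single
task, so the sampled data for all of them is materialized together; forcing a
shuffle redistributes the sampled rows instead. The partition count is
illustrative.

val sampled = rdd.sample(withReplacement = false, fraction = 0.00075)
val fewer   = sampled.coalesce(201, shuffle = true)   // equivalent to repartition(201)
fewer.count()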


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Error msg is:

*[error] deduplicate: different file contents found in the following:*
[error]
/Users/vintim/.ivy2/cache/org.jruby/jruby-complete/jars/jruby-complete-1.6.5.jar:org/joda/time/tz/data/Europe/Bucharest
[error]
/Users/vintim/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.6.jar:org/joda/time/tz/data/Europe/Bucharest

I tried adding the block below, given on Stack Overflow, but still no luck.

http://stackoverflow.com/questions/20393283/deduplication-error-with-sbt-assembly-plugin?rq=1

excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
 cp filter {x => x.data.getName.matches("sbt.*") ||
x.data.getName.matches(".*macros.*")}}

Thanks,
~Vinti

On Wed, Feb 24, 2016 at 12:55 PM, Vinti Maheshwari 
wrote:

> Thanks much Cody, I added assembly.sbt and modified build.sbt with ivy bug
> related content.
>
> It's giving lots of errors related to ivy:
>
> *[error]
> /Users/vintim/.ivy2/cache/javax.activation/activation/jars/activation-1.1.jar:javax/activation/ActivationDataFlavor.class*
>
> Here is complete error log:
> https://gist.github.com/Vibhuti/07c24d2893fa6e520d4c
>
>
> Regards,
> ~Vinti
>
> On Wed, Feb 24, 2016 at 12:16 PM, Cody Koeninger 
> wrote:
>
>> Ok, that build file I linked earlier has a minimal example of use.  just
>> running 'sbt assembly' given a similar build file should build a jar with
>> all the dependencies.
>>
>> On Wed, Feb 24, 2016 at 1:50 PM, Vinti Maheshwari 
>> wrote:
>>
>>> I am not using sbt assembly currently. I need to check how to use sbt
>>> assembly.
>>>
>>> Regards,
>>> ~Vinti
>>>
>>> On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger 
>>> wrote:
>>>
 Are you using sbt assembly?  That's what will include all of the
 non-provided dependencies in a single jar along with your code.  Otherwise
 you'd have to specify each separate jar in your spark-submit line, which is
 a pain.

 On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari <
 vinti.u...@gmail.com> wrote:

> Hi Cody,
>
> I tried with the build file you provided, but it's not working for me,
> getting same error:
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/kafka/KafkaUtils$
>
> I am not getting this error while building  (sbt package). I am
> getting this error when i am running my spark-streaming program.
> Do i need to specify kafka jar path manually with spark-submit --jars
> flag?
>
> My build.sbt:
>
> name := "NetworkStreaming"
> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>
> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>
> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
> )
>
>
>
> Regards,
> ~Vinti
>
> On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger 
> wrote:
>
>> spark streaming is provided, kafka is not.
>>
>> This build file
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>>
>> includes some hacks for ivy issues that may no longer be strictly
>> necessary, but try that build and see if it works for you.
>>
>>
>> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari <
>> vinti.u...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have tried multiple different settings in build.sbt but seems like
>>> nothing is working.
>>> Can anyone suggest the right syntax/way to include kafka with spark?
>>>
>>> Error
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>
>>> build.sbt
>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" %
>>> "1.0.0"
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" %
>>> "provided"
>>> )
>>>
>>>
>>> Thanks,
>>> Vinti
>>>
>>>
>>
>

>>>
>>
>


Re: Spark-avro issue in 1.5.2

2016-02-24 Thread Jonathan Kelly
This error is likely due to EMR including some Hadoop lib dirs in
spark.{driver,executor}.extraClassPath. (Hadoop bundles an older version of
Avro than what Spark uses, so you are probably getting bitten by this Avro
mismatch.)

We determined that these Hadoop dirs are not actually necessary to include
in the Spark classpath and in fact seem to be *causing* several problems
such as this one, so we have removed these directories from the
extraClassPath settings for the next EMR release.

For now, you may do the same yourself by using a configuration like the
following when creating your cluster:

[
  {
"classification":"spark-defaults",
"properties": {
  "spark.executor.extraClassPath":
"/etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*",
  "spark.driver.extraClassPath":
"/etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*"
}
  }
]

(For reference, the removed dirs are /usr/lib/hadoop/*,
/usr/lib/hadoop-hdfs/* and /usr/lib/hadoop-yarn/*.)

Hope this helps!
~ Jonathan

On Wed, Feb 24, 2016 at 1:14 PM  wrote:

> Hadoop 2.6.0 included?
> spark-assembly-1.5.2-hadoop2.6.0.jar
>
> On Feb 24, 2016, at 4:08 PM, Koert Kuipers  wrote:
>
> does your spark version come with batteries (hadoop included) or is it
> build with hadoop provided and you are adding hadoop binaries to classpath
>
> On Wed, Feb 24, 2016 at 3:08 PM,  wrote:
>
>> I’m trying to save a data frame in Avro format but am getting the
>> following error:
>>
>>
>> java.lang.NoSuchMethodError: 
>> org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
>>
>> I found the following workaround
>> https://github.com/databricks/spark-avro/issues/91
>> 
>>  -
>> which seems to say that this is from a mismatch in Avro versions. I have
>> tried following both solutions detailed to no avail:
>>  - Manually downloading avro-1.7.7.jar and including it in
>> /usr/lib/hadoop-mapreduce/
>>  - Adding avro-1.7.7.jar to spark.driver.extraClassPath and
>> spark.executor.extraClassPath
>>  - The same with avro-1.6.6
>>
>> I am still getting the same error, and now I am just stabbing in the
>> dark. Anyone else still running into this issue?
>>
>>
>> I am using Pyspark 1.5.2 on EMR.
>>
>
>
>


Re: Spark-avro issue in 1.5.2

2016-02-24 Thread Koert Kuipers
does your spark version come with batteries (hadoop included), or is it
built with hadoop provided, with you adding the hadoop binaries to the classpath?

On Wed, Feb 24, 2016 at 3:08 PM,  wrote:

> I’m trying to save a data frame in Avro format but am getting the
> following error:
>
>
> java.lang.NoSuchMethodError: 
> org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
>
> I found the following workaround
> https://github.com/databricks/spark-avro/issues/91 - which seems to say
> that this is from a mismatch in Avro versions. I have tried following both
> solutions detailed to no avail:
>  - Manually downloading avro-1.7.7.jar and including it in
> /usr/lib/hadoop-mapreduce/
>  - Adding avro-1.7.7.jar to spark.driver.extraClassPath and
> spark.executor.extraClassPath
>  - The same with avro-1.6.6
>
> I am still getting the same error, and now I am just stabbing in the dark.
> Anyone else still running into this issue?
>
>
> I am using Pyspark 1.5.2 on EMR.
>


Re: Performing multiple aggregations over the same data

2016-02-24 Thread Nick Sabol
Yeah, sounds like you want to aggregate to a triple, like

data.aggregate((0, 0, 0))(
  (z, n) =>
// aggregate with zero value here,
  (a1, a2) =>
// combine previous aggregations here
)
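
A concrete sketch of that pattern, assuming an RDD[Record] named data with three
Int fields (Record and its field names are illustrative):

case class Record(var1: Int, var2: Int, var3: Int)

val sums = data.aggregate((0, 0, 0))(
  (acc, r) => (acc._1 + r.var1, acc._2 + r.var2, acc._3 + r.var3),  // fold one record into the partial sums
  (a, b)   => (a._1 + b._1, a._2 + b._2, a._3 + b._3)               // merge partial sums across partitions
)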

On Tue, Feb 23, 2016 at 10:40 PM, Michał Zieliński <
zielinski.mich...@gmail.com> wrote:

> Do you mean something like this?
>
> data.agg(sum("var1"),sum("var2"),sum("var3"))
>
> On 24 February 2016 at 01:49, Daniel Imberman 
> wrote:
>
>> Hi guys,
>>
>> So I'm running into a speed issue where I have a dataset that needs to be
>> aggregated multiple times.
>>
>> Initially my team had set up three accumulators and were running a single
>> foreach loop over the data. Something along the lines of
>>
>> val accum1:Accumulable[a]
>> val accum2: Accumulable[b]
>> val accum3: Accumulable[c]
>>
>> data.foreach{
>> u =>
>> accum1+=u
>> accum2 += u
>> accum3 += u
>> }
>>
>> I am trying to switch these accumulations into an aggregation so that I
>> can get a speed boost and have access to accumulators for debugging. I am
>> currently trying to figure out a way to aggregate these three types at
>> once, since running 3 separate aggregations is significantly slower. Does
>> anyone have any thoughts as to how I can do this?
>>
>> Thank you
>>
>
>


Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Thanks much Cody, I added assembly.sbt and modified build.sbt with ivy bug
related content.

It's giving lots of errors related to ivy:

*[error]
/Users/vintim/.ivy2/cache/javax.activation/activation/jars/activation-1.1.jar:javax/activation/ActivationDataFlavor.class*

Here is complete error log:
https://gist.github.com/Vibhuti/07c24d2893fa6e520d4c


Regards,
~Vinti

On Wed, Feb 24, 2016 at 12:16 PM, Cody Koeninger  wrote:

> Ok, that build file I linked earlier has a minimal example of use.  just
> running 'sbt assembly' given a similar build file should build a jar with
> all the dependencies.
>
> On Wed, Feb 24, 2016 at 1:50 PM, Vinti Maheshwari 
> wrote:
>
>> I am not using sbt assembly currently. I need to check how to use sbt
>> assembly.
>>
>> Regards,
>> ~Vinti
>>
>> On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger 
>> wrote:
>>
>>> Are you using sbt assembly?  That's what will include all of the
>>> non-provided dependencies in a single jar along with your code.  Otherwise
>>> you'd have to specify each separate jar in your spark-submit line, which is
>>> a pain.
>>>
>>> On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari >> > wrote:
>>>
 Hi Cody,

 I tried with the build file you provided, but it's not working for me,
 getting same error:
 Exception in thread "main" java.lang.NoClassDefFoundError:
 org/apache/spark/streaming/kafka/KafkaUtils$

 I am not getting this error while building  (sbt package). I am getting
 this error when i am running my spark-streaming program.
 Do i need to specify kafka jar path manually with spark-submit --jars
 flag?

 My build.sbt:

 name := "NetworkStreaming"
 libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"

 libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"

 libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"

 libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
 )



 Regards,
 ~Vinti

 On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger 
 wrote:

> spark streaming is provided, kafka is not.
>
> This build file
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>
> includes some hacks for ivy issues that may no longer be strictly
> necessary, but try that build and see if it works for you.
>
>
> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari <
> vinti.u...@gmail.com> wrote:
>
>> Hello,
>>
>> I have tried multiple different settings in build.sbt but seems like
>> nothing is working.
>> Can anyone suggest the right syntax/way to include kafka with spark?
>>
>> Error
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>>
>> build.sbt
>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" %
>> "1.0.0"
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
>> )
>>
>>
>> Thanks,
>> Vinti
>>
>>
>

>>>
>>
>


Executor metrics

2016-02-24 Thread Sudo User
Hi,

I'm looking to get metrics from executors in Spark. What is the endpoint
for json data from the executors? For workers, I see that we can use
http://worker:8081/metrics/json but where do I find this info for executors?

I set executor.sink.servlet.path=/exec/path but there was no data available
on workers at this endpoint. I am able to get data from worker and master
as of now. Any help is appreciated

Thanks
Sudoer
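
As far as I know the executors do not run a web UI of their own, so a servlet
path configured for the executor instance has nowhere to be served from; a
sink is the usual way to get executor metrics out. A hedged sketch of
conf/metrics.properties, with property names as in the bundled
metrics.properties.template (period and directory are placeholders):

executor.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
executor.sink.csv.period=1
executor.sink.csv.unit=minutes
executor.sink.csv.directory=/tmp/spark-metrics

A Graphite sink works the same way if you want the numbers in a dashboard
rather than in files on each worker.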


Re: Using functional programming rather than SQL

2016-02-24 Thread Mich Talebzadeh
 

This is a point that I would like to clarify, please.

These are my assumptions:

* Data resides in Hive tables in a Hive database
* Data has to be extracted from these tables. The tables are ORC, so they
have ORC optimizations (a storage index at the file, stripe (64MB chunks of
data) and row-group (10K rows) levels that contains min, max and sum for
each column)
* HiveContext means Hive's internal optimizations are used as well, right?
* Spark contacts Hive and instructs it to get the data; Hive does all that
* Spark takes that data into its memory space and runs the queries

Does that make sense? 
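
On the ORC point, a hedged sketch for checking whether those optimizations
are actually used from Spark: in 1.5 predicate pushdown into ORC is
controlled by spark.sql.orc.filterPushdown (off by default), and
explain(true) shows whether the filter reaches the ORC relation. The table
and filter below are illustrative only.

HiveContext.setConf("spark.sql.orc.filterPushdown", "true")
HiveContext.sql("SELECT * FROM sales WHERE channel_id = 3").explain(true)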

On 24/02/2016 20:05, Koert Kuipers wrote: 

> my assumption, which is apparently incorrect, was that the SQL gets 
> translated into a catalyst plan that is executed in spark. the dataframe 
> operations (referred to by Mich as the FP results) also get translated into a 
> catalyst plan that is executed on the exact same spark platform. so unless 
> the SQL gets translated into a much better plan (perhaps thanks to some 
> pushdown into ORC?), i dont see why it can be much faster.
> 
> On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers  wrote:
> 
> i am still missing something. if it is executed in the source database, which 
> is hive in this case, then it does need hive, no? how can you execute in hive 
> without needing hive? 
> 
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan 
>  wrote:
> 
> I never said it needs one. All I said is that when calling context.sql() the 
> sql is executed in the source database (assuming datasource is Hive or some 
> RDBMS) 
> 
> Regards
> Sab 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
> 
> That is incorrect HiveContext does not need a hive instance to run. 
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" 
>  wrote:
> 
> Yes 
> 
> Regards
> Sab 
> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
> 
> are you saying that HiveContext.sql(...) runs on hive, and not on spark sql?
> 
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan 
>  wrote:
> 
> When using SQL your full query, including the joins, were executed in Hive(or 
> RDBMS) and only the results were brought into the Spark cluster. In the FP 
> case, the data for the 3 tables is first pulled into the Spark cluster and 
> then the join is executed. 
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" 
>  wrote:
> 
> Hi, 
> 
> First thanks everyone for their suggestions. Much appreciated. 
> 
> This was the original queries written in SQL and run against Spark-shell 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop") 
> 
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("nFinished at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> sys.exit 
> 
> The second queries were written in FP as much as I could as below 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs = 
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("nfirst query")
> val rs1 = 
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("nsecond query")
> val rs2 
> 

Re: Using functional programming rather than SQL

2016-02-24 Thread Mich Talebzadeh
 

Hi Koert, 

My bad. I used a smaller "sales" table in the SQL plan. Kindly see my
new figures. 

On 24/02/2016 20:05, Koert Kuipers wrote: 

> my assumption, which is apparently incorrect, was that the SQL gets 
> translated into a catalyst plan that is executed in spark. the dataframe 
> operations (referred to by Mich as the FP results) also get translated into a 
> catalyst plan that is executed on the exact same spark platform. so unless 
> the SQL gets translated into a much better plan (perhaps thanks to some 
> pushdown into ORC?), i dont see why it can be much faster.
> 
> On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers  wrote:
> 
> i am still missing something. if it is executed in the source database, which 
> is hive in this case, then it does need hive, no? how can you execute in hive 
> without needing hive? 
> 
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan 
>  wrote:
> 
> I never said it needs one. All I said is that when calling context.sql() the 
> sql is executed in the source database (assuming datasource is Hive or some 
> RDBMS) 
> 
> Regards
> Sab 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
> 
> That is incorrect HiveContext does not need a hive instance to run. 
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" 
>  wrote:
> 
> Yes 
> 
> Regards
> Sab 
> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
> 
> are you saying that HiveContext.sql(...) runs on hive, and not on spark sql?
> 
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan 
>  wrote:
> 
> When using SQL your full query, including the joins, were executed in Hive(or 
> RDBMS) and only the results were brought into the Spark cluster. In the FP 
> case, the data for the 3 tables is first pulled into the Spark cluster and 
> then the join is executed. 
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" 
>  wrote:
> 
> Hi, 
> 
> First thanks everyone for their suggestions. Much appreciated. 
> 
> This was the original queries written in SQL and run against Spark-shell 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop") 
> 
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("nFinished at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> sys.exit 
> 
> The second queries were written in FP as much as I could as below 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs = 
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("nfirst query")
> val rs1 = 
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("nsecond query")
> val rs2 
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("nFinished at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> sys.exit 
> 
> However The first query results are slightly different in SQL and FP (may be 
> the first query code in FP is not exactly correct?) and more importantly the 
> FP takes order of magnitude longer compared to SQL (8 minutes compared to 
> less than a minute). I am not surprised as I expected Functional Programming 
> has to flatten up all those method calls and convert them to 

Re: Spark and KafkaUtils

2016-02-24 Thread Cody Koeninger
Ok, that build file I linked earlier has a minimal example of use.  just
running 'sbt assembly' given a similar build file should build a jar with
all the dependencies.

On Wed, Feb 24, 2016 at 1:50 PM, Vinti Maheshwari 
wrote:

> I am not using sbt assembly currently. I need to check how to use sbt
> assembly.
>
> Regards,
> ~Vinti
>
> On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger 
> wrote:
>
>> Are you using sbt assembly?  That's what will include all of the
>> non-provided dependencies in a single jar along with your code.  Otherwise
>> you'd have to specify each separate jar in your spark-submit line, which is
>> a pain.
>>
>> On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari 
>> wrote:
>>
>>> Hi Cody,
>>>
>>> I tried with the build file you provided, but it's not working for me,
>>> getting same error:
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>
>>> I am not getting this error while building  (sbt package). I am getting
>>> this error when i am running my spark-streaming program.
>>> Do i need to specify kafka jar path manually with spark-submit --jars
>>> flag?
>>>
>>> My build.sbt:
>>>
>>> name := "NetworkStreaming"
>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>>
>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>>
>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>>>
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
>>> )
>>>
>>>
>>>
>>> Regards,
>>> ~Vinti
>>>
>>> On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger 
>>> wrote:
>>>
 spark streaming is provided, kafka is not.

 This build file

 https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt

 includes some hacks for ivy issues that may no longer be strictly
 necessary, but try that build and see if it works for you.


 On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari <
 vinti.u...@gmail.com> wrote:

> Hello,
>
> I have tried multiple different settings in build.sbt but seems like
> nothing is working.
> Can anyone suggest the right syntax/way to include kafka with spark?
>
> Error
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/kafka/KafkaUtils$
>
> build.sbt
> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" %
> "1.0.0"
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
> )
>
>
> Thanks,
> Vinti
>
>

>>>
>>
>


Re: Using Spark functional programming rather than SQL, Spark on Hive tables

2016-02-24 Thread Mich Talebzadeh
 

Well spotted Sab. You are correct. An oversight by me. They should both
use "sales". 

The results are now comparable 

The following statement 

"On the other hand using SQL the query 1 takes 19 seconds compared to
just under 4 minutes for functional programming 

The second query using SQL takes 28 seconds. Using FP it takes around 4
minutes." 

Should be amended to 

"Using SQL query 1 takes 3 min, 39 sec compared to 3 min, 44 sec using
FP 

Using SQL query 2 takes 3 min, 36 sec compared to 3 min, 53 sec using
FP" 

FP lags slightly behind SQL but not by any significant margin. 

Thanks 
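
For the "look at the query plans" step, a minimal sketch using the same rs
DataFrame and tmp temp table as in the code quoted below; explain(true)
prints the Catalyst logical and physical plans for both paths so they can be
compared directly:

rs.explain(true)            // the DataFrame ("FP") version
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
FROM tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").explain(true)          // the SQL version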

On 24/02/2016 18:20, Sabarish Sasidharan wrote: 

> One more, you are referring to 2 different sales tables. That might account 
> for the difference in numbers. 
> 
> Regards
> Sab 
> On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" 
>  wrote:
> 
> HI, 
> 
> TOOLS 
> 
> SPARK 1.5.2, HADOOP 2.6, HIVE 2.0, SPARK-SHELL, HIVE DATABASE 
> 
> OBJECTIVES: TIMING DIFFERENCES BETWEEN RUNNING SPARK USING SQL AND RUNNING 
> SPARK USING FUNCTIONAL PROGRAMING (FP) (FUNCTIONAL CALLS) ON HIVE TABLES 
> 
> UNDERLYING TABLES: THREE TABLES IN HIVE DATABASE USING ORC FORMAT 
> 
> The main differences in timings come from running the queries and fetching 
> data. If you look the transformation part that is 
> 
> val rs = 
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>  
> 
> Takes I second. On the other hand using SQL the query 1 takes 19 seconds 
> compared to just under 4 minutes for functional programming 
> 
> The seconds query using SQL takes 28 seconds. Using FP it takes around 4 
> minutes. 
> 
> These are my assumptions. 
> 
> * Running SQL the full query is executed in Hive which means that Hive can 
> take advantage of ORC optimization/storage index etc?
> * Running FP requires that data is fetched from the underlying tables in Hive 
> and brought back to Spark cluster (standalone here) and the joins etc are 
> done there
> 
> The next step for me would be to: 
> 
> * Look at the query plans in Spark
> * Run the same code on Hive alone and compare results
> 
> Any other suggestions are welcome. 
> 
> STANDARD SQL CODE 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> println ("ncreating data set at "); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc
> , c.channel_desc
> , SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("nfirst query at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("nsecond query at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("nFinished at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> sys.exit 
> 
> RESULTS 
> 
> Started at [24/02/2016 09:00:50.50]
> res1: org.apache.spark.sql.DataFrame = [result: string] 
> 
> creating data set at [24/02/2016 09:00:53.53]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, 
> channel_desc: string, TotalSales: decimal(20,0) 
> 
> First query at [24/02/2016 09:00:54.54]
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193] 
> 
> second query at [24/02/2016 09:01:13.13]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760] 
> 
> Finished at [24/02/2016 09:01:31.31 
> 
> CODE USING FUNCTIONAL PROGRAMMING 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = 
> HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> 

Spark-avro issue in 1.5.2

2016-02-24 Thread Ross.Cramblit
I’m trying to save a data frame in Avro format but am getting the following 
error:

  java.lang.NoSuchMethodError: 
org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;

I found the following workaround 
https://github.com/databricks/spark-avro/issues/91 - which seems to say that 
this is from a mismatch in Avro versions. I have tried following both of the 
solutions detailed below, to no avail:
 - Manually downloading avro-1.7.7.jar and including it in 
/usr/lib/hadoop-mapreduce/
 - Adding avro-1.7.7.jar to spark.driver.extraClassPath and 
spark.executor.extraClassPath
 - The same with avro-1.6.6

I am still getting the same error, and now I am just stabbing in the dark. 
Anyone else still running into this issue?


I am using Pyspark 1.5.2 on EMR.
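
One more hedged thing to try (not verified on EMR): Spark 1.5 has the
experimental spark.driver.userClassPathFirst and
spark.executor.userClassPathFirst settings, which can put a user-supplied
Avro jar ahead of the cluster's older copy. The jar path and script name
below are placeholders:

spark-submit \
  --jars /home/hadoop/avro-1.7.7.jar \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  my_job.py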


Re: Left/Right Outer join on multiple Columns

2016-02-24 Thread Abhisheks
Oh that's easy ... just add this to the above statement for each duplicate
column - 
.drop(rightDF.col("x")).drop(rightDF.col("y")).

thanks!
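
Putting the two replies in this thread together, a minimal sketch with
hypothetical frames leftDF/rightDF joined on columns x and y:

val joined = leftDF
  .join(rightDF, leftDF("x") === rightDF("x") && leftDF("y") === rightDF("y"), "left_outer")
  .drop(rightDF.col("x"))   // drop the duplicated right-hand columns
  .drop(rightDF.col("y"))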







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Left-Right-Outer-join-on-multiple-Columns-tp26293p26328.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using functional programming rather than SQL

2016-02-24 Thread Koert Kuipers
my assumption, which is apparently incorrect, was that the SQL gets
translated into a catalyst plan that is executed in spark. the dataframe
operations (referred to by Mich as the FP results) also get translated into
a catalyst plan that is executed on the exact same spark platform. so
unless the SQL gets translated into a much better plan (perhaps thanks to
some pushdown into ORC?), i dont see why it can be much faster.




On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers  wrote:

> i am still missing something. if it is executed in the source database,
> which is hive in this case, then it does need hive, no? how can you execute
> in hive without needing hive?
>
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> I never said it needs one. All I said is that when calling context.sql()
>> the sql is executed in the source database (assuming datasource is Hive or
>> some RDBMS)
>>
>> Regards
>> Sab
>>
>> Regards
>> Sab
>> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
>>
>>> That is incorrect HiveContext does not need a hive instance to run.
>>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
 Yes

 Regards
 Sab
 On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:

> are you saying that HiveContext.sql(...) runs on hive, and not on
> spark sql?
>
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> When using SQL your full query, including the joins, were executed in
>> Hive(or RDBMS) and only the results were brought into the Spark cluster. 
>> In
>> the FP case, the data for the 3 tables is first pulled into the Spark
>> cluster and then the join is executed.
>>
>> Thus the time difference.
>>
>> It's not immediately obvious why the results are different.
>>
>> Regards
>> Sab
>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> First thanks everyone for their suggestions. Much appreciated.
>>>
>>> This was the original queries written in SQL and run against
>>> Spark-shell
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>>
>>> val rs = HiveContext.sql(
>>> """
>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>> TotalSales
>>> FROM smallsales s
>>> INNER JOIN times t
>>> ON s.time_id = t.time_id
>>> INNER JOIN channels c
>>> ON s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> rs.registerTempTable("tmp")
>>> println ("\nfirst query")
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>> TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nsecond query")
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> The second queries were written in FP as much as I could as below
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID
>>> FROM sales")
>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>>> channels")
>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>>> times")
>>> val rs =
>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>> println ("\nfirst query")
>>> val rs1 =
>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>> println ("\nsecond query")
>>> val rs2
>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>>

Re: Using functional programming rather than SQL

2016-02-24 Thread Koert Kuipers
i am still missing something. if it is executed in the source database,
which is hive in this case, then it does need hive, no? how can you execute
in hive without needing hive?

On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> I never said it needs one. All I said is that when calling context.sql()
> the sql is executed in the source database (assuming datasource is Hive or
> some RDBMS)
>
> Regards
> Sab
>
> Regards
> Sab
> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
>
>> That is incorrect HiveContext does not need a hive instance to run.
>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> Yes
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>>>
 are you saying that HiveContext.sql(...) runs on hive, and not on
 spark sql?

 On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
 sabarish.sasidha...@manthan.com> wrote:

> When using SQL your full query, including the joins, were executed in
> Hive(or RDBMS) and only the results were brought into the Spark cluster. 
> In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>> First thanks everyone for their suggestions. Much appreciated.
>>
>> This was the original queries written in SQL and run against
>> Spark-shell
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>> """
>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>> TotalSales
>> FROM smallsales s
>> INNER JOIN times t
>> ON s.time_id = t.time_id
>> INNER JOIN channels c
>> ON s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>> println ("\nfirst query")
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>> TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL LIMIT 5
>> """).collect.foreach(println)
>> println ("\nsecond query")
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC LIMIT 5
>> """).collect.foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>> The second queries were written in FP as much as I could as below
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>> channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>> times")
>> val rs =
>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>> println ("\nfirst query")
>> val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>> println ("\nsecond query")
>> val rs2
>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>>
>>
>> However The first query results are slightly different in SQL and FP
>> (may be the first query code in FP is not exactly correct?) and more
>> importantly the FP takes order of magnitude longer compared to SQL (8
>> minutes compared to less than a minute). I am not surprised as I expected
>> Functional Programming has to flatten up all those method calls and 
>> convert
>> them to SQL?
>>
>> *The standard SQL results*
>>
>>
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: 

Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
I am not using sbt assembly currently. I need to check how to use sbt
assembly.

Regards,
~Vinti

On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger  wrote:

> Are you using sbt assembly?  That's what will include all of the
> non-provided dependencies in a single jar along with your code.  Otherwise
> you'd have to specify each separate jar in your spark-submit line, which is
> a pain.
>
> On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari 
> wrote:
>
>> Hi Cody,
>>
>> I tried with the build file you provided, but it's not working for me,
>> getting same error:
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>>
>> I am not getting this error while building  (sbt package). I am getting
>> this error when i am running my spark-streaming program.
>> Do i need to specify kafka jar path manually with spark-submit --jars
>> flag?
>>
>> My build.sbt:
>>
>> name := "NetworkStreaming"
>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>
>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>
>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
>> )
>>
>>
>>
>> Regards,
>> ~Vinti
>>
>> On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger 
>> wrote:
>>
>>> spark streaming is provided, kafka is not.
>>>
>>> This build file
>>>
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>>>
>>> includes some hacks for ivy issues that may no longer be strictly
>>> necessary, but try that build and see if it works for you.
>>>
>>>
>>> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari >> > wrote:
>>>
 Hello,

 I have tried multiple different settings in build.sbt but seems like
 nothing is working.
 Can anyone suggest the right syntax/way to include kafka with spark?

 Error
 Exception in thread "main" java.lang.NoClassDefFoundError:
 org/apache/spark/streaming/kafka/KafkaUtils$

 build.sbt
 libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
 libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
 libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
 libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
 )


 Thanks,
 Vinti


>>>
>>
>


RE: How to get progress information of an RDD operation

2016-02-24 Thread Wang, Ningjun (LNG-NPV)
Yes, I am looking for a programmatic way of tracking progress.
SparkListener.scala does not track at the RDD item level, so it will not tell how 
many items have been processed.

I wonder whether there is any way to track the accumulator value, as it reflects the 
correct number of items processed so far?

Ningjun
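
One hedged sketch of that idea, based on the snippet in the quoted mail
below: poll the accumulator from a driver-side daemon thread while the job
runs. Accumulator values are only guaranteed for completed tasks, so treat
the number as approximate progress.

val lines = sc.textFile("c:/temp/input.txt")
val acCount = sc.accumulator(0L)

// Driver-side daemon thread that reports progress once a minute.
val monitor = new Thread(new Runnable {
  def run(): Unit = while (true) {
    println(s"Lines processed so far: ${acCount.value}")
    Thread.sleep(60 * 1000)
  }
})
monitor.setDaemon(true)
monitor.start()

lines.foreach { line =>
  handleLine(line)   // handleLine as in the original snippet
  acCount += 1
}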

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, February 23, 2016 2:30 PM
To: Kevin Mellott
Cc: Wang, Ningjun (LNG-NPV); user@spark.apache.org
Subject: Re: How to get progress information of an RDD operation

I think Ningjun was looking for programmatic way of tracking progress.

I took a look at:
./core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

but there doesn't seem to exist fine grained events directly reflecting what 
Ningjun looks for.

On Tue, Feb 23, 2016 at 11:24 AM, Kevin Mellott 
> wrote:
Have you considered using the Spark Web UI to view progress on your job? It 
does a very good job showing the progress of the overall job, as well as allows 
you to drill into the individual tasks and server activity.

On Tue, Feb 23, 2016 at 12:53 PM, Wang, Ningjun (LNG-NPV) 
> wrote:
How can I get progress information of a RDD operation? For example

val lines = sc.textFile("c:/temp/input.txt")  // a RDD of millions of line
lines.foreach(line => {
handleLine(line)
})
The input.txt contains millions of lines. The entire operation take 6 hours. I 
want to print out how many lines are processed every 1 minute so user know the 
progress. How can I do that?

One way I am thinking of is to use accumulator, e.g.



val lines = sc.textFile("c:/temp/input.txt")
val acCount = sc.accumulator(0L)
lines.foreach(line => {
handleLine(line)
acCount += 1
}

However how can I print out account every 1 minutes?


Ningjun





Re: About Tensor Factorization in Spark

2016-02-24 Thread Li Jiajia
Thanks Pooja! This is basically about TensorFlow. It seems it doesn't have many 
tensor features; it is basically matrix operations after the input part. 

Best regards!
Jiajia Li

--
E-mail: jiaji...@gatech.edu
Tel: +1 (404)9404603
Computational Science & Engineering
Georgia Institute of Technology

> On Feb 24, 2016, at 2:42 PM, Chadha Pooja  wrote:
> 
> Hi, 
>  
> Here is a link that might help - 
> https://databricks.com/blog/2016/01/25/deep-learning-with-spark-and-tensorflow.html
>  
> 
>  
> Thanks
> Pooja
>  
> From: Nick Pentreath [mailto:nick.pentre...@gmail.com 
> ] 
> Sent: Wednesday, February 24, 2016 1:11 AM
> To: user@spark.apache.org 
> Subject: Re: About Tensor Factorization in Spark
>  
> Not that I'm aware of - it would be a great addition as a Spark package!
> 
> On Wed, 24 Feb 2016 at 06:33 Li Jiajia  > wrote:
> Thanks Nick. I found this one. This library is focusing on a particular 
> application I guess, seems only implemented one tensor factorization 
> algorithm by far, and only for three dimensional tensors. Are there Spark 
> powered libraries supporting general tensors and the algorithms?
>  
> Best regards!
> Jiajia Li
> 
> --
> E-mail: jiaji...@gatech.edu 
> Tel: +1 (404)9404603
> Computational Science & Engineering
> Georgia Institute of Technology
>  
> On Feb 23, 2016, at 11:12 PM, Nick Pentreath  > wrote:
>  
> There is this library that I've come across - 
> https://github.com/FurongHuang/SpectralLDA-TensorSpark 
> 
>  
> On Wed, 24 Feb 2016 at 05:50, Li Jiajia  > wrote:
> Hi,
> I wonder if there are tensor algorithms or tensor data structures supported 
> by Spark MLlib or GraphX. In a Spark intro slide, tensor factorization is 
> mentioned as one of the algorithms in GraphX, but I didn't find it in the 
> guide. If not, do you plan to implement them in the future? 
> I’m new to Spark, please give some detailed explanation if possible. Thanks 
> in advance. 
>  
> Best regards!
> Jiajia Li
> 
> --
> E-mail: jiaji...@gatech.edu 
> Tel: +1 (404)9404603
> Computational Science & Engineering
> Georgia Institute of Technology
>  
>  
> 
> 
> The Boston Consulting Group, Inc. 
> 
> This e-mail message may contain confidential and/or privileged information. 
> If you are not an addressee or otherwise authorized to receive this message, 
> you should not use, copy, disclose or take any action based on this e-mail or 
> any information contained in the message. If you have received this material 
> in error, please advise the sender immediately by reply e-mail and delete 
> this message. Thank you.



RE: About Tensor Factorization in Spark

2016-02-24 Thread Chadha Pooja
Hi,

Here is a link that might help - 
https://databricks.com/blog/2016/01/25/deep-learning-with-spark-and-tensorflow.html

Thanks
Pooja

From: Nick Pentreath [mailto:nick.pentre...@gmail.com]
Sent: Wednesday, February 24, 2016 1:11 AM
To: user@spark.apache.org
Subject: Re: About Tensor Factorization in Spark

Not that I'm aware of - it would be a great addition as a Spark package!
On Wed, 24 Feb 2016 at 06:33 Li Jiajia 
> wrote:
Thanks Nick. I found this one. This library is focusing on a particular 
application I guess, seems only implemented one tensor factorization algorithm 
by far, and only for three dimensional tensors. Are there Spark powered 
libraries supporting general tensors and the algorithms?

Best regards!
Jiajia Li

--
E-mail: jiaji...@gatech.edu
Tel: +1 (404)9404603
Computational Science & Engineering
Georgia Institute of Technology

On Feb 23, 2016, at 11:12 PM, Nick Pentreath 
> wrote:

There is this library that I've come across - 
https://github.com/FurongHuang/SpectralLDA-TensorSpark

On Wed, 24 Feb 2016 at 05:50, Li Jiajia 
> wrote:
Hi,
I wonder if there are tensor algorithms or tensor data structures supported by 
Spark MLlib or GraphX. In a Spark intro slide, tensor factorization is 
mentioned as one of the algorithms in GraphX, but I didn't find it in the 
guide. If not, do you plan to implement them in the future?
I’m new to Spark, please give some detailed explanation if possible. Thanks in 
advance.

Best regards!
Jiajia Li

--
E-mail: jiaji...@gatech.edu
Tel: +1 (404)9404603
Computational Science & Engineering
Georgia Institute of Technology

__
The Boston Consulting Group, Inc.
 
This e-mail message may contain confidential and/or privileged information.
If you are not an addressee or otherwise authorized to receive this message,
you should not use, copy, disclose or take any action based on this e-mail or
any information contained in the message. If you have received this material
in error, please advise the sender immediately by reply e-mail and delete this
message. Thank you.


Re: newbie unable to write to S3 403 forbidden error

2016-02-24 Thread Andy Davidson
Hi Sabarish

We finally got S3 working. I think the real problem was that by default
spark-ec2 uses an old version of Hadoop (1.0.4). Once we passed
--copy-aws-credentials --hadoop-major-version=2 it started working.

Kind regards

Andy
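
For anyone hitting the same thing, a hedged sketch of the launch command
(flag names as in the spark-ec2 script; key pair, identity file, slave count
and cluster name are placeholders):

./spark-ec2 -k my-keypair -i ~/my-keypair.pem -s 3 \
  --copy-aws-credentials --hadoop-major-version=2 \
  launch my-cluster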


From:  Sabarish Sasidharan 
Date:  Sunday, February 14, 2016 at 7:05 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: newbie unable to write to S3 403 forbidden error

> 
> Make sure you are using s3 bucket in same region. Also I would access my
> bucket this way s3n://bucketname/foldername.
> 
> You can test privileges using the s3 cmd line client.
> 
> Also, if you are using instance profiles you don't need to specify access and
> secret keys. No harm in specifying though.
> 
> Regards
> Sab
> On 12-Feb-2016 2:46 am, "Andy Davidson"  wrote:
>> I am using spark 1.6.0 in a cluster created using the spark-ec2 script. I am
>> using the standalone cluster manager
>> 
>> My java streaming app is not able to write to s3. It appears to be some form
>> of permission problem.
>> 
>> Any idea what the problem might be?
>> 
>> I tried use the IAM simulator to test the policy. Everything seems okay. Any
>> idea how I can debug this problem?
>> 
>> Thanks in advance
>> 
>> Andy
>> 
>> JavaSparkContext jsc = new JavaSparkContext(conf);
>> 
>> 
>> // I did not include the full key in my email
>>// the keys do not contain '\'
>>// these are the keys used to create the cluster. They belong to the
>> IAM user andy
>> jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAJREX");
>> 
>> jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
>> "uBh9v1hdUctI23uvq9qR");
>> 
>> 
>> 
>> 
>>   private static void saveTweets(JavaDStream jsonTweets, String
>> outputURI) {
>> 
>> jsonTweets.foreachRDD(new VoidFunction2() {
>> 
>> private static final long serialVersionUID = 1L;
>> 
>> 
>> 
>> @Override
>> 
>> public void call(JavaRDD rdd, Time time) throws Exception
>> {
>> 
>> if(!rdd.isEmpty()) {
>> 
>> // bucket name is 'com.pws.twitter', it has a folder 'json'
>> 
>> String dirPath =
>> "s3n://s3-us-west-1.amazonaws.com/com.pws.twitter/
>>  json² + "-" +
>> time.milliseconds();
>> 
>> rdd.saveAsTextFile(dirPath);
>> 
>> }
>> 
>> }
>> 
>> });
>> 
>> 
>> 
>> 
>> Bucket name : com.pws.titter
>> Bucket policy (I replaced the account id)
>> 
>> {
>> "Version": "2012-10-17",
>> "Id": "Policy1455148808376",
>> "Statement": [
>> {
>> "Sid": "Stmt1455148797805",
>> "Effect": "Allow",
>> "Principal": {
>> "AWS": "arn:aws:iam::123456789012:user/andy"
>> },
>> "Action": "s3:*",
>> "Resource": "arn:aws:s3:::com.pws.twitter/*"
>> }
>> ]
>> }
>> 
>> 




Re: Spark and KafkaUtils

2016-02-24 Thread Cody Koeninger
Are you using sbt assembly?  That's what will include all of the
non-provided dependencies in a single jar along with your code.  Otherwise
you'd have to specify each separate jar in your spark-submit line, which is
a pain.

On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari 
wrote:

> Hi Cody,
>
> I tried with the build file you provided, but it's not working for me,
> getting same error:
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/kafka/KafkaUtils$
>
> I am not getting this error while building  (sbt package). I am getting
> this error when i am running my spark-streaming program.
> Do i need to specify kafka jar path manually with spark-submit --jars flag?
>
> My build.sbt:
>
> name := "NetworkStreaming"
> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>
> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>
> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
> )
>
>
>
> Regards,
> ~Vinti
>
> On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger 
> wrote:
>
>> spark streaming is provided, kafka is not.
>>
>> This build file
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>>
>> includes some hacks for ivy issues that may no longer be strictly
>> necessary, but try that build and see if it works for you.
>>
>>
>> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari 
>> wrote:
>>
>>> Hello,
>>>
>>> I have tried multiple different settings in build.sbt but seems like
>>> nothing is working.
>>> Can anyone suggest the right syntax/way to include kafka with spark?
>>>
>>> Error
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>
>>> build.sbt
>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
>>> )
>>>
>>>
>>> Thanks,
>>> Vinti
>>>
>>>
>>
>


Re: rdd.collect.foreach() vs rdd.collect.map()

2016-02-24 Thread Chitturi Padma
If you want to do processing in parallel, never use collect or any other action
such as count or first; they compute the result and bring it back to the
driver. rdd.map does the processing in parallel. Once you have processed the
RDD, then save it to the DB.

rdd.foreach executes on the workers; in fact, it returns Unit.
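
A minimal sketch of that pattern; process, createDbConnection and insert are
hypothetical placeholders, not real APIs:

// Processing and writes both stay on the executors; nothing is collected
// back to the driver, and one DB connection is opened per partition.
val processed = rdd.map(process)
processed.foreachPartition { records =>
  val conn = createDbConnection()
  try records.foreach(r => conn.insert(r))
  finally conn.close()
}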



On Wed, Feb 24, 2016 at 11:56 PM, Anurag [via Apache Spark User List] <
ml-node+s1001560n26325...@n3.nabble.com> wrote:

> @Chitturi-Thanks a lot for replying
>
> 2 followup questions :
>
> 1. what if I am not collecting Rdd, then will Rdd.foreach() and Rdd.map()
> do processing in parallel ?
>
>
> 2. Let's say I have to get the results first and then do something before
> saving them into database. But I want to do that in parallel? How should I
> do it ? I am using Rdd.collect().foreach(), but it is not doing
> processing in parallel.
>
> Regards
> Anurag
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/rdd-collect-foreach-vs-rdd-collect-map-tp26320p26325.html
> To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-collect-foreach-vs-rdd-collect-map-tp26320p26326.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Hi Cody,

I tried with the build file you provided, but it's not working for me,
getting same error:
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$

I am not getting this error while building  (sbt package). I am getting
this error when i am running my spark-streaming program.
Do i need to specify kafka jar path manually with spark-submit --jars flag?

My build.sbt:

name := "NetworkStreaming"
libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"

libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
)



Regards,
~Vinti

On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger  wrote:

> spark streaming is provided, kafka is not.
>
> This build file
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>
> includes some hacks for ivy issues that may no longer be strictly
> necessary, but try that build and see if it works for you.
>
>
> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari 
> wrote:
>
>> Hello,
>>
>> I have tried multiple different settings in build.sbt but seems like
>> nothing is working.
>> Can anyone suggest the right syntax/way to include kafka with spark?
>>
>> Error
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>>
>> build.sbt
>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
>> )
>>
>>
>> Thanks,
>> Vinti
>>
>>
>


Re: Using functional programming rather than SQL

2016-02-24 Thread Mohannad Ali
My apologies I definitely misunderstood. You are 100% correct.
On Feb 24, 2016 19:25, "Sabarish Sasidharan" <
sabarish.sasidha...@manthan.com> wrote:

> I never said it needs one. All I said is that when calling context.sql()
> the sql is executed in the source database (assuming datasource is Hive or
> some RDBMS)
>
> Regards
> Sab
>
> Regards
> Sab
> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
>
>> That is incorrect HiveContext does not need a hive instance to run.
>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> Yes
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>>>
 are you saying that HiveContext.sql(...) runs on hive, and not on
 spark sql?

 On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
 sabarish.sasidha...@manthan.com> wrote:

> When using SQL your full query, including the joins, were executed in
> Hive(or RDBMS) and only the results were brought into the Spark cluster. 
> In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>> First thanks everyone for their suggestions. Much appreciated.
>>
>> This was the original queries written in SQL and run against
>> Spark-shell
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>> """
>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>> TotalSales
>> FROM smallsales s
>> INNER JOIN times t
>> ON s.time_id = t.time_id
>> INNER JOIN channels c
>> ON s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>> println ("\nfirst query")
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>> TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL LIMIT 5
>> """).collect.foreach(println)
>> println ("\nsecond query")
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC LIMIT 5
>> """).collect.foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>> The second queries were written in FP as much as I could as below
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>> channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>> times")
>> val rs =
>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>> println ("\nfirst query")
>> val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>> println ("\nsecond query")
>> val rs2
>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>>
>>
>> However The first query results are slightly different in SQL and FP
>> (may be the first query code in FP is not exactly correct?) and more
>> importantly the FP takes order of magnitude longer compared to SQL (8
>> minutes compared to less than a minute). I am not surprised as I expected
>> Functional Programming has to flatten up all those method calls and 
>> convert
>> them to SQL?
>>
>> *The standard SQL results*
>>
>>
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first 

Re: Execution plan in spark

2016-02-24 Thread Sabarish Sasidharan
There is no execution plan for FP. Execution plan exists for sql.

Regards
Sab
On 24-Feb-2016 2:46 pm, "Ashok Kumar"  wrote:

> Gurus,
>
> Is there anything like explain in Spark to see the execution plan in
> functional programming?
>
> warm regards
>


Re: Using functional programming rather than SQL

2016-02-24 Thread Sabarish Sasidharan
I never said it needs one. All I said is that when calling context.sql()
the sql is executed in the source database (assuming datasource is Hive or
some RDBMS)

Regards
Sab

Regards
Sab
On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:

> That is incorrect HiveContext does not need a hive instance to run.
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
> sabarish.sasidha...@manthan.com> wrote:
>
>> Yes
>>
>> Regards
>> Sab
>> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>>
>>> are you saying that HiveContext.sql(...) runs on hive, and not on spark
>>> sql?
>>>
>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
 When using SQL your full query, including the joins, were executed in
 Hive(or RDBMS) and only the results were brought into the Spark cluster. In
 the FP case, the data for the 3 tables is first pulled into the Spark
 cluster and then the join is executed.

 Thus the time difference.

 It's not immediately obvious why the results are different.

 Regards
 Sab
 On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
 mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> This was the original queries written in SQL and run against
> Spark-shell
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
> TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second queries were written in FP as much as I could as below
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
> channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
> times")
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
>
>
> However The first query results are slightly different in SQL and FP
> (may be the first query code in FP is not exactly correct?) and more
> importantly the FP takes order of magnitude longer compared to SQL (8
> minutes compared to less than a minute). I am not surprised as I expected
> Functional Programming has to flatten up all those method calls and 
> convert
> them to SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> 

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

2016-02-24 Thread Sabarish Sasidharan
Spark has its own efficient in-memory columnar format, so it's not ORC.
It's just that the data has to be serialized and deserialized over the
network, and that is what consumes the time.

Regards
Sab
On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> *Hi,*
>
> *Tools*
>
> *Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive Database*
>
> *Objectives: Timing differences between running Spark using SQL and
> running Spark using functional programing (FP) (functional calls) on Hive
> tables*
>
> *Underlying tables: Three tables in Hive database using ORC format*
>
> The main differences in timings come from running the queries and fetching
> data. If you look at the transformation part, that is
>
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>
> Takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds
> compared to just under 4 minutes for functional programming
>
> The second query using SQL takes 28 seconds. Using FP it takes around 4
> minutes.
>
> These are my assumptions.
>
>1. Running SQL the full query is executed in Hive which means that
>Hive can take advantage of ORC optimization/storage index etc?
>2. Running FP requires that data is fetched from the underlying tables
>in Hive and brought back to Spark cluster (standalone here) and the joins
>etc are done there
>
> The next step for me would be to:
>
>1. Look at the query plans in Spark
>2. Run the same code on Hive alone and compare results
>
>
>
> Any other suggestions are welcome.
>
> *Standard SQL code*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> println ("\ncreating data set at "); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc
> , c.channel_desc
> , SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 09:00:50.50]
> res1: org.apache.spark.sql.DataFrame = [result: string]
>
> creating data set at [24/02/2016 09:00:53.53]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)
>
> First query at [24/02/2016 09:00:54.54]
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
> second query at [24/02/2016 09:01:13.13]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at [24/02/2016 09:01:31.31
>
> *Code using functional programming*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s =
> HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> 

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

2016-02-24 Thread Sabarish Sasidharan
One more, you are referring to 2 different sales tables. That might account
for the difference in numbers.

Regards
Sab
On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> *Hi,*
>
> *Tools*
>
> *Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive Database*
>
> *Objectives: Timing differences between running Spark using SQL and
> running Spark using functional programing (FP) (functional calls) on Hive
> tables*
>
> *Underlying tables: Three tables in Hive database using ORC format*
>
> The main differences in timings come from running the queries and fetching
> data. If you look at the transformation part, that is
>
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>
> Takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds
> compared to just under 4 minutes for functional programming
>
> The second query using SQL takes 28 seconds. Using FP it takes around 4
> minutes.
>
> These are my assumptions.
>
>1. Running SQL the full query is executed in Hive which means that
>Hive can take advantage of ORC optimization/storage index etc?
>2. Running FP requires that data is fetched from the underlying tables
>in Hive and brought back to Spark cluster (standalone here) and the joins
>etc are done there
>
> The next step for me would be to:
>
>1. Look at the query plans in Spark
>2. Run the same code on Hive alone and compare results
>
>
>
> Any other suggestions are welcome.
>
> *Standard SQL code*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> println ("\ncreating data set at "); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc
> , c.channel_desc
> , SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 09:00:50.50]
> res1: org.apache.spark.sql.DataFrame = [result: string]
>
> creating data set at [24/02/2016 09:00:53.53]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)
>
> First query at [24/02/2016 09:00:54.54]
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
> second query at [24/02/2016 09:01:13.13]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at [24/02/2016 09:01:31.31
>
> *Code using functional programming*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s =
> HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> 

Re: Using functional programming rather than SQL

2016-02-24 Thread Mohannad Ali
That is incorrect. HiveContext does not need a Hive instance to run.
On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
sabarish.sasidha...@manthan.com> wrote:

> Yes
>
> Regards
> Sab
> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>
>> are you saying that HiveContext.sql(...) runs on hive, and not on spark
>> sql?
>>
>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> When using SQL your full query, including the joins, was executed in
>>> Hive (or the RDBMS) and only the results were brought into the Spark cluster. In
>>> the FP case, the data for the 3 tables is first pulled into the Spark
>>> cluster and then the join is executed.
>>>
>>> Thus the time difference.
>>>
>>> It's not immediately obvious why the results are different.
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>


 Hi,

 First thanks everyone for their suggestions. Much appreciated.

 This was the original queries written in SQL and run against Spark-shell

 val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 println ("\nStarted at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 HiveContext.sql("use oraclehadoop")

 val rs = HiveContext.sql(
 """
 SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
 TotalSales
 FROM smallsales s
 INNER JOIN times t
 ON s.time_id = t.time_id
 INNER JOIN channels c
 ON s.channel_id = c.channel_id
 GROUP BY t.calendar_month_desc, c.channel_desc
 """)
 rs.registerTempTable("tmp")
 println ("\nfirst query")
 HiveContext.sql("""
 SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
 from tmp
 ORDER BY MONTH, CHANNEL LIMIT 5
 """).collect.foreach(println)
 println ("\nsecond query")
 HiveContext.sql("""
 SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
 FROM tmp
 GROUP BY channel_desc
 order by SALES DESC LIMIT 5
 """).collect.foreach(println)
 println ("\nFinished at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 sys.exit

 The second queries were written in FP as much as I could as below

 val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 println ("\nStarted at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 HiveContext.sql("use oraclehadoop")
 var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
 sales")
 val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
 val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
 times")
 val rs =
 s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
 println ("\nfirst query")
 val rs1 =
 rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
 println ("\nsecond query")
 val rs2
 =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
 println ("\nFinished at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 sys.exit



 However the first query results are slightly different in SQL and FP
 (maybe the first query code in FP is not exactly correct?) and more
 importantly the FP takes an order of magnitude longer compared to SQL (8
 minutes compared to less than a minute). I am not surprised as I expected
 Functional Programming has to flatten up all those method calls and convert
 them to SQL?

 *The standard SQL results*



 Started at
 [23/02/2016 23:55:30.30]
 res1: org.apache.spark.sql.DataFrame = [result: string]
 rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
 channel_desc: string, TotalSales: decimal(20,0)]

 first query
 [1998-01,Direct Sales,9161730]
 [1998-01,Internet,1248581]
 [1998-01,Partners,2409776]
 [1998-02,Direct Sales,9161840]
 [1998-02,Internet,1533193]



 second query
 [Direct Sales,9161840]
 [Internet,3977374]
 [Partners,3976291]
 [Tele Sales,328760]

 Finished at
 [23/02/2016 23:56:11.11]

 *The FP results*

 Started at
 [23/02/2016 23:45:58.58]
 res1: org.apache.spark.sql.DataFrame = [result: string]
 s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
 TIME_ID: timestamp, CHANNEL_ID: bigint]
 c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
 string]

Re: Using functional programming rather than SQL

2016-02-24 Thread Sabarish Sasidharan
Yes

Regards
Sab
On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:

> are you saying that HiveContext.sql(...) runs on hive, and not on spark
> sql?
>
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> When using SQL your full query, including the joins, was executed in
>> Hive (or the RDBMS) and only the results were brought into the Spark cluster. In
>> the FP case, the data for the 3 tables is first pulled into the Spark
>> cluster and then the join is executed.
>>
>> Thus the time difference.
>>
>> It's not immediately obvious why the results are different.
>>
>> Regards
>> Sab
>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> First thanks everyone for their suggestions. Much appreciated.
>>>
>>> This was the original queries written in SQL and run against Spark-shell
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>>
>>> val rs = HiveContext.sql(
>>> """
>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>> TotalSales
>>> FROM smallsales s
>>> INNER JOIN times t
>>> ON s.time_id = t.time_id
>>> INNER JOIN channels c
>>> ON s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> rs.registerTempTable("tmp")
>>> println ("\nfirst query")
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nsecond query")
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> The second queries were written in FP as much as I could as below
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>> sales")
>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>> val rs =
>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>> println ("\nfirst query")
>>> val rs1 =
>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>> println ("\nsecond query")
>>> val rs2
>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>>
>>>
>>> However the first query results are slightly different in SQL and FP
>>> (maybe the first query code in FP is not exactly correct?) and more
>>> importantly the FP takes an order of magnitude longer compared to SQL (8
>>> minutes compared to less than a minute). I am not surprised as I expected
>>> Functional Programming has to flatten up all those method calls and convert
>>> them to SQL?
>>>
>>> *The standard SQL results*
>>>
>>>
>>>
>>> Started at
>>> [23/02/2016 23:55:30.30]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9161730]
>>> [1998-01,Internet,1248581]
>>> [1998-01,Partners,2409776]
>>> [1998-02,Direct Sales,9161840]
>>> [1998-02,Internet,1533193]
>>>
>>>
>>>
>>> second query
>>> [Direct Sales,9161840]
>>> [Internet,3977374]
>>> [Partners,3976291]
>>> [Tele Sales,328760]
>>>
>>> Finished at
>>> [23/02/2016 23:56:11.11]
>>>
>>> *The FP results*
>>>
>>> Started at
>>> [23/02/2016 23:45:58.58]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>>> string]
>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>> CALENDAR_MONTH_DESC: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9086830]
>>> [1998-01,Internet,1247641]

Re: Restricting number of cores not resulting in reduction in parallelism

2016-02-24 Thread Chitturi Padma
Hi,

 I didn't get the point you want to make, i.e. "distribute
computation across nodes by restricting parallelism on each node". Do you
mean that you expect only one task to run per node?
Can you please paste the configuration changes you made?

On Wed, Feb 24, 2016 at 11:24 PM, firemonk91 [via Apache Spark User List] <
ml-node+s1001560n26323...@n3.nabble.com> wrote:

> Can you paste the logs as well.
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Restricting-number-of-cores-not-resulting-in-reduction-in-parallelism-tp26319p26323.html
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Restricting-number-of-cores-not-resulting-in-reduction-in-parallelism-tp26319p26324.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: rdd.collect.foreach() vs rdd.collect.map()

2016-02-24 Thread Chitturi Padma
rdd.collect() never does any processing on the workers. It brings the
entire RDD back to the driver as an in-memory collection, so anything
chained after it runs only on the driver.
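
To make the difference concrete, here is a minimal sketch (not from the original thread; it assumes a live SparkContext `sc`, e.g. in spark-shell):

// Both of these pull ALL elements back to the driver first and then iterate
// there, single-threaded -- no work happens on the executors after collect():
val rdd = sc.parallelize(1 to 10, numSlices = 4)
rdd.collect().foreach(println)                            // local foreach over an Array
val doubledLocal: Array[Int] = rdd.collect().map(_ * 2)   // local Array#map

// To process in parallel, call the operation on the RDD itself:
rdd.foreach(println)               // runs on the executors (output goes to executor logs)
val doubled = rdd.map(_ * 2)       // distributed, lazy transformation
doubled.count()                    // action that actually triggers the parallel work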

On Wed, Feb 24, 2016 at 10:58 PM, Anurag [via Apache Spark User List] <
ml-node+s1001560n26320...@n3.nabble.com> wrote:

> Hi Everyone
>
> I am new to Scala and Spark.
>
> I want to know
>
> 1. does Rdd.collect().foreach() do processing in parallel?
>
> 2. does Rdd.collect().map() do processing in parallel ?
>
> Thanks in advance.
> Regards
> Anurag
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/rdd-collect-foreach-vs-rdd-collect-map-tp26320.html
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-collect-foreach-vs-rdd-collect-map-tp26320p26322.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Implementing random walk in spark

2016-02-24 Thread naveenkumarmarri
Hi,

I'm new to spark, I'm trying to compute similarity between users/products.
I've a huge table which I can't do a self join with the cluster I have.

I'm trying to implement do self join using random walk methodology which
will approximately give the results. The table is a bipartite graph with 2
columns

Idea:

   - take any element(t1) in the first column in random
   - picking the corresponding element(t2) in for the element(t1) in the
   graph.
   - lookup for possible elements in the graph for t2 in random say t3
   - create a edge between t1 and t3
   - Iterate it in the order of atleat n*n so that results will be
   approximate

Questions


   - Is spark a suitable environment to do this?
   - I've coded logic for picking elements in random but facing issue when
   building graph
   - Should consider graphx?

Any help is highly appreciated.

Regards,
Naveen
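
A rough sketch of the sampled-hop idea described above, using plain RDDs (it assumes a live SparkContext `sc`; the input path, column layout and sampling scheme are assumptions):

import scala.util.Random
import org.apache.spark.rdd.RDD

// Bipartite edge list (t1, t2), e.g. (user, product); the path is hypothetical.
val edges: RDD[(String, String)] = sc.textFile("hdfs:///path/to/bipartite_table")
  .map(_.split(","))
  .map(a => (a(0), a(1)))

// Adjacency lists in both directions, cached so every iteration reuses them.
val byLeft  = edges.groupByKey().mapValues(_.toArray).cache()              // t1 -> possible t2
val byRight = edges.map(_.swap).groupByKey().mapValues(_.toArray).cache()  // t2 -> possible t3

// One Monte-Carlo hop: t1 -(random)-> t2 -(random)-> t3, emitting the edge (t1, t3).
// sampleFraction controls how many start elements are used per iteration.
def oneHop(sampleFraction: Double, seed: Long): RDD[(String, String)] = {
  val starts = byLeft.sample(false, sampleFraction, seed)   // withReplacement = false
  val t2ToT1 = starts.map { case (t1, rights) =>
    val r = new Random(seed ^ t1.hashCode)
    (rights(r.nextInt(rights.length)), t1)                  // keyed by t2 for the next lookup
  }
  t2ToT1.join(byRight).map { case (t2, (t1, lefts)) =>
    val r = new Random(seed ^ t2.hashCode)
    (t1, lefts(r.nextInt(lefts.length)))                    // approximate (t1, t3) edge
  }
}

// Repeat oneHop with different seeds, then union/deduplicate the results to approximate
// the self join; GraphX would be an alternative if the data is reshaped into a Graph.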


Re: Spark and KafkaUtils

2016-02-24 Thread Cody Koeninger
spark streaming is provided, kafka is not.

This build file

https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt

includes some hacks for ivy issues that may no longer be strictly
necessary, but try that build and see if it works for you.
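
A minimal build.sbt along those lines (a sketch, assuming Scala 2.10 and Spark 1.5.2, with the application jar built by sbt-assembly): spark-core and spark-streaming are marked provided because the cluster supplies them, while spark-streaming-kafka stays a normal compile dependency because it is not bundled with Spark.

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2"   // must ship in the app jar
)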


On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari 
wrote:

> Hello,
>
> I have tried multiple different settings in build.sbt but seems like
> nothing is working.
> Can anyone suggest the right syntax/way to include kafka with spark?
>
> Error
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/kafka/KafkaUtils$
>
> build.sbt
> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
> )
>
>
> Thanks,
> Vinti
>
>


Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Hello,

I have tried multiple different settings in build.sbt but seems like
nothing is working.
Can anyone suggest the right syntax/way to include kafka with spark?

Error
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$

build.sbt
libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
)


Thanks,
Vinti


RE: Spark Streaming - graceful shutdown when stream has no more data

2016-02-24 Thread Cheng, Hao
This is very interesting: how to shut down the streaming job gracefully once there has
been no input data for some time.

A doable solution: count the input records using an Accumulator, and have another
thread (on the driver/master side) keep fetching the latest accumulator value; if the
value has not changed for some time, then shut down the streaming job.
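
A rough sketch of that idea (assumptions: an existing StreamingContext `ssc` and input DStream `lines`; the 60-second idle threshold is arbitrary):

// Count records as batches arrive. foreachRDD runs on the driver and count()
// is an action, so the accumulator is updated once per batch.
val recordCount = ssc.sparkContext.accumulator(0L, "records seen")
lines.foreachRDD { rdd => recordCount += rdd.count() }

// Driver-side monitor thread: stop gracefully if the count stops moving.
val monitor = new Thread(new Runnable {
  def run(): Unit = {
    var last = -1L
    var idleMs = 0L
    while (idleMs < 60000) {
      Thread.sleep(10000)
      val current = recordCount.value
      if (current == last) idleMs += 10000 else idleMs = 0
      last = current
    }
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
})
monitor.setDaemon(true)
monitor.start()

ssc.start()
ssc.awaitTermination()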

From: Daniel Siegmann [mailto:daniel.siegm...@teamaol.com]
Sent: Wednesday, February 24, 2016 12:30 AM
To: Ashutosh Kumar 
Cc: Hemant Bhanawat ; Ted Yu ; Femi 
Anthony ; user 
Subject: Re: Spark Streaming - graceful shutdown when stream has no more data

During testing you will typically be using some finite data. You want the 
stream to shut down automatically when that data has been consumed so your test 
shuts down gracefully.
Of course once the code is running in production you'll want it to keep waiting 
for new records. So whether the stream shuts down when there's no more data 
should be configurable.


On Tue, Feb 23, 2016 at 11:09 AM, Ashutosh Kumar 
> wrote:
Just out of curiosity I will like to know why a streaming program should 
shutdown when no new data is arriving?  I think it should keep waiting for 
arrival of new records.
Thanks
Ashutosh

On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat 
> wrote:
A guess - parseRecord is returning None in some case (probaly empty lines). And 
then entry.get is throwing the exception.
You may want to filter the None values from accessLogDStream before you run the 
map function over it.
Hemant

Hemant Bhanawat
www.snappydata.io

On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu 
> wrote:
Which line is line 42 in your code ?

When variable lines becomes empty, you can stop your program.

Cheers

On Feb 23, 2016, at 12:25 AM, Femi Anthony 
> wrote:

I am working on Spark Streaming API and I wish to stream a set of 
pre-downloaded web log files continuously to simulate a real-time stream. I 
wrote a script that gunzips the compressed logs and pipes the output to nc on 
port .

The script looks like this:

BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive

zipped_files=`find $BASEDIR -name "*.gz"`



for zfile in $zipped_files

 do

  echo "Unzipping $zfile..."

  gunzip -c $zfile  | nc -l -p  -q 20



 done
I have streaming code written in Scala that processes the streams. It works 
well for the most part, but when its run out of files to stream I get the 
following error in Spark:



16/02/19 23:04:35 WARN ReceiverSupervisorImpl:

Restarting receiver with delay 2000 ms: Socket data stream had no more data

16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:

Restarting receiver with delay 2000ms: Socket data stream had no more data

16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated to 
only 0 peer(s) instead of 1 peers



16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)

java.util.NoSuchElementException: None.get

at scala.None$.get(Option.scala:313)

at scala.None$.get(Option.scala:311)

at 
com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)

at 
com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)

How to I implement a graceful shutdown so that the program exits gracefully 
when it no longer detects any data in the stream ?

My Spark Streaming code looks like this:

object StreamingLogEnhanced {

 def main(args: Array[String]) {

  val master = args(0)

  val conf = new

 SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")

 // Create a StreamingContext with a n second batch size

  val ssc = new StreamingContext(conf, Seconds(10))

 // Create a DStream from all the input on port 

  val log = Logger.getLogger(getClass.getName)



  sys.ShutdownHookThread {

  log.info("Gracefully stopping Spark Streaming Application")

  ssc.stop(true, true)

  log.info("Application stopped")

  }

  val lines = ssc.socketTextStream("localhost", )

  // Create a count of log hits by ip

  var ipCounts=countByIp(lines)

  ipCounts.print()



  // start our streaming context and wait for it to "finish"

  ssc.start()

  // Wait for 600 seconds then exit

  ssc.awaitTermination(1*600)

  ssc.stop()

  }



 def countByIp(lines: DStream[String]) = {

   val parser = new AccessLogParser

   val accessLogDStream = lines.map(line => parser.parseRecord(line))

   val ipDStream = accessLogDStream.map(entry =>


Re: Spark Application Master on Yarn client mode - Virtual memory limit

2016-02-24 Thread Nirav Patel
Thanks Steve for the insights into the design choices of the Spark AM. Here are some
counter-arguments:

2. On killing: I don't think using virtual memory (swap) for one
application will drastically degrade the performance of the entire cluster and other
applications. A given application will only use the
resources that are given to it, i.e. the assigned number of cores, memory
and caches. Given that the NodeManager allows vmem for a container, a YARN-based AM
should use it, or allow the developer to change it at the application level if
performance is a concern.

3. I already mentioned the vmem check is disabled. In any case, the Spark AM should
not kill a container if the vmem limit has not been reached.

On your last comment - of course it's a JVM heap error, because the container
never used vmem/swap in the first place! And that is what I am complaining about!

On Wed, Feb 17, 2016 at 4:12 PM, Steve Loughran 
wrote:

>
> On 17 Feb 2016, at 01:29, Nirav Patel  wrote:
>
> I think you are not getting my question . I know how to tune executor
> memory settings and parallelism . That's not an issue. It's a specific
> question about what happens when physical memory limit of given executor is
> reached. Now yarn nodemanager has specific setting about provisioning
> virtual memory which can be utilized by a map reduce program. And I have
> seen it. But looks like spark application can not use virtual memory at
> all! It just kills an executor once it's physical memory limit is reached .
> Does anyone has explanation why such design choice was made?
>
>
>
>
>1. The NM memory management is not specific to any app; it's for all
>containers. Where it is weak is that it assumes that the cost of a loss of
>a process is relatively low; its worse in long-lived (i.e. streaming) apps,
>as state gets lost. There's some improvements there in discussion on
>YARN-4692.
>2. Why the killing? If some process starts to swap it hurts all the
>others —the performance of the entire cluster downgrades, without any
>obvious cause. That is, the person who underestimated their resource
>requirements doesn't get the direct feedback of "you need to ask for more
>memory"
>3. There's some forgiveness at app launch, so that if your code does a
>fork() or two its vmem allocation can go up, but otherwise, if the vmem
>check is enabled, your process gets killed. The Spark AM gets told, and
>doesn't treat it as seriously as other problems (or view the specific host
>as unreliable in any way)
>4. Otherwise, the fix is to ask for the memory you need.
>
>
> Looking at the stack trace, that's not VM/swap, thats JVM heap size.
> Separate problem.
>
> -Steve
>
> Ps - I have given 16gb per executor which is more than enough to handle
> biggest skewed in our data set. So
>
> Sent from my iPhone
>
> On Feb 15, 2016, at 8:40 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
> Looks like your executors are running out of memory. YARN is not kicking
> them out. Just increase the executor memory. Also considering increasing
> the parallelism ie the number of partitions.
>
> Regards
> Sab
> On 11-Feb-2016 5:46 am, "Nirav Patel"  wrote:
>
>> In Yarn we have following settings enabled so that job can use virtual
>> memory to have a capacity beyond physical memory off course.
>>
>> <property>
>>   <name>yarn.nodemanager.vmem-check-enabled</name>
>>   <value>false</value>
>> </property>
>>
>> <property>
>>   <name>yarn.nodemanager.pmem-check-enabled</name>
>>   <value>false</value>
>> </property>
>>
>> vmem to pmem ration is 2:1. However spark doesn't seem to be able to
>> utilize this vmem limits
>> we are getting following heap space error which seemed to be contained
>> within spark executor.
>>
>> 16/02/09 23:08:06 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
>> SIGNAL 15: SIGTERM
>> 16/02/09 23:08:06 ERROR executor.Executor: Exception in task 4.0 in stage
>> 7.6 (TID 22363)
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
>> at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
>> at
>> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
>> at
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:203)
>> at
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:202)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:202)
>> at
>> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
>> at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
>> at
>> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
>> at
>> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
>> at
>> 

Implementing random walk in spark

2016-02-24 Thread naveenkumarmarri
Hi,

I'm new to spark, I'm trying to compute similarity between users/products.
I've a huge table which I can't do a self join with the cluster I have.

I'm trying to implement do self join using random walk methodology which
will approximately give the results. The table is a bipartite graph with 2
columns

Idea:

   - take any element(t1) in the first column in random
   - picking the corresponding element(t2) in for the element(t1) in the
   graph.
   - lookup for possible elements in the graph for t2 in random say t3
   - create a edge between t1 and t3
   - Iterate it in the order of atleat n*n so that results will be
   approximate

Questions


   - Is spark a suitable environment to do this?
   - I've coded logic for picking elements in random but facing issue when
   building graph
   - Should consider graphx?

Any help is highly appreciated.

Regards,
Naveen


Re: Using Spark functional programming rather than SQL, Spark on Hive tables

2016-02-24 Thread Mich Talebzadeh
 

*Hi,*

*Tools*

*Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive Database*

*Objectives: Timing differences between running Spark using SQL and
running Spark using functional programming (FP) (functional calls) on
Hive tables*

*Underlying tables: Three tables in Hive database using ORC format*

The main differences in timings come from running the queries and
fetching data. If you look at the transformation part, that is

val rs =
s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))


Takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds
compared to just under 4 minutes for functional programming 

The second query using SQL takes 28 seconds. Using FP it takes around 4
minutes. 

These are my assumptions. 

* Running SQL the full query is executed in Hive which means that Hive
can take advantage of ORC optimization/storage index etc?
* Running FP requires that data is fetched from the underlying tables
in Hive and brought back to Spark cluster (standalone here) and the
joins etc are done there

The next step for me would be to: 

* Look at the query plans in Spark
* Run the same code on Hive alone and compare results

Any other suggestions are welcome. 

*Standard SQL code*

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
println ("\ncreating data set at "); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc
 , c.channel_desc
 , SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println ("\nfirst query at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
from tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println ("\nsecond query at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
order by SALES DESC LIMIT 5
""").collect.foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
sys.exit 

*Results*

Started at [24/02/2016 09:00:50.50]
res1: org.apache.spark.sql.DataFrame = [result: string] 

creating data set at [24/02/2016 09:00:53.53]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0) 

First query at [24/02/2016 09:00:54.54]
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193] 

second query at [24/02/2016 09:01:13.13]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760] 

Finished at [24/02/2016 09:01:31.31 

*Code using functional programming*

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
var s =
HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
val c =
HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
val t =
HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
println ("\ncreating data set at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
val rs =
s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
println ("\nfirst query at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
val rs1 =
rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
println ("\nsecond query at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
val rs2
=rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
sys.exit 

*Results*

Started at [24/02/2016 08:52:27.27]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, 

Re: Kafka partition increased while Spark Streaming is running

2016-02-24 Thread Cody Koeninger
That's correct: when you create a direct stream, you specify the
topic-partitions you want to be part of the stream (the other method for
creating a direct stream is just a convenience wrapper).
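
In other words, with the fromOffsets form the set of TopicAndPartitions is fixed when the stream is created, as in this sketch (broker address, topic name and starting offsets are assumptions; `ssc` is an existing StreamingContext). Partitions added to the topic later are not picked up until the stream is recreated with an updated map:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
// The stream will only ever read the partitions listed here.
val fromOffsets = Map(
  TopicAndPartition("mytopic", 0) -> 0L,
  TopicAndPartition("mytopic", 1) -> 0L
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
)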

On Wed, Feb 24, 2016 at 2:15 AM, 陈宇航  wrote:

> Here I use the *'KafkaUtils.createDirectStream'* to integrate Kafka with
> Spark Streaming. I submitted the app, then I changed (increased) Kafka's
> partition number after it's running for a while. Then I check the input
> offset with '*rdd.asInstanceOf[HasOffsetRanges].offsetRanges*', seeing
> that only the offset of the initial partitions are returned.
>
> Does this mean Spark Streaming's Kafka integration can't update its
> parallelism when Kafka's partition number is changed?
>


Re: Using functional programming rather than SQL

2016-02-24 Thread Koert Kuipers
are you saying that HiveContext.sql(...) runs on hive, and not on spark sql?

On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> When using SQL your full query, including the joins, was executed in
> Hive (or the RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>> First thanks everyone for their suggestions. Much appreciated.
>>
>> This was the original queries written in SQL and run against Spark-shell
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>> """
>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>> TotalSales
>> FROM smallsales s
>> INNER JOIN times t
>> ON s.time_id = t.time_id
>> INNER JOIN channels c
>> ON s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>> println ("\nfirst query")
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL LIMIT 5
>> """).collect.foreach(println)
>> println ("\nsecond query")
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC LIMIT 5
>> """).collect.foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>> The second queries were written in FP as much as I could as below
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>> val rs =
>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>> println ("\nfirst query")
>> val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>> println ("\nsecond query")
>> val rs2
>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>>
>>
>> However the first query results are slightly different in SQL and FP (maybe
>> the first query code in FP is not exactly correct?) and more importantly
>> the FP takes an order of magnitude longer compared to SQL (8 minutes compared
>> to less than a minute). I am not surprised as I expected Functional
>> Programming has to flatten up all those method calls and convert them to
>> SQL?
>>
>> *The standard SQL results*
>>
>>
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9161730]
>> [1998-01,Internet,1248581]
>> [1998-01,Partners,2409776]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>>
>>
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>>
>> Finished at
>> [23/02/2016 23:56:11.11]
>>
>> *The FP results*
>>
>> Started at
>> [23/02/2016 23:45:58.58]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
>> timestamp, CHANNEL_ID: bigint]
>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>> string]
>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>> CALENDAR_MONTH_DESC: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9086830]
>> [1998-01,Internet,1247641]
>> [1998-01,Partners,2393567]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>> rs1: Unit = ()
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>> rs2: 

Re: spark.local.dir configuration

2016-02-24 Thread Takeshi Yamamuro
Hi,

No, there is no way to change the local dir paths after the Worker has been initialized.
That is, the dir paths are cached when the first executor is launched, and
subsequent executors reference the cached paths.
Details can be found in codes below;
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L449
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L679


On Wed, Feb 24, 2016 at 5:13 PM, Jung  wrote:

> Hi all,
> In the standalone mode, spark.local.dir is ignored after Spark Worker
> launched.
> This is a scenario assuming that we have 1 master node and 1 worker node.
> 1. $SPARK_HOME/sbin/start-all.sh to launch Spark Master
> 2. Modify worker node configuration($SPARK_HOME/conf/spark-defaults.conf)
> spark.local.dir to another directory( /tmp_new).
> 3. run spark-shell from master node.
>
> Executor in this scenario creates scratch directory like
> "spark-bb0876f2-7fa9-4f15-b790-24252183a4f1" under /tmp not /tmp_new.
> Because worker set immutable SparkConf instance at the first time it
> launched and refer to this variable when create new executor which wants to
> change its scratch dir.
> Can I change application's spark.local.dir without restarting spark
> workers?
>
> Thanks,
> Jung




-- 
---
Takeshi Yamamuro


Re: Execution plan in spark

2016-02-24 Thread Mich Talebzadeh
 

Also bear in mind that the explain() method call works on transformations
(transformations are just manipulations of the data),

for example filter, map, orderBy, etc.

scala> var y =
HiveContext.table("sales").select("time_id").agg(max("time_id")).explain(true)

== Parsed Logical Plan == 

'Aggregate [max('time_id) AS max(time_id)#359]
 Project [time_id#354]
 MetastoreRelation oraclehadoop, sales, None 

== Analyzed Logical Plan ==
max(time_id): timestamp
Aggregate [max(time_id#354) AS max(time_id)#359]
 Project [time_id#354]
 MetastoreRelation oraclehadoop, sales, None 

== Optimized Logical Plan ==
Aggregate [max(time_id#354) AS max(time_id)#359]
 Project [time_id#354]
 MetastoreRelation oraclehadoop, sales, None 

== Physical Plan ==
TungstenAggregate(key=[],
functions=[(max(time_id#354),mode=Final,isDistinct=false)],
output=[max(time_id)#359])
 TungstenExchange SinglePartition
 TungstenAggregate(key=[],
functions=[(max(time_id#354),mode=Partial,isDistinct=false)],
output=[max#363])
 HiveTableScan [time_id#354], (MetastoreRelation oraclehadoop, sales,
None) 

Code Generation: true
y: Unit = () 
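
Note that explain() only prints the plans and returns Unit (hence `y: Unit = ()` above). If the plans are needed as values, they can also be read off queryExecution; a sketch, assuming the same HiveContext variable as above:

import org.apache.spark.sql.functions.max

val df = HiveContext.table("sales").select("time_id").agg(max("time_id"))

val analyzed  = df.queryExecution.analyzed       // analyzed logical plan
val optimized = df.queryExecution.optimizedPlan  // optimized logical plan
val physical  = df.queryExecution.executedPlan   // physical plan

println(physical)   // roughly what explain(true) prints last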

On 24/02/2016 09:49, Ashok Kumar wrote: 

> looks useful thanks 
> 
> On Wednesday, 24 February 2016, 9:42, Yin Yang  wrote:
> 
> Is the following what you were looking for ? 
> 
> sqlContext.sql(""" 
> CREATE TEMPORARY TABLE partitionedParquet 
> USING org.apache.spark.sql.parquet 
> OPTIONS ( 
> path '/tmp/partitioned' 
> )""") 
> 
> table("partitionedParquet").explain(true) 
> 
> On Wed, Feb 24, 2016 at 1:16 AM, Ashok Kumar  
> wrote:
> 
>> Gurus, 
>> 
>> Is there anything like explain in Spark to see the execution plan in 
>> functional programming? 
>> 
>> warm regards

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential.
This message is for the designated recipient only, if you are not the
intended recipient, you should destroy it immediately. Any information
in this message shall not be understood as given or endorsed by Cloud
Technology Partners Ltd, its subsidiaries or their employees, unless
expressly so stated. It is the responsibility of the recipient to ensure
that this email is virus free, therefore neither Cloud Technology
partners Ltd, its subsidiaries nor their employees accept any
responsibility.

 

Re: About Tensor Factorization in Spark

2016-02-24 Thread Li Jiajia
I see. Thanks very much, Nick! I’m thinking to take this as my class project. 
:-)

Best regards!
Jiajia Li

--
E-mail: jiaji...@gatech.edu
Tel: +1 (404)9404603
Computational Science & Engineering
Georgia Institute of Technology

> On Feb 24, 2016, at 1:10 AM, Nick Pentreath  wrote:
> 
> Not that I'm aware of - it would be a great addition as a Spark package!
> 
> On Wed, 24 Feb 2016 at 06:33 Li Jiajia  > wrote:
> Thanks Nick. I found this one. This library is focusing on a particular
> application, I guess; it seems to have implemented only one tensor factorization
> algorithm so far, and only for three-dimensional tensors. Are there Spark-
> powered libraries supporting general tensors and the corresponding algorithms?
> 
> Best regards!
> Jiajia Li
> 
> --
> E-mail: jiaji...@gatech.edu 
> Tel: +1 (404)9404603
> Computational Science & Engineering
> Georgia Institute of Technology
> 
>> On Feb 23, 2016, at 11:12 PM, Nick Pentreath > > wrote:
>> 
>> There is this library that I've come across - 
>> https://github.com/FurongHuang/SpectralLDA-TensorSpark 
>> 
>> On Wed, 24 Feb 2016 at 05:50, Li Jiajia > > wrote:
>> Hi,
>>  I wonder if there are tensor algorithms or tensor data structures 
>> supported by Spark MLlib or GraphX. In a Spark intro slide, tensor 
>> factorization is mentioned as one of the algorithms in GraphX, but I didn't 
>> find it in the guide. If not, do you plan to implement them in the future? 
>>  I’m new to Spark, please give some detailed explanation if possible. 
>> Thanks in advance. 
>> 
>> Best regards!
>> Jiajia Li
>> 
>> --
>> E-mail: jiaji...@gatech.edu 
>> Tel: +1 (404)9404603
>> Computational Science & Engineering
>> Georgia Institute of Technology
>> 
> 



How to achieve co-location of task and source data

2016-02-24 Thread Oliver Koeth
We are developing an RDD (and later a DataSource on top of it) to access distributed data in our Spark cluster and want to achive co-location of tasks working on the data with their source data partitions.
Overriding RDD.getPreferredLocations should be the way to achieve that, so each RDD partition can indicate on which server it should be processed. Unfortunately, there seems to be no clearly defined way how Spark identifies the server on which an Executor is running: With the mesos cluster manager, Spark tracks Executors by fully qualified host name, with standalone cluster manager, it used the IP address in Spark 1.5. In Spark 1.6 this seems to have changed to a host name.
The code in the Spark task scheduler that matches preferred locations to Executors does a map lookup and requires an exact textual match of the string specified as preferred location with the host name provided by the Executor. So, if the formats don't match, task locality handling does not work at all, but there seems to be no "standard" format for the location. So how can one write a custom RDD overriding getPreferredLocations that will work without specific dependencies on a concrete Spark setup?
 
There seems to be no way for user code to get access to the internal scheduler info tracking executors by host. It seems that even the host name reported to a SparkListener for ExecutorAdded is not reliably the same value that the scheduler uses internally for lookup.
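
For reference, a minimal sketch of the override being discussed; the host strings it returns are assumptions, and making them match whatever identifier the executors registered with is exactly the portability problem described above:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class MyPartition(val index: Int, val host: String) extends Partition

class CoLocatedRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => new MyPartition(i, h): Partition }.toArray

  // Must textually match the executor's registered host (hostname vs FQDN vs IP).
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[MyPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"data for ${split.asInstanceOf[MyPartition].host}")
}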
 
Mit freundlichen Grüßen / Best regards,Oliver KöthIBM Deutschland Research & Development GmbHVorsitzender des Aufsichtsrats: Martina Koederitz, Geschäftsführung: Dirk Wittkopp, Sitz der Gesellschaft: Böblingen, Registergericht: Amtsgericht Stuttgart, HRB 243294


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Reindexing in graphx

2016-02-24 Thread Udbhav Agarwal
Sounds useful, Robin. Thanks, I will try that. But FYI, in another case I tested
adding only one vertex to the graph. In that case as well, the latency of each
subsequent addition kept increasing: the first addition of a vertex takes 3
seconds, the second 7 seconds, and so on. This is a case where I want to
add vertices to the graph as and when they arrive in our system; since it's a
real-time system I am trying to build, vertices will keep on coming.

Thanks.
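
A sketch of the batched version (it assumes the same external gVertices, gEdges and inputGraph variables used in the addVertex method quoted below; the coalesce target is an assumption). The key change is that the whole batch becomes one RDD and is unioned exactly once, instead of one parallelize + union per vertex; repeated unions of tiny RDDs keep growing the lineage and partition count, which would explain each add taking longer than the last:

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Graph, VertexId}

def addVertexBatch(ids: RDD[String]): Long = {
  val defaultUser = (0, 0)
  val newVertices: RDD[(VertexId, (Int, Int))] =
    ids.map(id => (id.toLong, (100, 100)))              // no collect(): stays distributed

  val numParts = gVertices.partitions.length
  gVertices = gVertices.union(newVertices).coalesce(numParts)   // union once, cap partitions
  inputGraph = Graph(gVertices, gEdges, defaultUser)
  inputGraph.cache()
  gVertices = inputGraph.vertices
  gVertices.cache()
  gVertices.count()
}
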
From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Wednesday, February 24, 2016 3:54 PM
To: Udbhav Agarwal 
Cc: user@spark.apache.org
Subject: Re: Reindexing in graphx

It looks like you are adding vertices one by one; you definitely don't want to do
that. What happens when you batch together 400 vertices into an RDD and then
add all 400 in one go?
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 24 Feb 2016, at 05:49, Udbhav Agarwal 
> wrote:

Thank you Robin for your reply.
Actually I am adding bunch of vertices in a graph in graphx using the following 
method . I am facing the problem of latency. First time an addition of say 400 
vertices to a graph with 100,000 nodes takes around 7 seconds. next time its 
taking 15 seconds. So every subsequent adds are taking more time than the 
previous one. Hence I tried to do reindex() so the subsequent operations can 
also be performed fast.
FYI My cluster is presently having one machine with 8 core and 8 gb ram. I am 
running in local mode.

def addVertex(rdd: RDD[String], sc: SparkContext, session: String): Long = {
val defaultUser = (0, 0)
rdd.collect().foreach { x =>
  {
val aVertex: RDD[(VertexId, (Int, Int))] = 
sc.parallelize(Array((x.toLong, (100, 100
gVertices = gVertices.union(aVertex)
  }
}
inputGraph = Graph(gVertices, gEdges, defaultUser)
inputGraph.cache()
gVertices = inputGraph.vertices
gVertices.cache()
val count = gVertices.count
println(count);

return 1;
  }


From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Tuesday, February 23, 2016 8:15 PM
To: Udbhav Agarwal 
>
Subject: Re: Reindexing in graphx

Hi

Well this is the line that is failing in VertexRDDImpl:

require(partitionsRDD.partitioner.isDefined)

But really you shouldn’t need to be calling the reindex() function as it deals 
with some internals of the GraphX implementation - it looks to me like it ought 
to be a private method. Perhaps you could explain what you are trying to 
achieve.
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 23 Feb 2016, at 12:18, Udbhav Agarwal 
> wrote:

Hi,
I am trying to add vertices to a graph in graphx and I want to do reindexing in 
the graph. I can see there is an option of vertices.reindex() in graphX. But 
when I call graph.vertices.reindex() I get
java.lang.IllegalArgumentException: requirement failed.
Please help me understand what I am missing in the syntax, as the API
documentation only mentions vertices.reindex().

Thanks,
Udbhav Agarwal



Re: Apache Arrow + Spark examples?

2016-02-24 Thread Petr Novak
How does Arrow interact with Tungsten and its binary in-memory format? Spark
will still have to convert between them. I assume they use similar
concepts/layouts, so the conversion can likely be quite efficient.
Or is there a chance that the current Tungsten in-memory format will be
replaced by Arrow in the future? The same applies to Impala, Drill and
all the others: is the goal to unify the internal in-memory representation for
all of them, or is the benefit going to be conversions that are faster by,
e.g., an order of magnitude?

Many thanks for any explanation,
Petr


RE: Apache Arrow + Spark examples?

2016-02-24 Thread Sun, Rui
Spark has not supported Arrow yet. There is a JIRA 
https://issues.apache.org/jira/browse/SPARK-13391 requesting working on it.

From: Robert Towne [mailto:robert.to...@webtrends.com]
Sent: Wednesday, February 24, 2016 5:21 AM
To: user@spark.apache.org
Subject: Apache Arrow + Spark examples?

I have been reading some of the news this week about Apache Arrow as a new top 
level project.  It appears to be a common data layer between Spark and other 
systems (Cassandra, Drill, Impala, etc).

Has anyone seen any sample Spark code that integrates with Arrow?

Thanks,
Robert


LDA topic Modeling spark + python

2016-02-24 Thread Mishra, Abhishek
Hello All,





I am building an LDA model; please guide me.

I have a CSV file with two columns, "user_id" and "status". I have to
generate a word-topic distribution after aggregating by user_id; meaning to
say, I need to model it per user on their grouped statuses. The topic length
is 2000 and the value of k, or the number of words, is 3.

Please, if you can provide me with some link or some code base on Spark with
Python, I would be grateful.

Looking forward to a reply,



Sincerely,

Abhishek



[Query] : How to read null values in Spark 1.5.2

2016-02-24 Thread Divya Gehlot
Hi,
I have a data set (sourced from a database) which has null values.
When I define the custom schema with any type other than string type,
I get a NumberFormatException on the null values.
Has anybody come across this kind of scenario?
I would really appreciate it if you could share your resolution or workaround.

Thanks,
Divya


Re: Reindexing in graphx

2016-02-24 Thread Robin East
It looks like you are adding vertices one by one; you definitely don’t want to do that. What happens when you batch 400 vertices together into an RDD and then add all 400 in one go?
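
For illustration, a rough sketch of the batched version (Scala/GraphX; the (100, 100) attribute and the (0, 0) default are placeholders taken from the snippet quoted below, and this is just one way to structure it):

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

// Add a whole batch of vertex ids in one go: no collect(), no per-vertex
// parallelize/union, and a single Graph rebuild per batch.
def addVertexBatch(
    newIds: RDD[String],
    vertices: RDD[(VertexId, (Int, Int))],
    edges: RDD[Edge[Int]]): Graph[(Int, Int), Int] = {
  val newVertices = newIds.map(id => (id.toLong, (100, 100)))
  val graph = Graph(vertices.union(newVertices), edges, (0, 0)).cache()
  graph.vertices.count() // materialize once per batch
  graph
}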
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 24 Feb 2016, at 05:49, Udbhav Agarwal  wrote:
> 
> Thank you Robin for your reply.
> Actually I am adding bunch of vertices in a graph in graphx using the 
> following method . I am facing the problem of latency. First time an addition 
> of say 400 vertices to a graph with 100,000 nodes takes around 7 seconds. 
> next time its taking 15 seconds. So every subsequent adds are taking more 
> time than the previous one. Hence I tried to do reindex() so the subsequent 
> operations can also be performed fast. 
> FYI My cluster is presently having one machine with 8 core and 8 gb ram. I am 
> running in local mode.
>  
> def addVertex(rdd: RDD[String], sc: SparkContext, session: String): Long = {
> val defaultUser = (0, 0)
> rdd.collect().foreach { x =>
>   {
> val aVertex: RDD[(VertexId, (Int, Int))] =
> sc.parallelize(Array((x.toLong, (100, 100))))
> gVertices = gVertices.union(aVertex)
>   }
> }
> inputGraph = Graph(gVertices, gEdges, defaultUser)
> inputGraph.cache()
> gVertices = inputGraph.vertices
> gVertices.cache()
> val count = gVertices.count
> println(count);
> 
> return 1;
>   }
>  
>  
> From: Robin East [mailto:robin.e...@xense.co.uk] 
> Sent: Tuesday, February 23, 2016 8:15 PM
> To: Udbhav Agarwal 
> Subject: Re: Reindexing in graphx
>  
> Hi
>  
> Well this is the line that is failing in VertexRDDImpl:
>  
> require(partitionsRDD.partitioner.isDefined)
>  
> But really you shouldn’t need to be calling the reindex() function as it 
> deals with some internals of the GraphX implementation - it looks to me like 
> it ought to be a private method. Perhaps you could explain what you are 
> trying to achieve.
> ---
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action 
> 
>  
>  
>  
> 
>  
> On 23 Feb 2016, at 12:18, Udbhav Agarwal  > wrote:
>  
> Hi,
> I am trying to add vertices to a graph in graphx and I want to do reindexing 
> in the graph. I can see there is an option of vertices.reindex() in graphX. 
> But when I am doing graph.vertices.reindex() am getting 
> Java.lang.IllegalArgumentException: requirement failed.
> Please help me know what I am missing with the syntax as I have seen the API 
> documentation where only vertices.reindex() is mentioned.
>  
> Thanks,
> Udbhav Agarwal



Re: filter by dict() key in pySpark

2016-02-24 Thread Franc Carter
A colleague found out how to do this; the approach was to use a udf().

cheers

On 21 February 2016 at 22:41, Franc Carter  wrote:

>
> I have a DataFrame that has a Python dict() as one of the columns. I'd
> like to filter he DataFrame for those Rows that where the dict() contains a
> specific value. e.g something like this:-
>
> DF2 = DF1.filter('name' in DF1.params)
>
> but that gives me this error
>
> ValueError: Cannot convert column into bool: please use '&' for 'and', '|'
> for 'or', '~' for 'not' when building DataFrame boolean expressions.
>
> How do I express this correctly ?
>
> thanks
>
> --
> Franc
>



-- 
Franc


Re: Execution plan in spark

2016-02-24 Thread Ashok Kumar
Looks useful, thanks.

On Wednesday, 24 February 2016, 9:42, Yin Yang  wrote:
 

Is the following what you were looking for ?

    sqlContext.sql("""
    CREATE TEMPORARY TABLE partitionedParquet
    USING org.apache.spark.sql.parquet
    OPTIONS (
      path '/tmp/partitioned'
    )""")

    table("partitionedParquet").explain(true)
On Wed, Feb 24, 2016 at 1:16 AM, Ashok Kumar  
wrote:

 Gurus,
Is there anything like explain in Spark to see the execution plan in functional 
programming?
warm regards



  

Re: Execution plan in spark

2016-02-24 Thread Yin Yang
Is the following what you were looking for ?

sqlContext.sql("""
CREATE TEMPORARY TABLE partitionedParquet
USING org.apache.spark.sql.parquet
OPTIONS (
  path '/tmp/partitioned'
)""")

table("partitionedParquet").explain(true)

On Wed, Feb 24, 2016 at 1:16 AM, Ashok Kumar 
wrote:

> Gurus,
>
> Is there anything like explain in Spark to see the execution plan in
> functional programming?
>
> warm regards
>


How to achieve co-location of task and source data

2016-02-24 Thread okoeth
We are developing an RDD (and later a DataSource on top of it) to access
distributed data in our Spark cluster and want to achive co-location of
tasks working on the data with their source data partitions.

Overriding RDD.getPreferredLocations should be the way to achieve that, so
each RDD partition can indicate on which server it should be processed.
Unfortunately, there seems to be no clearly defined way in which Spark identifies
the server an Executor is running on: with the Mesos cluster manager,
Spark tracks Executors by fully qualified host name; with the standalone cluster
manager, it used the IP address in Spark 1.5, and in Spark 1.6 this seems to
have changed to a host name.

The code in the Spark task scheduler that matches preferred locations to
Executors does a map lookup and requires an exact textual match of the
string specified as preferred location with the host name provided by the
Executor. So, if the formats don't match, task locality handling does not
work at all, but there seems to be no "standard" format for the location. So
how can one write a custom RDD overriding getPreferredLocations that will
work without specific dependencies on a concrete Spark setup?

There seems to be no way for user code to get access to the internal
scheduler info tracking executors by host. It seems that even the host name
reported to a SparkListener for ExecutorAdded is not reliably the same value
that the scheduler uses internally for lookup.
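
For what it's worth, the override itself is small; the open question above is
which host strings to return. A minimal sketch (the host names are whatever
your storage layer reports, which is exactly the part with no standard format):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type carrying the host that stores its data.
case class SourcePartition(index: Int, host: String) extends Partition

class SourceRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => SourcePartition(i, h): Partition }.toArray

  // The strings returned here must textually match what the scheduler tracks
  // per Executor (host name vs. IP vs. FQDN), which is the problem described above.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[SourcePartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator.empty // fetch the partition's data from the co-located source here
}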




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-achieve-co-location-of-task-and-source-data-tp26317.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: value from groupBy paired rdd

2016-02-24 Thread Eike von Seggern
Hello Abhishek,

your code appears OK. Can you please post the exception you get? Without it,
it's hard to track down the issue.

Best
Eike


Re: How to delete a record from parquet files using dataframes

2016-02-24 Thread Cheng Lian
Parquet is a read-only format. So the only way to remove data from a 
written Parquet file is to write a new Parquet file without unwanted rows.
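
For example, a minimal sketch of that rewrite (the column name and paths are
hypothetical):

// Keep everything except the rows you want to "delete", then write a new dataset.
val df = sqlContext.read.parquet("hdfs:///data/records")
val kept = df.filter("id <> 42")
kept.write.parquet("hdfs:///data/records_cleaned")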


Cheng

On 2/17/16 5:11 AM, SRK wrote:

Hi,

I am saving my records in the form of parquet files using dataframes in
hdfs. How to delete the records using dataframes?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-delete-a-record-from-parquet-files-using-dataframes-tp26242.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Execution plan in spark

2016-02-24 Thread Ashok Kumar
 Gurus,
Is there anything like explain in Spark to see the execution plan in functional 
programming?
warm regards

Re: reasonable number of executors

2016-02-24 Thread Alex Dzhagriev
Hi Igor,

That's a great talk and an exact answer to my question. Thank you.

Cheers, Alex.

On Tue, Feb 23, 2016 at 8:27 PM, Igor Berman  wrote:

>
> http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications
>
> there is a section that is connected to your question
>
> On 23 February 2016 at 16:49, Alex Dzhagriev  wrote:
>
>> Hello all,
>>
>> Can someone please advise me on the pros and cons on how to allocate the
>> resources: many small heap machines with 1 core or few machines with big
>> heaps and many cores? I'm sure that depends on the data flow and there is
>> no best practise solution. E.g. with bigger heap I can perform map-side
>> join with bigger table. What other considerations should I keep in mind in
>> order to choose the right configuration?
>>
>> Thanks, Alex.
>>
>
>
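
For reference, whichever shape you settle on is ultimately expressed in a
handful of settings; a minimal sketch (the values are assumptions for
illustration, not recommendations; the talk linked above discusses how to
derive them from your node sizes):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "10") // number of executors (on YARN)
  .set("spark.executor.cores", "5")      // cores per executor
  .set("spark.executor.memory", "14g")   // heap per executor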


Kafka partition increased while Spark Streaming is running

2016-02-24 Thread ??????
Here I use 'KafkaUtils.createDirectStream' to integrate Kafka with Spark
Streaming. I submitted the app and then increased Kafka's partition count after
it had been running for a while. When I then check the input offsets with
'rdd.asInstanceOf[HasOffsetRanges].offsetRanges', I see that only the offsets
of the initial partitions are returned.
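
For reference, the per-batch check described above looks roughly like this (a
sketch against the Spark 1.x direct-stream API; the broker and topic names are
placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))

stream.foreachRDD { rdd =>
  // Only the topic-partitions captured when the stream was created show up here.
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}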


Does this mean Spark Streaming's Kafka integration can't update its parallelism 
when Kafka's partition number is changed?