Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-19 Thread Sujit Pal
Hi Janardhan,

You need the classifier "models" attribute on the second stanford-corenlp
entry to indicate that you want the models JAR, as shown below. Right now you
are importing the same stanford-corenlp JAR twice.

libraryDependencies ++= {
  val sparkVersion = "2.0.0"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
    "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
    "" % "protobuf-java" % "2.6.1",
    // same artifact again, but with the "models" classifier for the models JAR
    "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" classifier "models",
    "org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )
}


On Sun, Sep 18, 2016 at 5:12 PM, janardhan shetty wrote:

> Hi Sujit,
> Tried that option but same error:
> java version "1.8.0_51"
> libraryDependencies ++= {
>   val sparkVersion = "2.0.0"
>   Seq(
> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>   )
> }
> Error:
> Exception in thread "main" java.lang.NoClassDefFoundError: edu/stanford/nlp/pipeline/StanfordCoreNLP
> at$$anonfun$createTransformFunc$1.apply(Lemmatizer.scala:37)
> at$$anonfun$createTransformFunc$1.apply(Lemmatizer.scala:33)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:87)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1060)
> at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
> at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:45)
> at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:29)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$
> TraversableLike.scala:234)

Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-18 Thread Sujit Pal
Hi Janardhan,

Maybe try removing the string "test" from this line in your build.sbt?
IIRC, the "test" configuration puts the models JAR on the test classpath
only, so it is not available when your code runs outside of tests.

"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier "models",


On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty wrote:

> Hi,
> I am trying to use lemmatization as a transformer and added below to the
> build.sbt
>  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier
> "models",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
> Error:
> *Exception in thread "main" java.lang.NoClassDefFoundError:
> edu/stanford/nlp/pipeline/StanfordCoreNLP*
> I have tried other versions of this spark package.
> Any help is appreciated..

Re: pyspark mapPartitions()

2016-05-14 Thread Sujit Pal
I built this recently using the accepted answer on this SO page:


On Sat, May 14, 2016 at 7:00 AM, Mathieu Longtin wrote:

> From memory:
> def processor(iterator):
>     for item in iterator:
>         newitem = do_whatever(item)
>         yield newitem
> newdata = data.mapPartitions(processor)
> Basically, your function takes an iterator as an argument, and must either
> be a generator function (like the one above) or return an iterable.
> On Sat, May 14, 2016 at 12:39 AM Abi  wrote:
>> On Tue, May 10, 2016 at 2:20 PM, Abi  wrote:
>>> Is there any example of this? I want to see how you write the
>>> iterable example
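To see the mapPartitions contract without a cluster, here is a minimal pure-Python sketch that only mimics it: the data is pre-split into hypothetical partitions, the function is called once per partition with an iterator, and whatever it yields is concatenated. do_whatever and simulate_map_partitions are hypothetical stand-ins, not PySpark APIs; on a real RDD you would just call data.mapPartitions(processor).

```python
from itertools import chain


def do_whatever(item):
    # Hypothetical per-record transformation.
    return item * 10


def processor(iterator):
    # Called once per partition; per-partition setup (e.g. opening a
    # connection) would go here, before the loop.
    for item in iterator:
        yield do_whatever(item)


def simulate_map_partitions(partitions, func):
    # Local stand-in for rdd.mapPartitions(func): call func once per
    # partition with an iterator and concatenate everything it yields.
    return list(chain.from_iterable(func(iter(part)) for part in partitions))


partitions = [[1, 2], [3], [4, 5, 6]]
print(simulate_map_partitions(partitions, processor))  # [10, 20, 30, 40, 50, 60]
```

Because processor is a generator function, calling it returns an iterator immediately and the records are transformed only as they are pulled, which is what lets Spark stream through a large partition.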

Re: since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?

2016-03-28 Thread Sujit Pal
Hi Charles,

I tried this with dummied-out functions which just sum transformations of a
list of integers; maybe they could be replaced by your algorithms. The idea is
that the functions themselves are never the RDD elements: instead, you
parallelize a list of selectors and map a "god" function over it, which takes
the selector as an additional parameter and delegates out to the appropriate
function. Here's my code, maybe it helps...

# sc is the SparkContext (e.g. the one the pyspark shell creates)
def f0(xs):
    return len(xs)

def f1(xs):
    return sum(xs)

def f2(xs):
    return sum([x**2 for x in xs])

def f_god(n, xs):
    # delegate to the function selected by n
    if n == 1:
        return f1(xs)
    elif n == 2:
        return f2(xs)
    else:
        return f0(xs)

xs = list(range(0, 5))
xs_b = sc.broadcast(xs)            # ship the data to the workers once
ns = sc.parallelize(range(0, 3))   # one element per function to run
results = ns.map(lambda n: f_god(n, xs_b.value))
print(results.take(10))

gives me:

[5, 10, 30]
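A dictionary from selector to function is a slightly more idiomatic alternative to the if/elif chain in f_god. The sketch below runs the same dispatch locally on a plain list instead of an RDD, so it needs no Spark; with Spark you would keep the broadcast and map lines from the snippet above and only swap the body of f_god. FUNCS is a hypothetical name.

```python
def f0(xs):
    return len(xs)


def f1(xs):
    return sum(xs)


def f2(xs):
    return sum(x ** 2 for x in xs)


# Selector -> function; unknown selectors fall back to f0, as in the original.
FUNCS = {1: f1, 2: f2}


def f_god(n, xs):
    return FUNCS.get(n, f0)(xs)


xs = list(range(5))
# Locally, this plays the role of ns.map(lambda n: f_god(n, xs_b.value)).
results = [f_god(n, xs) for n in range(3)]
print(results)  # [5, 10, 30]
```

Adding another algorithm then means adding one entry to FUNCS rather than another branch.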

On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau  wrote:

> You probably want to look at the map transformation, and the many more
> defined on RDDs. The function you pass in to map is serialized and the
> computation is distributed.
> On Monday, March 28, 2016, charles li  wrote:
>> use case: have a dataset, and want to use different algorithms on that,
>> and fetch the result.
>> for making this, I think I should distribute my algorithms, and run these
>> algorithms on the dataset at the same time, am I right?
>> but it seems that spark can not parallelize/serialize
>> algorithms/functions, then how to make it?
>> *here is the test code*:
>> def test():
>>     pass
>> function_list = [test] * 10
>> sc.parallelize([test] * 10).take(1)
>> *error message: *
>> Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.runJob.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage
>> 9.0 (TID 105, sh-demo-hadoop-07):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/", line 111, in main
>>     process()
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/", line 106, in process
>>     serializer.dump_stream(func(split_index, iterator), outfile)
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/", line 263, in dump_stream
>>     vs = list(itertools.islice(iterator, batch))
>>   File "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/", line 1293, in takeUpToNumLeft
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/", line 139, in load_stream
>>     yield self._read_with_length(stream)
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/", line 164, in _read_with_length
>>     return self.loads(obj)
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/", line 422, in loads
>>     return pickle.loads(obj)
>> AttributeError: 'module' object has no attribute 'test'
>> at org.apache.spark.api.python.PythonRunner$$anon$
>> at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.executor.Executor$
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> at java.util.concurrent.ThreadPoolExecutor$
>> what's interesting is that *when I run sc.parallelize([test] *
>> 10).collect(), it works fine*, and returns a list of the ten function
>> objects.

Re: How to create dataframe from SQL Server SQL query

2015-12-07 Thread Sujit Pal
Hi Ningjun,

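Haven't done this myself, but I saw your question, was curious about the
answer, and found this article which you might find useful:

According to this article, you can pass in your SQL statement as a
parenthesized subquery in the "dbtable" option, i.e., something like:

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver", // replace with your SQL Server JDBC URL
      "dbtable" -> ("(select docid, title, docText from dbo.document " +
                    "where docid between 10 and 1000) as docs"))).load()

The alias ("as docs") matters for SQL Server, which rejects a derived table
without one; Spark wraps the "dbtable" value in its own SELECT ... FROM.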


On Mon, Dec 7, 2015 at 8:26 AM, Wang, Ningjun (LNG-NPV) wrote:

> How can I create a RDD from a SQL query against SQLServer database? Here
> is the example of dataframe
> val jdbcDF = sqlContext.read.format("jdbc").options(
>   Map("url" -> "jdbc:postgresql:dbserver",
>   "dbtable" -> "schema.tablename")).load()
> This code creates a dataframe from a table. How can I create a dataframe from a
> query, e.g. “select docid, title, docText from dbo.document where docid
> between 10 and 1000”?
> Ningjun

Re: Please add us to the Powered by Spark page

2015-11-24 Thread Sujit Pal
Thank you Sean, much appreciated.

And yes, perhaps "email dev" is a better option since the traffic is
(probably) lighter and these sorts of requests are more likely to get
noticed. Although one would need to subscribe to the dev list to do that...


On Tue, Nov 24, 2015 at 1:16 AM, Sean Owen wrote:

> Not sure who generally handles that, but I just made the edit.

Re: Please add us to the Powered by Spark page

2015-11-23 Thread Sujit Pal
Sorry to be a nag, I realize folks with edit rights on the Powered by Spark
page are very busy people, but it's been 10 days since my original request,
and I was wondering if maybe it just fell through the cracks. If I should
submit via some other channel that will make sure it is looked at (or better
yet, a self-service option), please let me know and I will do so.

Here is the information again.

Organization Name: Elsevier Labs
Spark components: Spark Core, Spark SQL, MLLib, GraphX.
Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as a
Service, Content and Event Analytics, Content/Event based Predictive Models
and Big Data Processing. We use Scala and Python over Databricks Notebooks
for most of our work.

Thanks very much,


Please add us to the Powered by Spark page

2015-11-13 Thread Sujit Pal

We have been using Spark at Elsevier Labs for a while now. Would love to be
added to the “Powered By Spark” page.

Organization Name: Elsevier Labs
Spark components: Spark Core, Spark SQL, MLLib, GraphX.
Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as a
Service, Content and Event Analytics, Content/Event based Predictive Models
and Big Data Processing. We use Scala and Python over Databricks Notebooks
for most of our work.

Thanks very much,

Sujit Pal
Technical Research Director
Elsevier Labs

Re: Prevent possible out of memory when using read/union

2015-11-04 Thread Sujit Pal
Hi Alexander,

You may want to try the wholeTextFiles() method of SparkContext. Using that
you could just do something like this:

sc.wholeTextFiles("hdfs://input_dir")
  .saveAsSequenceFile("hdfs://output_dir")

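wholeTextFiles returns an RDD of (filename, content) pairs, one per file.

You will not have to worry about managing memory in the driver as much with
this approach, since the files are read in parallel on the executors instead
of being loaded into the master program.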
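For intuition about the shape of what wholeTextFiles hands you, here is a small local Python sketch that builds the same kind of (path, whole content) pairs from a directory of small files. whole_text_files is a hypothetical helper, not the Spark API; the real sc.wholeTextFiles reads the files in parallel on the executors, and since each file becomes a single record, each individual file still has to fit in an executor's memory.

```python
import os
import tempfile


def whole_text_files(directory):
    # Hypothetical local stand-in for sc.wholeTextFiles(directory):
    # one (path, full file content) pair per file.
    pairs = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path):
            with open(path, encoding="utf-8") as f:
                pairs.append((path, f.read()))
    return pairs


with tempfile.TemporaryDirectory() as d:
    # Create a few small files to read back.
    for i in range(3):
        with open(os.path.join(d, f"part-{i}.txt"), "w", encoding="utf-8") as f:
            f.write(f"content of file {i}\n")
    pairs = whole_text_files(d)

for path, content in pairs:
    print(os.path.basename(path), repr(content))
```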


On Wed, Nov 4, 2015 at 2:12 AM, Alexander Lenz  wrote:

> Hi colleagues,
> In Hadoop I have a lot of folders containing small files. Therefore I am
> reading the content of all folders, union the small files and write the
> unioned data into a single folder
> containing one file. Afterwards I delete the small files and the according
> folders.
> I see two possible emerging problems on which I would like to get your
> opinion:
> 1.   When reading all the files inside the folders into the master
> program, I think it might appear, that there is such an amount of files
> that the master program will run out of memory?
> To prevent this I thought about checking the file size of the folders and
> only read folders in as long as there is enough memory to handle the amount.
> Do you think that this is a possible solution or is there a better
> solution to handle this problem?
> 2.   The other problem is: I am doing a UnionAll to merge all the
> content of the files. In my opinion this will cause that the data needs to
> be brought to a single master and then the data will be unioned there.
> So there might be the same problem, that the application runs out of
> memory.
> My proposed solution would also be to union only if the size does not
> exceed the available memory. Any better solution?
> For a better understanding you can have a look at my code at the bottom of
> the mail.
> Would be glad to hear from your experience as I would assume that this
> problem should be a general one.
> Thanks & Best, Alex
> val sqlContext = new SQLContext(sc)
> //get filesystem
> val conf = new Configuration()
> val fs = FileSystem.get(new URI("hdfs://"),
> conf)
> //get relevant folders
> val directoryStatus = fs.listStatus(new Path("hdfs://
> val latestFolder = directoryStatus.maxBy(x => x.getModificationTime)
> val toWorkFolders = directoryStatus.filter(x => x.getModificationTime
> < latestFolder.getModificationTime)
> //aggregate folder content
> val parquetFiles = => {
> })
> val mergedParquet = parquetFiles.reduce((x, y) => x.unionAll(y))
> mergedParquet.coalesce(1) //Assemble part-files into one partition
>   .write.mode(SaveMode.Append)
>   .parquet("hdfs://
> ")

Re: How to close connection in mapPartitions?

2015-10-23 Thread Sujit Pal
Hi Bin,

Very likely the RedisClientPool is being closed before map has had a chance
to use it: Iterator.map is lazy, so the parsing only happens later, when Spark
consumes the returned iterator, and by then close has already run. One way to
verify would be to comment out the .close line and see what happens. FWIW I
saw a similar problem writing to Solr, where I put a commit where you have a
close, and noticed that the commit was happening before the actual data
insertion (in the .map line) happened (and no data showed up in the index
until the next time I ran the code).

At the time I got around it by doing a zipWithIndex on the Iterator, then
doing a partial commit every n records, and finally doing a commit from the
driver code. However, this won't work for you, and there is a better way
outlined on this page (look for Tobias Pfeiffer; it's the code block
immediately following):

where you test hasNext on the iterator and call close if it's the last
element, within the scope of the .map call.
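A minimal pure-Python sketch of the problem and the fix, using a generator as the analogue of a lazy Scala Iterator (no Spark or Redis involved). FakePool, broken_partition and fixed_partition are hypothetical names standing in for RedisClientPool and the mapPartitions body. Instead of the hasNext check, the fix here uses try/finally inside the generator, a swapped-in technique that closes the pool once the last element has been produced (and also if parsing raises).

```python
class FakePool:
    """Hypothetical stand-in for RedisClientPool."""

    def __init__(self):
        self.open = True

    def lookup(self, line):
        if not self.open:
            raise RuntimeError("Pool not open")
        return line.upper()

    def close(self):
        self.open = False


def broken_partition(lines, pool):
    # Mirrors the original code: map is lazy, so close() runs before any lookup.
    result = map(pool.lookup, lines)
    pool.close()
    return result


def fixed_partition(lines, pool):
    # The body runs as the consumer pulls elements; the finally block runs
    # only after the last element has been yielded (or on an exception).
    try:
        for line in lines:
            yield pool.lookup(line)
    finally:
        pool.close()


pool = FakePool()
try:
    list(broken_partition(iter(["a", "b"]), pool))
except RuntimeError as e:
    print("broken:", e)  # broken: Pool not open

pool = FakePool()
out = list(fixed_partition(iter(["a", "b"]), pool))
print("fixed:", out, "pool open:", pool.open)  # fixed: ['A', 'B'] pool open: False
```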


On Thu, Oct 22, 2015 at 11:32 PM, Aniket Bhatnagar wrote:

> Are you sure RedisClientPool is being initialized properly in the
> constructor of RedisCache? Can you please copy paste the code that you use
> to initialize RedisClientPool inside the constructor of RedisCache?
> Thanks,
> Aniket
> On Fri, Oct 23, 2015 at 11:47 AM Bin Wang  wrote:
>> BTW, "lines" is a DStream.
>> Bin Wang wrote on Friday, October 23, 2015 at 2:16 PM:
>>> I use mapPartitions to open connections to Redis, I write it like this:
>>> val seqs = lines.mapPartitions { lines =>
>>>   val cache = new RedisCache(redisUrl, redisPort)
>>>   val result = lines.map(line => Parser.parseBody(line, cache))
>>>   cache.redisPool.close
>>>   result
>>> }
>>> But it seems the pool is closed before I use it. Am I doing anything
>>> wrong? Here is the error:
>>> java.lang.IllegalStateException: Pool not open
>>> at org.apache.commons.pool.BaseObjectPool.assertOpen(
>>> at org.apache.commons.pool.impl.StackObjectPool.borrowObject(
>>> at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at scala.collection.TraversableLike$
>>> at$.parseBody(Parser.scala:26)
>>> at scala.collection.Iterator$$anon$
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.executor.Executor$
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> at java.util.concurrent.ThreadPoolExecutor$

Re: How to get inverse Matrix / RDD or how to solve linear system of equations

2015-10-23 Thread Sujit Pal
Hi Zhiliang,

For a system of equations AX = y, Linear Regression will give you a
best-fit estimate for A (the coefficient vector), given a matrix of feature
variables X and the corresponding target variable y for a subset of your data.
OTOH, what you are looking for here is to solve for x in a system of equations
Ax = b, where A and b are known and you want the vector x.

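This Math Stackexchange page [2] explains the math in more detail, but in
short: A * x = b can be rewritten as x = A.I * b, where A.I is the inverse
(or, more generally, the pseudo-inverse) of A.

You can get the pseudo-inverse of A using SVD (Spark MLLib supports SVD
[1]). The SVD decomposes A into a product of three other matrices:

A = U * S * V.T

and the pseudo-inverse can then be written as:

A.I = V * S.I * U.T

where S.I is the diagonal matrix holding the reciprocals of the non-zero
singular values in S (zero singular values stay zero). Then x can be found
by multiplying A.I with b.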
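Written out in the usual SVD notation (Σ for S, Σ⁺ for S.I), the steps above are as follows. Since Zhiliang's b is e_N = [0, ..., 0, 1]T, x works out to be just the last column of the pseudo-inverse; and when A is square and non-singular, A⁺ is the ordinary inverse, so this x solves Ax = b exactly.

```latex
\begin{aligned}
A &= U \Sigma V^{T}, \qquad \Sigma = \operatorname{diag}(\sigma_1, \ldots, \sigma_N) \\
A^{+} &= V \Sigma^{+} U^{T}, \qquad
  \Sigma^{+} = \operatorname{diag}(\sigma_1^{+}, \ldots, \sigma_N^{+}), \quad
  \sigma_i^{+} = \begin{cases} 1/\sigma_i & \sigma_i > 0 \\ 0 & \sigma_i = 0 \end{cases} \\
x &= A^{+} b \\
b = e_N \;\Rightarrow\; x &= V \Sigma^{+} U^{T} e_N
  = V \Sigma^{+} \left(U_{N,\cdot}\right)^{T}
  \quad \text{(the last column of } A^{+}\text{)}
\end{aligned}
```

So with this particular b, only the last row of U is needed from the U factor.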



On Fri, Oct 23, 2015 at 2:19 AM, Zhiliang Zhu  wrote:

> Hi Sujit, and All,
> Currently I lost in large difficulty, I am eager to get some help from you.
> There is some big linear system of equations as:
> Ax = b,  A with N number of row and N number of column, N is very large, b
> = [0, 0, ..., 0, 1]T
> Then, I will solve it to get x = [x1, x2, ..., xn]T.
> The simple solution would be to get inverse(A), and then  x = (inverse(A))
> * b .
> A would be some JavaRDD , however, for RDD/matrix there
> is add/multiply/transpose APIs, no inverse API for it!
> Then, how would it conveniently get inverse(A), or just solve the linear
> system of equations by some other way...
> In Spark MLlib, there was linear regression, the training process might be
> to solve the coefficients to get some specific linear model, just is,
> Ax = y, just train by (x, y) to get A , this might be used to solve the
> linear system of equations. It is like that? I could not decide.
> I must show my deep appreciation towards your all help.
> Thank you very much!
> Zhiliang

Re: Save RandomForest Model from ML package

2015-10-22 Thread Sujit Pal
Hi Sebastian,

You can save models to disk and load them back up. In the snippet below
(copied out of a working Databricks notebook, and using the RDD-based MLlib
API rather than the ML one), I train a model, then save it to disk, then
retrieve it back into model2 from disk.

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel

// train the model (data is an RDD[LabeledPoint]; other params as usual)
val model = RandomForest.trainClassifier(data, numClasses,
  categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
  maxDepth, maxBins, seed)
// save it to disk
model.save(sc, inputDir + "models/randomForestModel")

// load it back from disk
val model2 = RandomForestModel.load(sc, inputDir +
  "models/randomForestModel")

Not sure if there is PMML support. The model saves itself into a directory
structure that looks like this:

>   _common_metadata
>   _metadata
>   part-r-*.gz.parquet (multiple files)
> metadata/
>   part-0



On Thu, Oct 22, 2015 at 5:33 AM, Sebastian Kuepers wrote:

> Hey,
> I try to figure out the best practice on saving and loading models which
> have been fitted with the ML package - i.e. with the RandomForest
> classifier.
> There is PMML support in the MLlib package afaik but not in ML - is that
> correct?
> How do you approach this, so that you do not have to fit your model before
> every prediction job?
> Thanks,
> Sebastian

Re: How to subtract two RDDs with same size

2015-09-23 Thread Sujit Pal
Hi Zhiliang,

How about doing something like this?

val rdd3 = rdd1.zip(rdd2).map(p => p._1.zip(p._2).map(z => z._1 - z._2))

The first zip will join the two RDDs and produce an RDD of (Array[Float],
Array[Float]) pairs. On each pair, we zip the two Array[Float] components
together to form an Array[(Float, Float)] and then we subtract the second
element from the first in the inner map (the inner map is a Scala map, not
a Spark one).

I tried this out on a notebook:

val rdd1 = sc.parallelize(List(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0),
  Array(7.0, 8.0, 9.0)))
val rdd2 = sc.parallelize(List(Array(1.0, 4.0, 3.0), Array(4.0, 10.0, 6.0),
  Array(7.0, 16.0, 9.0)))
val rdd3 = rdd1.zip(rdd2).map(p => p._1.zip(p._2).map(z => z._1 - z._2))
rdd3.collect()

which gives me:
res0: Array[Array[Double]] = Array(Array(0.0, -2.0, 0.0), Array(0.0, -5.0,
0.0), Array(0.0, -8.0, 0.0))
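The same zip-then-zip pattern as a local Python sketch on plain lists (rows1 and rows2 stand in for the two RDDs; no Spark involved). One caveat worth knowing for the RDD version: RDD.zip assumes both RDDs have the same number of partitions and the same number of elements in each partition, which is true for two equal-length lists parallelized with the same number of slices, but not guaranteed for, say, the outputs of two different filters.

```python
rows1 = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
rows2 = [[1.0, 4.0, 3.0], [4.0, 10.0, 6.0], [7.0, 16.0, 9.0]]

# Outer zip pairs up corresponding rows (rdd1.zip(rdd2) in Spark);
# inner zip pairs up corresponding elements, which are then subtracted.
rows3 = [[a - b for a, b in zip(r1, r2)] for r1, r2 in zip(rows1, rows2)]
print(rows3)  # [[0.0, -2.0, 0.0], [0.0, -5.0, 0.0], [0.0, -8.0, 0.0]]
```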


On Wed, Sep 23, 2015 at 12:23 AM, Zhiliang Zhu  wrote:

> there is matrix add API, might map rdd2 each row element to be negative ,
> then make rdd1 and rdd2 and call add ?
> Or some more ways ...
> On Wednesday, September 23, 2015 3:11 PM, Zhiliang Zhu wrote:
> Hi All,
> There are two RDDs :  RDD rdd1, and RDD rdd2,
> that is to say, rdd1 and rdd2 are similar with DataFrame, or Matrix with
> same row number and column number.
> I would like to get RDD rdd3,  each element in rdd3 is the
> subtract between rdd1 and rdd2 of the
> same position, which is similar Matrix subtract:
> rdd3 = rdd1 - rdd2 ...
> It seemed very difficult to operate this kinds of matrix  arithmetic, even
> is about add, subtract, multiple , diff etc...
> I shall  appreciate your help very much~~
> Zhiliang

Re: Calling a method parallel

2015-09-23 Thread Sujit Pal
Hi Tapan,

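Perhaps this may work? It takes the integers 0 to 99, creates an RDD out of
them, then calls X(i) on each. The X(i) calls should be executed on the
workers in parallel; collect() then brings the results back to the driver as
a list (so X, and the objects it returns, need to be serializable).

In Scala:

val results = sc.parallelize(0 until 100).map(idx => X(idx)).collect()

In Python:

results = sc.parallelize(range(100)).map(lambda idx: X(idx)).collect()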


On Wed, Sep 23, 2015 at 6:46 AM, Tapan Sharma wrote:

> Hi All,
> I want to call a method X(int i) from my Spark program for different values
> of i.
> This means.
> X(1), X(2).. X(n)..
> Each time it returns the one object.
> Currently I am doing this sequentially.
> Is there any way to run these in parallel and I get back the list of
> objects?
> Sorry for this basic question.
> Regards
> Tapan

Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Sujit Pal
Hi Zhiliang,

Would something like this work?

import org.apache.spark.mllib.rdd.RDDFunctions._

val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
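To make the semantics concrete, here is a local pure-Python sketch (no Spark) of what sliding(2) followed by the subtraction produces, and how it lines up with the R diff(log(...)) computation from the question for a single fund column. sliding_pairs, diffs and daily_log_returns are hypothetical helper names. The result has one element fewer than the input, which is why the R code prepends a row of zeros.

```python
import math


def sliding_pairs(values):
    # Local equivalent of sliding(2): consecutive windows (v[i], v[i + 1]).
    return list(zip(values, values[1:]))


def diffs(values):
    # Same as rdd1.sliding(2).map(v => v(1) - v(0)) on an ordered sequence.
    return [b - a for a, b in sliding_pairs(values)]


def daily_log_returns(navs):
    # R: rbind(0, diff(log(fund_nav_daily))) for one fund column;
    # the first day has no previous day, so it gets 0.
    return [0.0] + diffs([math.log(v) for v in navs])


print(diffs([1.0, 3.0, 6.0, 10.0]))  # [2.0, 3.0, 4.0]
print(daily_log_returns([1.0, 2.0, 4.0]))
```

On an RDD, the windows follow the RDD's partition order, so the rows need to already be sorted by date.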


On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu wrote:

> Hi Romi,
> Thanks very much for your kind help comment~~
> In fact there is some valid background of the application, it is about R
> data analysis.
> ...
> #fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is each
> daily fund return, row is the daily date
> #fund_return_daily needs to count the each fund's daily return subtracted
> the previous day's return
> fund_return_daily <- diff(log(fund_nav_daily))
> #the first row would be all 0, since there is no previous row ahead first
> row
> fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
> fund_return_daily)
> ...
> I need to exactly code the R program by way of spark, then RDD/DataFrame
> is used to replace R data.frame,
> however, I just found that it is VERY MUCH difficult to make the spark
> program flexibly describe & transform R background applications.
> I think I have seriously lost myself into risk about this...
> Would you help direct me some about the above coding issue... and my risk
> about practice in spark/R application...
> I must show all my sincere thanks towards your kind help.
> P.S. currently sparkR in spark 1.4.1 , there is many bug in the API
> createDataFrame/except/unionAll, and it seems
> that spark Java has more functions than sparkR.
> Also, no specific R regression algorithm is included in sparkR .
> Best Regards,
> Zhiliang
> On Monday, September 21, 2015 7:36 PM, Romi Kuntsman wrote:
> RDD is a set of data rows (in your case numbers), there is no meaning for
> the order of the items.
> What exactly are you trying to accomplish?
> *Romi Kuntsman*, *Big Data Engineer*
> On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu wrote:
> Dear ,
> I have spent many days thinking about this issue, however, without any
> success...
> I shall appreciate your all kind help.
> There is an RDD rdd1, I would like get a new RDD rdd2, each row
> in rdd2[ i ] = rdd1[ i ] - rdd1[i - 1] .
> What kinds of API or function would I use...
> Thanks very much!
> John

Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Sujit Pal
Hi Zhiliang,

Haven't used the Java API, but I found this Javadoc page, which may be helpful:

I think the equivalent Java code snippet might go something like this:

RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)

(the second parameter of fromRDD comes from this discussion thread).

There is also the SlidingRDD decorator:

So maybe something like this:

new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))


On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu wrote:

> Hi Sujit,
> I must appreciate your kind help very much~
> It seems to be OK, however, do you know the corresponding spark Java API
> achievement...
> Is there any java API as scala sliding, and it seemed that I do not find
> spark scala's doc about sliding ...
> Thank you very much~
> Zhiliang

Re: Scala: How to match a java object????

2015-08-18 Thread Sujit Pal
Hi Saif,

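Would this work? Note that it uses a type pattern (case x:
java.math.BigDecimal =>); case java.math.BigDecimal => on its own is treated
as a match against a value, which is why you get "object java.math.BigDecimal
is not a value".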

import scala.collection.JavaConversions._

new java.math.BigDecimal(5) match { case x: java.math.BigDecimal =>
  x.doubleValue }

It gives me the following on the Scala console:

res9: Double = 5.0

Assuming you had a stream of BigDecimals, you could just call map on it
to get your Seq of Doubles. You will need the JavaConversions._ import if
the BigDecimals arrive in a Java collection, since it lets Scala treat Java
collections as Scala ones; doubleValue itself already returns a Scala Double.
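For readers hitting the same issue from PySpark rather than Scala: values from a Spark DecimalType column arrive in Python as decimal.Decimal objects. A minimal sketch of the same type-based dispatch, assuming the input is a mixed list (the Python analogue of a Seq[Any]); the function name is hypothetical, and isinstance checks play the role of Scala's `case x: SomeType =>` patterns.

```python
from decimal import Decimal

def to_doubles(values):
    """Convert a mixed list to floats, dispatching on each element's type.

    Hypothetical helper: Decimal, int and float values become floats;
    anything else raises TypeError instead of failing silently.
    """
    result = []
    for v in values:
        # bool is a subclass of int in Python, so rule it out explicitly
        if isinstance(v, (Decimal, int, float)) and not isinstance(v, bool):
            result.append(float(v))
        else:
            raise TypeError("cannot convert %s to float" % type(v).__name__)
    return result
```

Usage: to_doubles([Decimal("5"), 2, 1.5]) returns [5.0, 2.0, 1.5].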


On Tue, Aug 18, 2015 at 12:59 PM, wrote:

 Hi, thank you for further assistance

 you can reproduce this by simply running

 *5 match { case java.math.BigDecimal => 2 }*

 In my personal case, I am applying a map action to a Seq[Any], so the
 elements inside are of type Any, to which I need to apply a proper


 *From:* William Briggs []
 *Sent:* Tuesday, August 18, 2015 4:46 PM
 *To:* Ellafi, Saif A.;
 *Subject:* Re: Scala: How to match a java object

 Could you share your pattern matching expression that is failing?

 On Tue, Aug 18, 2015, 3:38 PM wrote:

 Hi all,

 I am trying to run a Spark job in which I receive *java.math.BigDecimal*
 objects instead of the Scala equivalents, and I am trying to convert them into

 If I try to match-case this object class, I get: *“error: object
 java.math.BigDecimal is not a value”*

 How could I get around matching java objects? I would like to avoid a
 multiple try-catch on ClassCastExceptions for all my checks.

 Thank you,


Re: How to increase parallelism of a Spark cluster?

2015-08-03 Thread Sujit Pal
@Silvio: the mapPartitions instantiates an HttpSolrServer, then for each
query string in the partition, sends the query to Solr using SolrJ and
gets back the top N results. It then reformats the result data into one
long string and returns the key-value pair as (query string, result string).

@Igor: Thanks for the parameter suggestions. I will check with my Databricks
admin about --num-executors and whether there is a way to set the number of
cores per executor, and update here if I find out. From the Databricks
console, it appears that the number of executors per box is 1.
This seems normal though, per the diagram on this page:

where it seems that there is 1 executor per box, and each executor can
spawn multiple threads to take care of multiple tasks (see bullet #1, copied
below):

 Each application gets its own executor processes, which stay up for the
 duration of the whole application and run tasks in multiple threads. This
 has the benefit of isolating applications from each other, on both the
 scheduling side (each driver schedules its own tasks) and executor side
 (tasks from different applications run in different JVMs).

Regarding hitting the max number of requests, thanks for the link. I am
using the default client. Just peeked at the Solr code, and the default
settings (if no HttpClient instance is supplied in the ctor) is to use
DefaultHttpClient (from HttpComponents) whose settings are as follows:

- Version: HttpVersion.HTTP_1_1


- NoTcpDelay: true

- SocketBufferSize: 8192

- UserAgent: Apache-HttpClient/release (java 1.5)

In addition, the Solr code sets the following config parameters on the
DefaultHttpClient:

  params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
  params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
  params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);

Since all my connections are coming out of 2 worker boxes, it looks like I
could get 32x2 = 64 clients hitting Solr, right?

@Steve: Thanks for the link to the HttpClient config. I was thinking about
using a thread pool (or better, a PoolingHttpClientConnectionManager per the
docs), but it probably won't help since it's still being fed one request at
a time.
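A thread pool only helps if a single task has several requests in flight at once. A minimal sketch of that idea in Python (the job in this thread is Scala), assuming the query function is thread-safe: each partition's records are fanned out to a small pool, so one task keeps several requests open. `query_fn` and `max_workers=8` are hypothetical choices, and the HTTP client's own per-host connection limit still bounds the real concurrency.

```python
from concurrent.futures import ThreadPoolExecutor

def query_partition_concurrently(records, query_fn, max_workers=8):
    """Process one partition's (key, payload) records with a thread pool.

    Hypothetical helper meant to be passed to mapPartitions; query_fn stands in
    for the Solr call and must be safe to call from several threads at once.
    """
    records = list(records)  # materialize the partition iterator once
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map runs calls concurrently but yields results in input order
        results = pool.map(lambda kv: query_fn(kv[1]), records)
        return [(key, result) for (key, _), result in zip(records, results)]
```

In PySpark this could be wired in as rdd.mapPartitions(lambda it: query_partition_concurrently(it, my_query)), where my_query is your own (hypothetical) request function. Materializing the partition trades memory for simplicity; very large partitions would call for batching instead.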
@Abhishek: my observations agree with what you said. In the past I have had
success using repartition to reduce the partition size, especially when
groupBy operations were involved. But from what I understand about Akka, on
which Spark is built, an executor should be able to handle multiple tasks in
parallel: the worker is essentially an ActorSystem that can contain multiple
Actors, and each Actor works on a queue of tasks. Within an Actor everything
is sequential, but the ActorSystem is responsible for farming out the tasks
it gets to each of its Actors. It is possible, though, that I am generalizing
incorrectly from my limited experience with Akka.

Thanks again for all your help. Please let me know if something jumps out
and/or if there is some configuration I should check.


On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh wrote:

 I don't know if (your assertion/expectation that) workers will process
 things (multiple partitions) in parallel is really valid. Or if having more
 partitions than workers will necessarily help (unless you are memory bound
 - so partitions is essentially helping your work size rather than execution

 [Disclaimer: I am no authority on Spark, but wanted to throw in my spin based
 on my own understanding].

 Nothing official about it :)


Re: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Sujit Pal
No one has any ideas?

Is there some more information I should provide?

I am looking for ways to increase the parallelism among workers. Currently
I just see a number of simultaneous connections to Solr equal to the number
of workers. My number of partitions is 2.5x the number of workers, and the
workers seem to be large enough to handle more than one task at a time.

I am creating a single client per partition in my mapPartitions call. I am
not sure if that is creating the gating situation. Perhaps I should use a
pool of clients instead?

Would really appreciate some pointers.

Thanks in advance for any help you can provide.


On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal wrote:


 I am trying to run a Spark job that hits an external webservice to get
 back some information. The cluster is 1 master + 4 workers, each worker has
 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
 and is accessed using code similar to that shown below.

 def getResults(keyValues: Iterator[(String, Array[String])]):
     Iterator[(String, String)] = {
   val solr = new HttpSolrClient()
   initializeSolrParameters(solr)
   keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
 }

  .mapPartitions(keyValues => getResults(keyValues))

 The mapPartitions does some initialization to the SolrJ client per
 partition and then hits it for each record in the partition via the
 getResults() call.

 I repartitioned in the hope that this will result in 10 clients hitting
 Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
 clients if I can). However, I counted the number of open connections using
 netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
 observed that Solr has a constant 4 clients (i.e., equal to the number of
 workers) over the lifetime of the run.

 My observation leads me to believe that each worker processes a single
 stream of work sequentially. However, from what I understand about how
 Spark works, each worker should be able to process a number of tasks in
 parallel, and repartition() is a hint for it to do so.

 Is there some SparkConf environment variable I should set to increase
 parallelism in these workers, or should I just configure a cluster with
 multiple workers per machine? Or is there something I am doing wrong?

 Thank you in advance for any pointers you can provide.


Re: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Sujit Pal
Hi Igor,

The cluster is a Databricks Spark cluster. It consists of 1 master + 4
workers, each worker has 60GB RAM and 4 CPUs. The original mail has some
more details (also the reference to the HttpSolrClient in there should be
HttpSolrServer, sorry about that, mistake while writing the email).

There is no additional configuration on the external Solr host from my
code; I am using the default HttpClient provided by HttpSolrServer.
According to the Javadocs, you can pass in an HttpClient object as well. Is
there some specific configuration you would suggest to get past any limits?

On another project, I faced a similar problem but had more leeway (I was
using a Spark cluster on EC2) and less time. My workaround was to use
Python multiprocessing to create a program that started up 30 Python
JSON/HTTP clients and wrote output into 30 output files, which were then
processed by Spark. The reason I mention this is that I was using default
configurations there as well; I just needed to increase the number of
connections against Solr to a higher number.

This time round, I would like to do this through Spark because it makes the
pipeline less complex.


On Sun, Aug 2, 2015 at 10:52 AM, Igor Berman wrote:

 What kind of cluster? How many cores on each worker? Is there a config for
 the HTTP Solr client? I remember the standard HttpClient has a limit per route/host.
 On Aug 2, 2015 8:17 PM, Sujit Pal wrote:

 No one has any ideas?

 Is there some more information I should provide?

 I am looking for ways to increase the parallelism among workers.
 Currently I just see a number of simultaneous connections to Solr equal to
 the number of workers. My number of partitions is 2.5x the number of
 workers, and the workers seem to be large enough to handle more than one
 task at a time.

 I am creating a single client per partition in my mapPartitions call. I am
 not sure if that is creating the gating situation. Perhaps I should use a
 pool of clients instead?

 Would really appreciate some pointers.

 Thanks in advance for any help you can provide.


How to increase parallelism of a Spark cluster?

2015-07-31 Thread Sujit Pal

I am trying to run a Spark job that hits an external webservice to get back
some information. The cluster is 1 master + 4 workers, each worker has 60GB
RAM and 4 CPUs. The external webservice is a standalone Solr server, and is
accessed using code similar to that shown below.

def getResults(keyValues: Iterator[(String, Array[String])]):
    Iterator[(String, String)] = {
  val solr = new HttpSolrClient()
  initializeSolrParameters(solr)
  keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
}

 .mapPartitions(keyValues => getResults(keyValues))

The mapPartitions does some initialization to the SolrJ client per
partition and then hits it for each record in the partition via the
getResults() call.
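The same per-partition pattern, sketched in Python for readers using PySpark: the client is built once when the partition starts and is reused for every record in it. `make_client` and the client's `query` method are hypothetical stand-ins for the SolrJ setup, not a real Solr API.

```python
def get_results(key_values, make_client):
    """Per-partition worker for mapPartitions: one client, many requests.

    make_client is a hypothetical factory; the object it returns is assumed
    to expose query(list_of_strings) -> str.
    """
    client = make_client()  # created once per partition, not once per record
    for key, values in key_values:
        # yield lazily so results stream out of the partition
        yield key, client.query(values)
```

In PySpark this would be called as rdd.mapPartitions(lambda it: get_results(it, make_client)). Because it is a generator, the client is created when Spark starts consuming the partition.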

I repartitioned in the hope that this will result in 10 clients hitting
Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
clients if I can). However, I counted the number of open connections using
netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
observed that Solr has a constant 4 clients (i.e., equal to the number of
workers) over the lifetime of the run.

My observation leads me to believe that each worker processes a single
stream of work sequentially. However, from what I understand about how
Spark works, each worker should be able to process a number of tasks in
parallel, and repartition() is a hint for it to do so.

Is there some SparkConf environment variable I should set to increase
parallelism in these workers, or should I just configure a cluster with
multiple workers per machine? Or is there something I am doing wrong?
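A back-of-the-envelope check of how many tasks Spark could run at once: each executor runs up to (its cores / spark.task.cpus) tasks concurrently, and a stage never runs more tasks than it has partitions. The sketch below assumes one executor per worker and the default spark.task.cpus of 1; the cluster numbers (4 workers with 4 CPUs each, 10 partitions) come from the question, and the function name is hypothetical.

```python
def max_concurrent_tasks(num_partitions, executors, cores_per_executor, task_cpus=1):
    """Upper bound on simultaneously running tasks for one stage."""
    slots = executors * (cores_per_executor // task_cpus)
    # a stage cannot run more tasks than it has partitions
    return min(num_partitions, slots)
```

With all 4 cores per executor, max_concurrent_tasks(10, 4, 4) is 10, so all partitions could in principle run together. If each executor had effectively been given a single core, max_concurrent_tasks(10, 4, 1) drops to 4, which matches the observed connection count; that is one possibility worth checking, not a diagnosis.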

Thank you in advance for any pointers you can provide.


Re: use S3-Compatible Storage with spark

2015-07-17 Thread Sujit Pal
Hi Schmirr,

The part after the s3n:// is your bucket name and folder name, i.e.,
s3n://${bucket_name}/${folder_name}[/${subfolder_name}]*. Bucket names are
unique across S3, so the resulting path is also unique. There is no concept
of hostname in s3 urls as far as I know.
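To see which parts of an s3n:// URL are the bucket and which are the folder path, Python's standard urllib.parse splits it the same way: the bucket lands in the network-location slot and the folders in the path. A small sketch; the bucket and folder names in the usage are made up.

```python
from urllib.parse import urlparse

def split_s3_url(url):
    """Split s3n://bucket/folder/... into (bucket, key_prefix)."""
    parsed = urlparse(url)
    # the bucket occupies the slot a hostname would normally fill
    return parsed.netloc, parsed.path.lstrip("/")
```

Usage: split_s3_url("s3n://my-bucket/folder/sub") returns ("my-bucket", "folder/sub").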


On Fri, Jul 17, 2015 at 1:36 AM, Schmirr Wurst


 How can I use S3-compatible storage in Spark?
 If I use the s3n:// URL scheme, then it will point to Amazon. Is there
 a way I can specify the host somewhere?

 To unsubscribe, e-mail:
 For additional commands, e-mail:

Re: Spark on EMR with S3 example (Python)

2015-07-15 Thread Sujit Pal
Hi Roberto,

I think you would need to, as Akhil said. I just checked this page:

and clicking through to a few dataset links, all of them are available on
S3 (some are also available via HTTP and FTP, but I think the point of these
datasets is that they are usually very large, so having them on S3 ensures
that it's easier to take your code to them than to bring the datasets to your


On Tue, Jul 14, 2015 at 1:56 PM, Pagliari, Roberto

 Hi Sujit,

 I just wanted to access public datasets on Amazon. Do I still need to
 provide the keys?

 Thank you,


Re: Efficiency of leftOuterJoin a cassandra rdd

2015-07-15 Thread Sujit Pal
Hi Wush,

One option may be to try a replicated join. Since your rdd1 is small, read
it into a collection and broadcast it to the workers, then filter your
larger rdd2 against the collection on the workers.
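A minimal sketch of that replicated (map-side) join, written as plain Python so the logic can be checked without a cluster. In PySpark the small side would typically be collected with rdd1.collectAsMap() and shipped with sc.broadcast(...), and the scan would run inside rdd2.mapPartitions(...); here a dict stands in for the broadcast value and a list for the big RDD. To keep leftOuterJoin semantics, small-side keys with no match are added back paired with None. The function name is hypothetical. Note that this avoids shuffling rdd2 but still reads it in full.

```python
def replicated_left_outer_join(small, big):
    """leftOuterJoin(small, big) without shuffling the big side.

    small: dict standing in for the broadcast copy of rdd1 (key -> value)
    big:   iterable of (key, value) pairs standing in for rdd2
    """
    # map side: scan the big data once, keeping only keys the small side knows
    matched = [(k, (small[k], v)) for k, v in big if k in small]
    # small-side keys that never matched still appear, paired with None
    seen = {k for k, _ in matched}
    unmatched = [(k, (v, None)) for k, v in small.items() if k not in seen]
    return matched + unmatched
```

The result has the same (key, (left, right_or_None)) shape that leftOuterJoin produces, although the ordering of the output is not guaranteed to match.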


On Tue, Jul 14, 2015 at 11:33 PM, Deepak Jain wrote:

 leftOuterJoin and join APIs are super slow in Spark. 100x slower than

 Sent from my iPhone

  On 14-Jul-2015, at 10:59 PM, Wush Wu wrote:
  I don't understand.
  By the way, the `joinWithCassandraTable` does improve my query time
  from 40 mins to 3 mins.
  2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏)
  I have explored Spark joins for the last few months (you can search my
  and it's frustrating and useless.
  On Tue, Jul 14, 2015 at 9:35 PM, Wush Wu wrote:
  Dear all,
  I have found a post discussing the same thing:!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J
  The solution is using joinWithCassandraTable and the documentation
  is here:
  2015-07-15 12:15 GMT+08:00 Wush Wu
  Dear all,
  I am trying to join two RDDs, named rdd1 and rdd2.
  rdd1 is loaded from a textfile with about 33000 records.
   rdd2 is loaded from a table in Cassandra which has about 3 billion
   I tried the following code:
   val rdd1 : RDD[(String, XXX)] = sc.textFile(...).map(...)
   import org.apache.spark.sql.cassandra.CassandraSQLContext
   val rdd2 : RDD[(String, String)] = cc.sql("SELECT x, y FROM xxx").map(r =>
   val result = rdd1.leftOuterJoin(rdd2)
   However, the log shows that Spark loaded 3 billion records from
  cassandra and only 33000 records left at the end.
  Is there a way to query the cassandra based on the key in rdd1?
  Here is some information of our system:
  - The spark version is 1.3.1
  - The cassandra version is 2.0.14
  - The key of joining is the primary key of the cassandra table.

Re: Spark on EMR with S3 example (Python)

2015-07-14 Thread Sujit Pal
Hi Roberto,

I have written PySpark code that reads from private S3 buckets, it should
be similar for public S3 buckets as well. You need to set the AWS access
and secret keys into the SparkContext, then you can access the S3 folders
and files with their s3n:// paths. Something like this:

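from pyspark import SparkContext

sc = SparkContext()
# aws_access_key / aws_secret_key hold your own AWS credentials
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_secret_key)

mydata = sc.textFile("s3n://mybucket/my_input_folder") \
    .map(lambda x: do_something(x))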

You can read and write sequence files as well - these are the only 2
formats I have tried, but I'm sure the other ones like JSON would work
also. Another approach is to embed the AWS access key and secret key into
the s3n:// path.

I wasn't able to use the s3 protocol, but s3n is equivalent (I believe it's
an older version, but I'm not sure) and it works for access.

Hope this helps,

On Tue, Jul 14, 2015 at 10:50 AM, Pagliari, Roberto

 Is there an example about how to load data from a public S3 bucket in
 Python? I haven’t found any.

 Thank you,

Re: PySpark without PySpark

2015-07-10 Thread Sujit Pal
Hi Ashish,

Cool, glad it worked out. I have only used Spark clusters on EC2, which I
spin up using the spark-ec2 scripts (part of the Spark downloads), so I don't
have any experience setting up in-house clusters like you want to do. But I
found some documentation here that may be helpful.

There are other options as well in this document that will require you to
know some other tools like Chef (previous sections).

Good luck,

On Thu, Jul 9, 2015 at 10:25 PM, Ashish Dutt wrote:

 Hi Sujit,
 Thank you for taking the time to help me out, and a special thank you for
 your elaborate steps.
 1) I corrected SPARK_HOME to be c:\spark-1.3.0
 2) I installed py4j from the Anaconda command prompt and the command you gave
 executed successfully.
 3) I replaced python27 with python in the 00-setup script.
 These are my path variables as now defined, along with the PATH:

 JAVA_HOME   C:\Program Files\Java\jdk1.7.0_79
 PYTHONPATH C:\Users\Ashish Dutt\Anaconda

 4) This time, I grabbed my baseball bat (you do know why...), invoked
 ipython notebook again, and with the other free hand I slowly typed
 print SPARK_HOME -- it worked! Then another command, from pyspark import
 SparkContext, and it worked too!!!
 The baseball bat dropped to the ground and I quickly typed in the other
 commands given in the post. Attached is the screenshot, and it all worked...

 Sujit, a quintal of thanks for your persistence in helping me resolve this
 problem. You have been very helpful and I wish you luck and success in all
 your endeavors.
 Next milestone is to get this to work in a cluster environment.

 I am confused: do I need to install spark-1.3.0 on all 4 Linux
 machines that make up my cluster?
 The goal is to use my laptop as a client (from where I will submit Spark
 commands to the master server). The master can then distribute the job to
 the three nodes and provide the client with the end result.
 Am I correct in this visualization?

 Once again, thank you for your efforts.

 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

Re: PySpark without PySpark

2015-07-09 Thread Sujit Pal
Hi Ashish,

Your 00-pyspark-setup file looks very different from mine (and from the one
described in the blog post). Questions:

1) Do you have SPARK_HOME set up in your environment? Because if not, it
sets it to None in your code. You should provide the path to your Spark
installation. In my case I have spark-1.3.1 installed under $HOME/Software,
and the code block under # Configure the environment reflects that.
2) Is there a python2 or python subdirectory under the root of your Spark
installation? In my case it's python, not python2. This contains the
Python bindings for Spark, so the block under # Add the PySpark/py4j to
the Python Path adds it to the Python sys.path so things like
pyspark.SparkContext are accessible in your Python environment.

import os
import sys

# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/Users/palsujit/Software/spark-1.3.1'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, 'python', 'build'))
sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))
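If `import py4j` still fails after a setup script like this, note that Spark 1.x binary distributions also ship py4j as a zip under $SPARK_HOME/python/lib (named like py4j-<version>-src.zip), and Python can import directly from a zip file placed on sys.path. A sketch that finds that zip instead of hard-coding its version; it assumes that directory layout, and the function name is hypothetical.

```python
import glob
import os
import sys

def add_pyspark_to_path(spark_home):
    """Prepend Spark's Python bindings and bundled py4j zip(s) to sys.path.

    Assumes the Spark 1.x layout: $SPARK_HOME/python and
    $SPARK_HOME/python/lib/py4j-*-src.zip. Returns the entries it placed
    (or found already) on sys.path, in priority order.
    """
    python_dir = os.path.join(spark_home, "python")
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    wanted = [python_dir] + py4j_zips
    # insert in reverse so python_dir ends up first on sys.path
    for entry in reversed(wanted):
        if entry not in sys.path:
            sys.path.insert(0, entry)
    return wanted
```

Usage: add_pyspark_to_path(os.environ['SPARK_HOME']) before `from pyspark import SparkContext`.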

Hope this fixes things for you.


On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt wrote:

 Hi Sujit,
 Thanks for your response.

 So I opened a new notebook using the command ipython notebook --profile
 spark and tried the sequence of commands. I am getting errors. Attached is
 the screenshot of the same.
 Also I am attaching the for your reference. Looks
 like, I have written something wrong here. Cannot seem to figure out, what
 is it?

 Thank you for your help

 Ashish Dutt

 On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal wrote:

 Hi Ashish,

  Nice post.
 Agreed, kudos to the author of the post, Benjamin Benfort of District

  Following your post, I get this problem;
 Again, not my post.

 I did try setting up IPython with the Spark profile for the edX Intro to
 Spark course (because I didn't want to use the Vagrant container) and it
 worked flawlessly with the instructions provided (on OSX). I haven't used
 the IPython/PySpark environment beyond very basic tasks since then though,
 because my employer has a Databricks license which we were already using
 for other stuff and we ended up doing the labs on Databricks.

 Looking at your screenshot though, I don't see why you think it's picking
 up the default profile. One simple way of checking to see if things are
 working is to open a new notebook and try this sequence of commands:

 from pyspark import SparkContext
 sc = SparkContext("local", "pyspark")

 You should see something like this after a little while:
 <pyspark.context.SparkContext at 0x1093c9b10>

 While the context is being instantiated, you should also see lots of log
 lines scroll by on the terminal where you started the ipython notebook
 --profile spark command - these log lines are from Spark.

 Hope this helps,

 On Wed, Jul 8, 2015 at 6:04 PM, Ashish Dutt

 Hi Sujit,
 Nice post... Exactly what I had been looking for.
 I am a relative beginner with Spark and real-time data processing.
 We have a server with CDH5.4 with 4 nodes. The Spark version on our
 server is 1.3.0.
 On my laptop I have Spark 1.3.0 too, and it's running Windows 7.
 As per point 5 of your post, I am able to invoke pyspark locally in
 standalone mode.

 Following your post, I get this problem;

 1. In the section Using IPython notebook with Spark, I cannot understand
 why it is picking up the default profile and not the pyspark profile. I am
 sure it is because of the path variables. Attached is the screenshot. Can
 you suggest how to solve this?

 Currently, the path variables on my laptop are like

 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

 On Thu, Jul 9, 2015 at 4:56 AM, Sujit Pal

 You are welcome Davies. Just to clarify, I didn't write the post (not
 sure if my earlier post gave that impression, apologize if so), although I
 agree it's great :-).


 On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu

 Great post, thanks for sharing with us!

 On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal
  Hi Julian,
  I recently built a Python+Spark application to do search relevance
  analytics. I use spark-submit to submit PySpark jobs to a Spark
 cluster on
  EC2 (so I don't use the PySpark shell, hopefully that's what you are
  for). Can't share the code, but the basic approach is covered in
 this blog

Re: PySpark without PySpark

2015-07-09 Thread Sujit Pal
Hi Ashish,

Julian's approach is probably better, but a few observations:

1) Your SPARK_HOME should be C:\spark-1.3.0 (not C:\spark-1.3.0\bin).

2) If you have Anaconda Python installed (I saw that you had set this up in
a separate thread), py4j should be part of the package - at least I think
so. To test this, try in your Python REPL:
 from py4j.java_gateway import JavaGateway
If it succeeds, you already have it.

3) In case Py4J is not installed, the best way to install a new package is
using easy_install or pip. Make sure your path is set up so that when you
call python you are calling the Anaconda version (in case you have multiple
Python versions); if so, run easy_install py4j - this will install
py4j correctly without any messing around on your part. Install
instructions for py4j are available on their site:

4) You should replace the python2 in your 00-setup-script with python,
so you point to the $SPARK_HOME/python directory (C:\spark-1.3.0\python).
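The REPL check in point 2 can also be done without actually importing the package, which is handy inside a setup script. A small sketch using the standard importlib.util.find_spec; the helper name is hypothetical.

```python
import importlib.util

def module_available(name):
    """Return True if a top-level module or package can be found on sys.path."""
    return importlib.util.find_spec(name) is not None
```

Usage: module_available("py4j") returns True when py4j is importable from the current interpreter, and False otherwise.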


On Thu, Jul 9, 2015 at 8:26 PM, Ashish Dutt wrote:

 Hello Sujit,
 Many thanks for your response.
 To answer your questions;
 Q1) Do you have SPARK_HOME set up in your environment? - Yes, I do. It is
 SPARK_HOME=C:/spark-1.3.0/bin
 Q2) Is there a python2 or python subdirectory under the root of your
 Spark installation? - Yes, I do have that too. It is called python. To fix
 this problem, this is what I did:
 I downloaded py4j- from here, which was not there initially when I
 downloaded the Spark package from the official repository. I then put it in
 the lib directory as C:\spark-1.3.0\python\lib. Note I did not extract the
 zip file; I put it in as is.
 The pyspark folder of the spark-1.3.0 root folder. What I did next was
 copy this file and put it in the PYTHONPATH. So my Python path now reads
 as PYTHONPATH=C:/Python27/

 I then rebooted the computer and a silent prayer :-) Then I opened the
 command prompt and invoked the command pyspark from the bin directory of
 spark and EUREKA, it worked :-)  Attached is the screenshot for the same.
 Now, the problem is with IPython notebook. I cannot get it to work with
 I have a cluster with 4 nodes using CDH5.4

 I was able to resolve the problem. Now the next challenge was to configure
 it with IPython. I followed the steps as documented in the blog, and I get
 the errors; attached is the screenshot.

 @Julian, I tried your method too. Attached is the screenshot of the error
 message 7.png

 Hope you can help me out to fix this problem.
 Thank you for your time.

 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

 On Fri, Jul 10, 2015 at 12:02 AM, Sujit Pal

 Hi Ashish,

 Your 00-pyspark-setup file looks very different from mine (and from the
 one described in the blog post). Questions:

 1) Do you have SPARK_HOME set in your environment? If not, your code
 sets it to None. You should provide the path to your Spark
 installation. In my case I have spark-1.3.1 installed under $HOME/Software
 and the code block under # Configure the environment (the yellow highlight
 in the code below) reflects that.
 2) Is there a python2 or python subdirectory under the root of your Spark
 installation? In my case it's python, not python2. It contains the
 Python bindings for Spark, so the block under # Add the PySpark/py4j to
 the Python Path (the green highlight in the code below) adds it to the
 Python sys.path so things like pyspark.SparkContext are accessible in your
 Python environment.

 import os
 import sys

 # Configure the environment
 if 'SPARK_HOME' not in os.environ:
     os.environ['SPARK_HOME'] = '/Users/palsujit/Software/spark-1.3.1'

 # Create a variable for our root path
 SPARK_HOME = os.environ['SPARK_HOME']

 # Add the PySpark/py4j to the Python Path
 sys.path.insert(0, os.path.join(SPARK_HOME, 'python', 'build'))
 sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))

 Hope this fixes things for you.
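For a setup like the Windows one above, where the py4j zip was copied into python\lib by hand, the same path setup can be written as a small function that checks the layout before touching sys.path. This is a minimal sketch, not the blog post's code: `pyspark_paths` and `add_pyspark_to_path` are hypothetical names, and it assumes the standard layout of a Spark binary download (pyspark under python/, a py4j-*.zip under python/lib rather than python/build).

```python
import glob
import os
import sys


def pyspark_paths(spark_home):
    """Return the entries to prepend to sys.path for a Spark install.

    Assumes the standard binary layout: <SPARK_HOME>/python holds the
    pyspark package and <SPARK_HOME>/python/lib holds a py4j-*.zip.
    """
    python_dir = os.path.join(spark_home, "python")
    if not os.path.isdir(python_dir):
        raise OSError("no python/ directory under %s" % spark_home)
    # The py4j version differs between Spark releases, so glob for the zip
    # instead of hard-coding its file name. The zip does not need to be
    # extracted: Python can import directly from zip files on sys.path.
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*.zip")))
    return [python_dir] + py4j_zips


def add_pyspark_to_path(spark_home=None):
    """Prepend the pyspark/py4j entries to sys.path and return them."""
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise OSError("SPARK_HOME is not set; pass the install path explicitly")
    paths = pyspark_paths(spark_home)
    # Insert in reverse so the final order on sys.path matches `paths`.
    for p in reversed(paths):
        if p not in sys.path:
            sys.path.insert(0, p)
    return paths
```

Raising early when python/ is missing turns a confusing "No module named pyspark" later on into an error that names the directory actually being searched.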


 On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt

 Hi Sujit,
 Thanks for your response.

 So I opened a new notebook using the command ipython notebook --profile
 spark and tried the sequence of commands. I am getting errors. Attached is
 the screenshot.
 Also I am attaching the for your reference. It looks
 like I have written something wrong there, but I cannot figure out what
 it is.

 Thank you for your help

 Ashish Dutt


Re: PySpark without PySpark

2015-07-08 Thread Sujit Pal
Hi Julian,

I recently built a Python+Spark application to do search relevance
analytics. I use spark-submit to submit PySpark jobs to a Spark cluster on
EC2 (so I don't use the PySpark shell; hopefully that's what you are looking
for). I can't share the code, but the basic approach is covered in this blog
post - scroll down to the section Writing a Spark Application.
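Since the blog post itself isn't reproduced here, the sketch below only shows the general shape of the spark-submit approach: a plain Python script that creates its own SparkContext, launched through bin/spark-submit instead of the interactive shell. The helper name `spark_submit_command`, the script name `app.py` and the install path are placeholders, not part of the original application.

```python
import os


def spark_submit_command(spark_home, app_script, master="local[*]", app_args=()):
    """Build the argv for running a PySpark script through spark-submit.

    spark-submit puts pyspark and py4j on the Python path itself, so the
    script can create its own SparkContext without the interactive shell.
    Options such as --master must come before the script name; anything
    after the script is passed to the script as its own arguments.
    """
    submit = os.path.join(spark_home, "bin", "spark-submit")
    return [submit, "--master", master, app_script] + list(app_args)


# A matching application script (hypothetical app.py) could look like:
#
#   from pyspark import SparkContext
#   sc = SparkContext(appName="my-app")
#   print(sc.parallelize(range(100)).count())
#   sc.stop()
#
# and be launched with, e.g.:
#   subprocess.call(spark_submit_command("/path/to/spark", "app.py"))
```

Against a cluster, the same list would just carry a different --master value (for example a spark:// URL) instead of local[*].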

Hope this helps,


On Wed, Jul 8, 2015 at 7:46 AM, Julian wrote:


 Is there a resource that has written up what the necessary steps are for
 running PySpark without using the PySpark shell?

 I can reverse engineer (by following the tracebacks and reading the shell
 source) what the relevant Java imports are, but I would assume
 someone has attempted this before and published something I can either
 follow or install? If not, I have something that pretty much works and can
 publish it, but I'm not a heavy Spark user, so there may be some things
 left out that I haven't hit because of how little of pyspark I'm playing
 with.


 View this message in context:
 Sent from the Apache Spark User List mailing list archive at

 To unsubscribe, e-mail:
 For additional commands, e-mail:

Re: PySpark without PySpark

2015-07-08 Thread Sujit Pal
Hi Ashish,

 Nice post.
Agreed, kudos to the author of the post, Benjamin Bengfort of District Data Labs.

 Following your post, I get this problem;
Again, not my post.

I did try setting up IPython with the Spark profile for the edX Intro to
Spark course (because I didn't want to use the Vagrant container) and it
worked flawlessly with the instructions provided (on OSX). I haven't used
the IPython/PySpark environment beyond very basic tasks since then though,
because my employer has a Databricks license which we were already using
for other stuff and we ended up doing the labs on Databricks.

Looking at your screenshot, though, I don't see why you think it's picking up
the default profile. One simple way of checking whether things are
working is to open a new notebook and try this sequence of commands:

from pyspark import SparkContext
sc = SparkContext("local", "pyspark")

You should see something like this after a little while:
<pyspark.context.SparkContext at 0x1093c9b10>

While the context is being instantiated, you should also see lots of log
lines scroll by on the terminal where you started the ipython notebook
--profile spark command - these log lines are from Spark.

Hope this helps,

On Wed, Jul 8, 2015 at 6:04 PM, Ashish Dutt wrote:

 Hi Sujit,
 Nice post. Exactly what I had been looking for.
 I am relatively new to Spark and real-time data processing.
 We have a server with CDH5.4 with 4 nodes. The Spark version on our server
 is 1.3.0.
 On my laptop I have Spark 1.3.0 too, running on Windows 7.
 As per point 5 of your post, I am able to invoke pyspark locally in
 standalone mode.

 Following your post, I get this problem:

 1. In the section Using IPython notebook with Spark, I cannot understand why
 it is picking up the default profile and not the pyspark profile. I am sure
 it is because of the path variables. Attached is the screenshot. Can you
 suggest how to solve this?

 Currently the path variables for my laptop are like

 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia


Re: PySpark without PySpark

2015-07-08 Thread Sujit Pal
You are welcome Davies. Just to clarify, I didn't write the post (not sure
if my earlier post gave that impression; apologies if so), although I agree
it's great :-).


On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu wrote:

 Great post, thanks for sharing with us!

 On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal wrote:
  Hi Julian,
  I recently built a Python+Spark application to do search relevance
  analytics. I use spark-submit to submit PySpark jobs to a Spark cluster
  EC2 (so I don't use the PySpark shell, hopefully thats what you are
  for). Can't share the code, but the basic approach is covered in this
  post - scroll down to the section Writing a Spark Application.
  Hope this helps,
  On Wed, Jul 8, 2015 at 7:46 AM, Julian
  Is there a resource that has written up what the necessary steps are for
  running PySpark without using the PySpark shell?
  I can reverse engineer (by following the tracebacks and reading the
  source) what the relevant Java imports needed are, but I would assume
  someone has attempted this before and just published something I can
  follow or install? If not, I have something that pretty much works and
  publish it, but I'm not a heavy Spark user, so there may be some things
  left out that I haven't hit because of how little of pyspark I'm playing

Re: HOw to concatenate two csv files into one RDD?

2015-06-26 Thread Sujit Pal
Hi Rex,

If the CSV files are in the same folder and there are no other files,
specifying the directory to sc.textFile() (or equivalent) will pull in all
the files. If there are other files, you can pass in a pattern that would
capture the two files you care about (if that's possible). If neither of
these works for you, you can create individual RDDs for each file and union
them.
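Those options can be sketched in PySpark as follows, assuming `sc` is an existing SparkContext. The helper names are hypothetical, and unlike pandas.read_csv these functions return raw text lines, so the CSV fields still have to be split afterwards.

```python
import os


def textfile_arg(directory, filenames):
    """Join file paths into one comma-separated string; sc.textFile()
    accepts such a list (as well as a directory or a glob pattern)."""
    return ",".join(os.path.join(directory, f) for f in filenames)


def load_csvs(sc, directory, filenames):
    """Read several CSV files into a single RDD of lines, dropping headers.

    `sc` is a live SparkContext. Assumes every file starts with the same
    header line, so every copy of it can be filtered out after reading.
    """
    rdd = sc.textFile(textfile_arg(directory, filenames))
    header = rdd.first()
    return rdd.filter(lambda line: line != header)


def load_csvs_union(sc, paths):
    """Alternative: one RDD per file, combined with SparkContext.union.
    Header lines are kept here; filter them as in load_csvs if needed."""
    return sc.union([sc.textFile(p) for p in paths])
```

Filtering on the header is needed because, once several files are read as one RDD, each file still contributes its own header line.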


On Fri, Jun 26, 2015 at 11:00 AM, Rex X wrote:

 With Python pandas, it is easy to concatenate dataframes
 by combining pandas.concat
 and pandas.read_csv:

 pd.concat([pd.read_csv(os.path.join(Path_to_csv_files, f)) for f in csvfiles])

 where csvfiles is the list of csv files.

 How can we do this in Spark?

Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?

2015-06-16 Thread Sujit Pal
Hi Rex,

In general (i.e. not Spark-specific), it's best to convert categorical data to
one-hot encoding rather than integers - that way the algorithm doesn't use
the ordering implicit in the integer representation.
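A minimal pure-Python sketch of why that matters, using made-up city values: with integer codes some pairs of categories look farther apart than others, while with one-hot vectors every pair of distinct categories is the same Euclidean distance apart. The function names are illustrative; later Spark releases also ship a OneHotEncoder transformer in pyspark.ml.feature.

```python
import math


def one_hot_encoder(values):
    """Return (categories, encode) where encode maps a value to a 0/1 vector.

    Categories are sorted so the column order is deterministic.
    """
    categories = sorted(set(values))
    index = {c: i for i, c in enumerate(categories)}

    def encode(value):
        vec = [0.0] * len(categories)
        vec[index[value]] = 1.0
        return vec

    return categories, encode


def euclidean(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


cities = ["Austin", "Boston", "Austin", "Chicago"]
categories, encode = one_hot_encoder(cities)
# Integer codes 0, 1, 2 would make Austin-Chicago (distance 2) look farther
# apart than Austin-Boston (distance 1). With one-hot vectors every pair of
# distinct cities is the same distance apart: sqrt(2).
print(categories)                                      # ['Austin', 'Boston', 'Chicago']
print(encode("Boston"))                                # [0.0, 1.0, 0.0]
print(euclidean(encode("Austin"), encode("Chicago")))  # 1.4142135623730951
```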


On Tue, Jun 16, 2015 at 1:17 PM, Rex X wrote:

 Is it necessary to convert categorical data into integers?

 Any tips would be greatly appreciated!


 On Sun, Jun 14, 2015 at 10:05 AM, Rex X wrote:

 For clustering analysis, we need a way to measure distances.

 That is tricky when the data contains different levels of measurement:
 *binary / categorical (nominal), counts (ordinal), and ratio (scale)*

 For example, consider the attributes
 *city, zip, satisfaction_level, price*

 Meanwhile, real data usually also contains string attributes,
 for example, book titles. The distance between two strings can be measured
 by minimum edit distance.

 SPSS provides Two-Step Cluster, which can handle both ratio-scale
 and ordinal numbers.

 What is the right algorithm to do hierarchical clustering analysis with all
 four kinds of attributes above with *MLlib*?

 If we cannot find a right metric to measure the distance, an alternative
 solution is to do a topological data analysis (e.g. linkage).
 Can we do this kind of analysis with *GraphX*?
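For the string attributes, the minimum edit distance mentioned in the question is usually the Levenshtein distance. Below is a standard dynamic-programming sketch in plain Python (the function name is illustrative). Inside a Spark job it could be applied to pairs of titles in a map, though comparing every pair is quadratic in the number of rows.

```python
def edit_distance(a, b):
    """Levenshtein (minimum edit) distance: the fewest single-character
    insertions, deletions or substitutions that turn a into b."""
    # At the start of iteration i, prev[j] is the distance between
    # a[:i-1] and b[:j]; only two rows are kept in memory.
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            curr.append(min(prev[j] + 1,          # delete ca
                            curr[j - 1] + 1,      # insert cb
                            prev[j - 1] + cost))  # substitute or match
        prev = curr
    return prev[-1]


print(edit_distance("kitten", "sitting"))  # 3
print(edit_distance("flaw", "lawn"))       # 2
```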


Re: Access several s3 buckets, with credentials containing /

2015-06-06 Thread Sujit Pal
Hi Pierre,

One way is to recreate your credentials until AWS generates a secret key
without a slash character in it. Another way I've been using is to pass the
credentials outside the S3 file path by setting the following (where sc is
the SparkContext):

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", ACCESS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", SECRET_KEY)

After that you can define the RDDs more simply:

val c1 = sc.textFile("s3n://bucket1/file.csv")
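A small Python sketch of the configuration approach, assuming `hadoop_conf` is the object returned by sc._jsc.hadoopConfiguration() (any object with a set(name, value) method works). The function names are hypothetical; only the two fs.s3n property names come from Hadoop.

```python
S3N_ACCESS_KEY_PROP = "fs.s3n.awsAccessKeyId"
S3N_SECRET_KEY_PROP = "fs.s3n.awsSecretAccessKey"


def set_s3n_credentials(hadoop_conf, access_key, secret_key):
    """Store AWS credentials in a Hadoop configuration instead of the URL.

    `hadoop_conf` is anything with a set(name, value) method, e.g. the
    object returned by sc._jsc.hadoopConfiguration() in PySpark.
    """
    hadoop_conf.set(S3N_ACCESS_KEY_PROP, access_key)
    hadoop_conf.set(S3N_SECRET_KEY_PROP, secret_key)


def secret_breaks_url(secret_key):
    """True if the secret contains '/', which breaks s3n://KEY:SECRET@bucket/ URLs."""
    return "/" in secret_key


# Usage in PySpark (ACCESS_KEY / SECRET_KEY are placeholders):
#   set_s3n_credentials(sc._jsc.hadoopConfiguration(), ACCESS_KEY, SECRET_KEY)
#   c1 = sc.textFile("s3n://bucket1/file.csv")
```

Because these properties live in the single Hadoop configuration shared by the SparkContext, this does not by itself give different credentials per bucket, which was the harder part of Pierre's question.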


On Fri, Jun 5, 2015 at 3:55 AM, Steve Loughran

  On 5 Jun 2015, at 08:03, Pierre B wrote:
  Hi list!
  My problem is quite simple.
  I need to access several S3 buckets, using different credentials:
  val c1 =
  val c2 =
  val c3 =
  One/several of those AWS credentials might contain / in the private
  key.
  This is a known problem and from my research, the only ways to deal with
  these / are:
  1/ use environment variables to set the AWS credentials, then access the
  buckets without specifying the credentials
  2/ set the hadoop configuration to contain the credentials.
  However, neither of these solutions allows me to access different buckets
  with different credentials.
  Can anyone help me on this?

 It's a long-known outstanding bug in Hadoop s3n that nobody has ever sat down
 to fix. One subtlety is that it's really hard to test, as you need credentials
 with a / in them.

 The general best practice is to recreate your credentials.

 Now, if you can get the patch to work against hadoop trunk, I promise I
 will commit it


Not able to run SparkPi locally

2015-05-23 Thread Sujit Pal
Hello all,

This is probably me doing something obviously wrong; I would really
appreciate some pointers on how to fix this.

I installed spark-1.3.1-bin-hadoop2.6.tgz from the Spark download page and
just untarred it on a local drive. I am on Mac OSX 10.9.5 and the JDK is
1.8.0_40.

I ran the following commands (the first 3 run successfully; I mention them
here to rule out the possibility of an obviously bad install).

1) laptop$ bin/spark-shell

scala> sc.parallelize(1 to 100).count()

res0: Long = 100

scala> exit

2) laptop$ bin/pyspark




3) laptop$ bin/spark-submit examples/src/main/python/

Pi is roughly 3.142800

4) laptop$ bin/run-example SparkPi

This hangs at this line (full stack trace is provided at the end of this email):

15/05/23 07:52:10 INFO Executor: Fetching with
timestamp 1432392670140

15/05/23 07:52:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) connect timed out


and finally dies with this message:

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost): connect timed out

I checked with ifconfig -a on my box, is my IP address on my local
network.


ether 34:36:3b:d2:b0:f4

inet netmask 0xff00 broadcast

media: autoselect

 status: active

I think perhaps there may be some configuration I am missing. Being able to
run jobs locally (without HDFS or creating a cluster) is essential for
development, and the examples come from the Spark 1.3.1 Quick Start page, so
this is probably something to do with my environment.

Thanks in advance for any help you can provide.



Full output of SparkPi run (including stack trace) follows:

Using Spark's default log4j profile:

15/05/23 08:08:55 INFO SparkContext: Running Spark version 1.3.1

15/05/23 08:08:57 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

15/05/23 08:08:57 INFO SecurityManager: Changing view acls to: palsujit

15/05/23 08:08:57 INFO SecurityManager: Changing modify acls to: palsujit

15/05/23 08:08:57 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(palsujit);
users with modify permissions: Set(palsujit)

15/05/23 08:08:57 INFO Slf4jLogger: Slf4jLogger started

15/05/23 08:08:57 INFO Remoting: Starting remoting

15/05/23 08:08:58 INFO Remoting: Remoting started; listening on addresses

15/05/23 08:08:58 INFO Utils: Successfully started service 'sparkDriver' on
port 52008.

15/05/23 08:08:58 INFO SparkEnv: Registering MapOutputTracker

15/05/23 08:08:58 INFO SparkEnv: Registering BlockManagerMaster

15/05/23 08:08:58 INFO DiskBlockManager: Created local directory at

15/05/23 08:08:58 INFO MemoryStore: MemoryStore started with capacity 265.1 MB

15/05/23 08:08:58 INFO HttpFileServer: HTTP File server directory is

15/05/23 08:08:58 INFO HttpServer: Starting HTTP Server

15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT

15/05/23 08:08:58 INFO AbstractConnector: Started

15/05/23 08:08:58 INFO Utils: Successfully started service 'HTTP file
server' on port 52009.

15/05/23 08:08:58 INFO SparkEnv: Registering OutputCommitCoordinator

15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT

15/05/23 08:08:58 INFO AbstractConnector: Started

15/05/23 08:08:58 INFO Utils: Successfully started service 'SparkUI' on
port 4040.

15/05/23 08:08:58 INFO SparkUI: Started SparkUI at

15/05/23 08:08:58 INFO SparkContext: Added JAR
at with
timestamp 1432393738514

15/05/23 08:08:58 INFO Executor: Starting executor ID driver on host

15/05/23 08:08:58 INFO AkkaUtils: Connecting to HeartbeatReceiver:

15/05/23 08:08:58 INFO NettyBlockTransferService: Server created on 52010

15/05/23 08:08:58 INFO BlockManagerMaster: Trying to register BlockManager

15/05/23 08:08:58 INFO 

Re: Not able to run SparkPi locally

2015-05-23 Thread Sujit Pal
Replying to my own email in case someone has the same or similar issue.

On a hunch I ran this against my Linux (Ubuntu 14.04 with JDK 8) box. Not
only did bin/run-example SparkPi run without any problems, it also
provided a very helpful message in the output.

15/05/23 08:35:15 WARN Utils: Your hostname, tsunami resolves to a loopback
address:; using instead (on interface wlan0)

15/05/23 08:35:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address

So I went back to my Mac, set SPARK_LOCAL_IP= and everything runs
fine now. To make this permanent I put this in conf/
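A quick way to check whether a machine is in the situation the Linux warning describes (hostname resolving to a loopback address), and to launch Spark scripts with SPARK_LOCAL_IP set for a single child process, is sketched below in Python 3. The function names are hypothetical and the addresses in the usage are placeholders, since the actual IPs were elided from this thread. Setting the variable in the shell or in the conf/ file, as above, is what fixed the problem here; the helper only does the same for one subprocess.

```python
import ipaddress
import os
import socket


def is_loopback(address):
    """True for loopback addresses such as 127.0.0.1, 127.0.1.1 or ::1."""
    return ipaddress.ip_address(address).is_loopback


def hostname_resolves_to_loopback():
    """Roughly the condition the 'resolves to a loopback address' warning describes."""
    return is_loopback(socket.gethostbyname(socket.gethostname()))


def env_with_local_ip(ip, base_env=None):
    """Copy of the environment with SPARK_LOCAL_IP set, suitable for passing
    as env= to subprocess when launching bin/run-example or bin/spark-submit."""
    env = dict(os.environ if base_env is None else base_env)
    env["SPARK_LOCAL_IP"] = ip
    return env


# Usage (placeholder address):
#   subprocess.call(["bin/run-example", "SparkPi"],
#                   env=env_with_local_ip("192.168.1.10"))
```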


On Sat, May 23, 2015 at 8:14 AM, Sujit Pal wrote:

 Hello all,

 This is probably me doing something obviously wrong; I would really
 appreciate some pointers on how to fix this.

 I installed spark-1.3.1-bin-hadoop2.6.tgz from the Spark download page and
 just untarred it on a local drive. I am on Mac OSX 10.9.5 and the JDK is
 1.8.0_40.

 I ran the following commands (the first 3 run successfully; I mention them
 here to rule out the possibility of an obviously bad install).

 1) laptop$ bin/spark-shell

 scala> sc.parallelize(1 to 100).count()

 res0: Long = 100

 scala> exit

 2) laptop$ bin/pyspark




 3) laptop$ bin/spark-submit examples/src/main/python/

 Pi is roughly 3.142800

 4) laptop$ bin/run-example SparkPi

 This hangs at this line (full stack trace is provided at the end of this email):

 15/05/23 07:52:10 INFO Executor: Fetching with
 timestamp 1432392670140

 15/05/23 07:52:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
 0) connect timed out


 and finally dies with this message:

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent
 failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): connect timed out

 I checked with ifconfig -a on my box, is my IP address on my
 local network.


 ether 34:36:3b:d2:b0:f4

 inet netmask 0xff00 broadcast

 media: autoselect

  status: active

 I think perhaps there may be some configuration I am missing. Being able
 to run jobs locally (without HDFS or creating a cluster) is essential for
 development, and the examples come from the Spark 1.3.1 Quick Start page, so
 this is probably something to do with my environment.

 Thanks in advance for any help you can provide.



 Full output of SparkPi run (including stack trace) follows:

 Using Spark's default log4j profile:

 15/05/23 08:08:55 INFO SparkContext: Running Spark version 1.3.1

 15/05/23 08:08:57 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable

 15/05/23 08:08:57 INFO SecurityManager: Changing view acls to: palsujit

 15/05/23 08:08:57 INFO SecurityManager: Changing modify acls to: palsujit

 15/05/23 08:08:57 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(palsujit);
 users with modify permissions: Set(palsujit)

 15/05/23 08:08:57 INFO Slf4jLogger: Slf4jLogger started

 15/05/23 08:08:57 INFO Remoting: Starting remoting

 15/05/23 08:08:58 INFO Remoting: Remoting started; listening on addresses

 15/05/23 08:08:58 INFO Utils: Successfully started service 'sparkDriver'
 on port 52008.

 15/05/23 08:08:58 INFO SparkEnv: Registering MapOutputTracker

 15/05/23 08:08:58 INFO SparkEnv: Registering BlockManagerMaster

 15/05/23 08:08:58 INFO DiskBlockManager: Created local directory at

 15/05/23 08:08:58 INFO MemoryStore: MemoryStore started with capacity
 265.1 MB

 15/05/23 08:08:58 INFO HttpFileServer: HTTP File server directory is

 15/05/23 08:08:58 INFO HttpServer: Starting HTTP Server

 15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT

 15/05/23 08:08:58 INFO AbstractConnector: Started

 15/05/23 08:08:58 INFO Utils: Successfully started service 'HTTP file
 server' on port 52009.

 15/05/23 08:08:58 INFO SparkEnv: Registering OutputCommitCoordinator

 15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT

 15/05/23 08:08:58 INFO AbstractConnector: Started