Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-19 Thread Sujit Pal
Hi Janardhan,

You need the classifier "models" attribute on the second entry for
stanford-corenlp to indicate that you want the models JAR, as shown below.
Right now you are declaring the stanford-corenlp code JAR twice, so the models JAR never gets pulled in.

libraryDependencies ++= {
  val sparkVersion = "2.0.0"
  Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
"com.google.protobuf" % "protobuf-java" % "2.6.1",
"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" classifier "models",
"org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )
}

-sujit


On Sun, Sep 18, 2016 at 5:12 PM, janardhan shetty <janardhan...@gmail.com>
wrote:

> Hi Sujit,
>
> Tried that option but same error:
>
> java version "1.8.0_51"
>
>
> libraryDependencies ++= {
>   val sparkVersion = "2.0.0"
>   Seq(
> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "com.google.protobuf" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>   )
> }
>
> Error:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> edu/stanford/nlp/pipeline/StanfordCoreNLP
> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.
> apply(Lemmatizer.scala:37)
> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.
> apply(Lemmatizer.scala:33)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$
> 2.apply(ScalaUDF.scala:88)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$
> 2.apply(ScalaUDF.scala:87)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(
> ScalaUDF.scala:1060)
>     at org.apache.spark.sql.catalyst.expressions.Alias.eval(
> namedExpressions.scala:142)
> at org.apache.spark.sql.catalyst.expressions.
> InterpretedProjection.apply(Projection.scala:45)
> at org.apache.spark.sql.catalyst.expressions.
> InterpretedProjection.apply(Projection.scala:29)
> at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(
> TraversableLike.scala:234)
>
>
>
> On Sun, Sep 18, 2016 at 2:21 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>
>> Hi Janardhan,
>>
>> Maybe try removing the string "test" from this line in your build.sbt?
>> IIRC, this restricts the models JAR to be called from a test.
>>
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier
>> "models",
>>
>> -sujit
>>
>>
>> On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty <
>> janardhan...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to use lemmatization as a transformer and added the below to the
>>> build.sbt
>>>
>>>  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test"
>>> classifier "models",
>>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>>
>>>
>>> Error:
>>> *Exception in thread "main" java.lang.NoClassDefFoundError:
>>> edu/stanford/nlp/pipeline/StanfordCoreNLP*
>>>
>>> I have tried other versions of this spark package.
>>>
>>> Any help is appreciated..
>>>
>>
>>
>


Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-18 Thread Sujit Pal
Hi Janardhan,

Maybe try removing the string "test" from this line in your build.sbt?
IIRC, this restricts the models JAR to the test scope, so it is only on the classpath when tests run.

"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier
"models",

-sujit


On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty 
wrote:

> Hi,
>
> I am trying to use lemmatization as a transformer and added the below to the
> build.sbt
>
>  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "com.google.protobuf" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier
> "models",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>
>
> Error:
> *Exception in thread "main" java.lang.NoClassDefFoundError:
> edu/stanford/nlp/pipeline/StanfordCoreNLP*
>
> I have tried other versions of this spark package.
>
> Any help is appreciated..
>


Re: pyspark mappartions ()

2016-05-14 Thread Sujit Pal
I built this recently using the accepted answer on this SO page:

http://stackoverflow.com/questions/26741714/how-does-the-pyspark-mappartitions-function-work/26745371

-sujit

On Sat, May 14, 2016 at 7:00 AM, Mathieu Longtin 
wrote:

> From memory:
> def processor(iterator):
>   for item in iterator:
> newitem = do_whatever(item)
> yield newitem
>
> newdata = data.mapPartitions(processor)
>
> Basically, your function takes an iterator as an argument, and must either
> be an iterator or return one.
>
> On Sat, May 14, 2016 at 12:39 AM Abi  wrote:
>
>>
>>
>> On Tue, May 10, 2016 at 2:20 PM, Abi  wrote:
>>
>>> Is there any example of this ? I want to see how you write the the
>>> iterable example
>>
>>
>> --
> Mathieu Longtin
> 1-514-803-8977
>


Re: since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?

2016-03-28 Thread Sujit Pal
Hi Charles,

I tried this with dummied out functions which just sum transformations of a
list of integers, maybe they could be replaced by algorithms in your case.
The idea is to call them through a "god" function that takes an additional
type parameter and delegates out to the appropriate function. Here's my
code, maybe it helps...

def f0(xs):
>   return len(xs)
> def f1(xs):
>   return sum(xs)
> def f2(xs):
>   return sum([x**2 for x in xs])
> def f_god(n, xs):
>   if n == 1:
> return f1(xs)
>   elif n == 2:
> return f2(xs)
>   else:
> return f0(xs)
>
> xs = [x for x in range(0, 5)]
> xs_b = sc.broadcast(xs)
> ns = sc.parallelize([x for x in range(0, 3)])
> results = ns.map(lambda n: f_god(n, xs_b.value))
> print results.take(10)


gives me:

[5, 10, 30]
-sujit


On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau  wrote:

> You probably want to look at the map transformation, and the many more
> defined on RDDs. The function you pass in to map is serialized and the
> computation is distributed.
>
>
> On Monday, March 28, 2016, charles li  wrote:
>
>>
>> use case: have a dataset, and want to use different algorithms on that,
>> and fetch the result.
>>
>> for making this, I think I should distribute my algorithms, and run these
>> algorithms on the dataset at the same time, am I right?
>>
>> but it seems that spark can not parallelize/serialize
>> algorithms/functions, then how to make it?
>>
>>
>> *here is the test code*:
>>
>>
>> 
>> def test():
>> pass
>> function_list = [test] * 10
>>
>> sc.parallelize([test] * 10).take(1)
>>
>> 
>>
>>
>> *error message: *
>> Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.runJob.
>>
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage
>> 9.0 (TID 105, sh-demo-hadoop-07):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>> line 111, in main
>>
>> process()
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>> line 106, in process
>>
>> serializer.dump_stream(func(split_index, iterator), outfile)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 263, in dump_stream
>>
>> vs = list(itertools.islice(iterator, batch))
>>
>>   File
>> "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line
>> 1293, in takeUpToNumLeft
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 139, in load_stream
>>
>> yield self._read_with_length(stream)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 164, in _read_with_length
>>
>> return self.loads(obj)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 422, in loads
>>
>> return pickle.loads(obj)
>>
>> AttributeError: 'module' object has no attribute 'test'
>>
>>
>> at
>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>
>> at
>> org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>>
>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> what's interesting is that* when I run sc.parallelize([test] *
>> 10).collect() , it works fine*, returns :
>>
>> [<function test at 0x...>,
>>  <function test at 0x...>,
>>  ...
>>  <function test at 0x...>]
>>
>>
>>
>>
>> --
>> --
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


Re: How to create dataframe from SQL Server SQL query

2015-12-07 Thread Sujit Pal
Hi Ningjun,

Haven't done this myself, saw your question and was curious about the
answer and found this article which you might find useful:
http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/

According this article, you can pass in your SQL statement in the "dbtable"
mapping, ie, something like:

val jdbcDF = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:postgresql:dbserver",
    "dbtable" -> "(select docid, title, docText from dbo.document where docid between 10 and 1000)"
  )).load()

-sujit

On Mon, Dec 7, 2015 at 8:26 AM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

> How can I create a RDD from a SQL query against SQLServer database? Here
> is the example of dataframe
>
>
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
>
>
>
>
>
> *val* jdbcDF *=* sqlContext.read.format("jdbc").options(
>
>   *Map*("url" -> "jdbc:postgresql:dbserver",
>
>   "dbtable" -> "schema.tablename")).load()
>
>
>
> This code create dataframe from a table. How can I create dataframe from a
> query, e.g. “select docid, title, docText from dbo.document where docid
> between 10 and 1000”?
>
>
>
> Ningjun
>
>
>


Re: Please add us to the Powered by Spark page

2015-11-24 Thread Sujit Pal
Thank you Sean, much appreciated.

And yes, perhaps "email dev" is a better option since the traffic is
(probably) lighter and these sorts of requests are more likely to get
noticed. Although one would need to subscribe to the dev list to do that...

-sujit

On Tue, Nov 24, 2015 at 1:16 AM, Sean Owen <so...@cloudera.com> wrote:

> Not sure who generally handles that, but I just made the edit.
>
> On Mon, Nov 23, 2015 at 6:26 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
> > Sorry to be a nag, I realize folks with edit rights on the Powered by
> Spark
> > page are very busy people, but its been 10 days since my original
> request,
> > was wondering if maybe it just fell through the cracks. If I should
> submit
> > via some other channel that will make sure it is looked at (or better
> yet, a
> > self service option), please let me know and I will do so.
> >
> > Here is the information again.
> >
> > Organization Name: Elsevier Labs
> > URL: http://labs.elsevier.com
> > Spark components: Spark Core, Spark SQL, MLLib, GraphX.
> > Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content
> as a
> > Service, Content and Event Analytics, Content/Event based Predictive
> Models
> > and Big Data Processing. We use Scala and Python over Databricks
> Notebooks
> > for most of our work.
> >
> > Thanks very much,
> > Sujit
> >
> > On Fri, Nov 13, 2015 at 9:21 AM, Sujit Pal <sujitatgt...@gmail.com>
> wrote:
> >>
> >> Hello,
> >>
> >> We have been using Spark at Elsevier Labs for a while now. Would love to
> >> be added to the “Powered By Spark” page.
> >>
> >> Organization Name: Elsevier Labs
> >> URL: http://labs.elsevier.com
> >> Spark components: Spark Core, Spark SQL, MLLib, GraphX.
> >> Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content
> as
> >> a Service, Content and Event Analytics, Content/Event based Predictive
> >> Models and Big Data Processing. We use Scala and Python over Databricks
> >> Notebooks for most of our work.
> >>
> >> Thanks very much,
> >>
> >> Sujit Pal
> >> Technical Research Director
> >> Elsevier Labs
> >> sujit@elsevier.com
> >>
> >>
> >
>


Re: Please add us to the Powered by Spark page

2015-11-23 Thread Sujit Pal
Sorry to be a nag, I realize folks with edit rights on the Powered by Spark
page are very busy people, but it's been 10 days since my original request,
and I was wondering if maybe it just fell through the cracks. If I should submit
via some other channel that will make sure it is looked at (or better yet,
a self service option), please let me know and I will do so.

Here is the information again.

Organization Name: Elsevier Labs
URL: http://labs.elsevier.com
Spark components: Spark Core, Spark SQL, MLLib, GraphX.
Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as a
Service, Content and Event Analytics, Content/Event based Predictive Models
and Big Data Processing. We use Scala and Python over Databricks Notebooks
for most of our work.

Thanks very much,
Sujit

On Fri, Nov 13, 2015 at 9:21 AM, Sujit Pal <sujitatgt...@gmail.com> wrote:

> Hello,
>
> We have been using Spark at Elsevier Labs for a while now. Would love to
> be added to the “Powered By Spark” page.
>
> Organization Name: Elsevier Labs
> URL: http://labs.elsevier.com
> Spark components: Spark Core, Spark SQL, MLLib, GraphX.
> Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as
> a Service, Content and Event Analytics, Content/Event based Predictive
> Models and Big Data Processing. We use Scala and Python over Databricks
> Notebooks for most of our work.
>
> Thanks very much,
>
> Sujit Pal
> Technical Research Director
> Elsevier Labs
> sujit@elsevier.com
>
>
>


Please add us to the Powered by Spark page

2015-11-13 Thread Sujit Pal
Hello,

We have been using Spark at Elsevier Labs for a while now. Would love to be
added to the “Powered By Spark” page.

Organization Name: Elsevier Labs
URL: http://labs.elsevier.com
Spark components: Spark Core, Spark SQL, MLLib, GraphX.
Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as a
Service, Content and Event Analytics, Content/Event based Predictive Models
and Big Data Processing. We use Scala and Python over Databricks Notebooks
for most of our work.

Thanks very much,

Sujit Pal
Technical Research Director
Elsevier Labs
sujit@elsevier.com


Re: Prevent possible out of memory when using read/union

2015-11-04 Thread Sujit Pal
Hi Alexander,

You may want to try the wholeTextFiles() method of SparkContext. Using that
you could just do something like this:

sc.wholeTextFiles("hdfs://input_dir")
> .saveAsSequenceFile("hdfs://output_dir")


The wholeTextFiles method returns an RDD of (filename, content) pairs.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

You will not have to worry about managing memory as much with this approach.

-sujit


On Wed, Nov 4, 2015 at 2:12 AM, Alexander Lenz  wrote:

> Hi colleagues,
>
> In Hadoop I have a lot of folders containing small files. Therefore I am
> reading the content of all folders, union the small files and write the
> unioned data into a single folder
> containing one file. Afterwards I delete the small files and the according
> folders.
>
> I see two possible emerging problems on which I would like to get your
> opinion:
>
> 1.   When reading all the files inside the folders into the master
> program, I think it might appear, that there is such an amount of files
> that the master program will run out of memory?
> To prevent this I thought about checking the file size of the folders and
> only read folders in as long as there is enough memory to handle the amount.
> Do you think that this is a possible solution or is there a better
> solution to handle this problem?
>
> 2.   The other problem is: I am doing a UnionAll to merge all the
> content of the files. In my opinion this will cause that the data needs to
> be brought to a single master and then the data will be unioned there.
> So there might be the same problem, that the application runs out of
> memory.
> My proposed solution would also be to union only if the size does not
> exceed the available memory. Any better solution?
>
> For a better understanding you can have a look at my code at the bottom of
> the mail.
> Would be glad to hear from your experience as I would assume that this
> problem should be a general one.
>
> Thanks & Best, Alex
>
>
>
>
> val sqlContext = new SQLContext(sc)
>
> //get filesystem
> val conf = new Configuration()
> val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"),
> conf)
>
> //get relevant folders
> val directoryStatus = fs.listStatus(new Path("hdfs://
> sandbox.hortonworks.com/demo/parquet/staging/"))
> val latestFolder = directoryStatus.maxBy(x => x.getModificationTime)
>
> val toWorkFolders = directoryStatus.filter(x => x.getModificationTime
> < latestFolder.getModificationTime)
>
> //aggregate folder content
> val parquetFiles = toWorkFolders.map(folder => {
>   sqlContext.read.parquet(folder.getPath.toString)
> })
>
> val mergedParquet = parquetFiles.reduce((x, y) => x.unionAll(y))
>
> mergedParquet.coalesce(1) //Assemble part-files into one partition
>   .write.mode(SaveMode.Append)
>   .partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*)
>   .parquet("hdfs://sandbox.hortonworks.com/demo/parquet/consolidated/")
>
>


Re: How to close connection in mapPartitions?

2015-10-23 Thread Sujit Pal
Hi Bin,

Very likely the RedisClientPool is being closed too quickly before map has
a chance to get to it. One way to verify would be to comment out the .close
line and see what happens. FWIW I saw a similar problem writing to Solr
where I put a commit where you have a close, and noticed that the commit
was happening before the actual data insertion (in the .map line) happened
(and no data showing up in the index until the next time I ran the code
:-)).

At the time I got around it by doing a zipWithIndex on the Iterator, then
doing a partial commit every n records, and finally doing a commit from the
driver code. However, this won't work for you, and there is a better way
outlined on this page (look for Tobias Pfeiffer, it's the code block
immediately following):

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

where you test for hasNext on the iterator and call close if it's the last
element, within the scope of the .map call.
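
Applied to your snippet, that pattern would look roughly like this (just a
sketch, reusing the RedisCache / Parser names from your code):

val seqs = lines.mapPartitions { iter =>
  val cache = new RedisCache(redisUrl, redisPort)
  iter.map { line =>
    val parsed = Parser.parseBody(line, cache)
    // iter has already advanced past this element, so hasNext is false
    // only when we are processing the last record of the partition
    if (!iter.hasNext) cache.redisPool.close
    parsed
  }
}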

-sujit

On Thu, Oct 22, 2015 at 11:32 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Are you sure RedisClientPool is being initialized properly in the
> constructor of RedisCache? Can you please copy paste the code that you use
> to initialize RedisClientPool inside the constructor of RedisCache?
>
> Thanks,
> Aniket
>
> On Fri, Oct 23, 2015 at 11:47 AM Bin Wang  wrote:
>
>> BTW, "lines" is a DStream.
>>
>> Bin Wang wrote on Fri, Oct 23, 2015 at 2:16 PM:
>>
>>> I use mapPartitions to open connections to Redis, I write it like this:
>>>
>>> val seqs = lines.mapPartitions { lines =>
>>>   val cache = new RedisCache(redisUrl, redisPort)
>>>   val result = lines.map(line => Parser.parseBody(line, cache))
>>>   cache.redisPool.close
>>>   result
>>> }
>>>
>>> But it seems the pool is closed before I use it. Am I doing anything
>>> wrong? Here is the error:
>>>
>>> java.lang.IllegalStateException: Pool not open
>>> at 
>>> org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>>> at 
>>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>>> at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>> at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>>> at 
>>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>>> at 
>>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>>> at 
>>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>> at 
>>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at 
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>> at 
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>


Re: How to get inverse Matrix / RDD or how to solve linear system of equations

2015-10-23 Thread Sujit Pal
Hi Zhiliang,

For a system of equations AX = y, Linear Regression will give you a
best-fit estimate for A (coefficient vector) for a matrix of feature
variables X and corresponding target variable y for a subset of your data.
OTOH, what you are looking for here is to solve for x a system of equations
Ax = b, where A and b are known and you want the vector x.

This Math Stackexchange page [2] explains the math in more detail, but
basically...

A * x = b can be rewritten as x = A.I * b.
You can get the pseudo-inverse of A using SVD (Spark MLLib supports SVD
[1]). So the SVD decomposition would make A a product of three other
matrices.

A = U * S * V.T

and the pseudo-inverse can be written as:

A.I = V * S * U.T

Then x can be found by multiplying A.I with b.
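
To make that concrete, here is a very rough sketch using MLLib's distributed
SVD. The names rowsRdd (an RDD[Vector] holding the rows of A) and bRdd (an
RDD[Double] holding the entries of b in the same row order) are placeholders,
and k is something you would need to tune for your A:

import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val A = new RowMatrix(rowsRdd)              // N x N
val k = 100                                 // number of singular values to keep
val svd = A.computeSVD(k, computeU = true)  // A ~= U * S * V.T
val U = svd.U   // distributed RowMatrix, N x k
val s = svd.s   // k singular values (local vector)
val V = svd.V   // local Matrix, N x k

// U.T * b: scale each row of U by the matching entry of b and sum.
// Note: zip assumes the two RDDs have the same partitioning and ordering,
// so in practice zipWithIndex plus a join is safer.
val utb = U.rows.zip(bRdd)
  .map { case (row, bi) => row.toArray.map(_ * bi) }
  .reduce((x1, x2) => x1.zip(x2).map { case (p, q) => p + q })

// S^-1 * (U.T * b), dropping near-zero singular values
val sInvUtb = s.toArray.zip(utb).map { case (si, ui) =>
  if (si > 1e-9) ui / si else 0.0
}

// x = V * S^-1 * U.T * b
val x = V.multiply(new DenseVector(sInvUtb))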

-sujit

[1] https://spark.apache.org/docs/1.2.0/mllib-dimensionality-reduction.html
[2]
http://math.stackexchange.com/questions/458404/how-can-we-compute-pseudoinverse-for-any-matrix


On Fri, Oct 23, 2015 at 2:19 AM, Zhiliang Zhu  wrote:

> Hi Sujit, and All,
>
> Currently I lost in large difficulty, I am eager to get some help from you.
>
> There is some big linear system of equations as:
> Ax = b,  A with N number of row and N number of column, N is very large, b
> = [0, 0, ..., 0, 1]T
> Then, I will sovle it to get x = [x1, x2, ..., xn]T.
>
> The simple solution would be to get inverse(A), and then  x = (inverse(A))
> * b .
> A would be some JavaRDD , however, for RDD/matrix there
> is add/multply/transpose APIs, no inverse API for it!
>
> Then, how would it conveniently get inverse(A), or just solve the linear
> system of equations by some other way...
> In Spark MLlib, there was linear regression, the training process might be
> to solve the coefficients to get some specific linear model, just is,
> Ax = y, just train by (x, y) to get A , this might be used to solve the
> linear system of equations. It is like that? I could not decide.
>
> I must show my deep appreciation torwards your all help.
>
> Thank you very much!
> Zhiliang
>
>
>


Re: Save RandomForest Model from ML package

2015-10-22 Thread Sujit Pal
Hi Sebastian,

You can save models to disk and load them back up. In the snippet below
(copied out of a working Databricks notebook), I train a model, then save
it to disk, then retrieve it back into model2 from disk.

import org.apache.spark.mllib.tree.RandomForest
> import org.apache.spark.mllib.tree.model.RandomForestModel
>


val model = RandomForest.trainClassifier(data, numClasses,
> categoricalFeaturesInfo,
> numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed)
> model.save(sc, inputDir + "models/randomForestModel")
>


val model2 = RandomForestModel.load(sc, inputDir +
> "models/randomForestModel")


Not sure if there is PMML support. The model saves itself into a directory
structure that looks like this:

data/
>   _SUCCESS
>   _common_metadata
>   _metadata
>   part-r-*.gz.parquet (multiple files)
> metadata/
>   _SUCCESS
>   part-0


HTH

-sujit




On Thu, Oct 22, 2015 at 5:33 AM, Sebastian Kuepers <
sebastian.kuep...@publicispixelpark.de> wrote:

> Hey,
>
> I try to figure out the best practice on saving and loading models which
> have bin fitted with the ML package - i.e. with the RandomForest
> classifier.
>
> There is PMML support in the MLib package afaik but not in ML - is that
> correct?
>
> How do you approach this, so that you do not have to fit your model before
> every prediction job?
>
> Thanks,
> Sebastian
>
>
> Sebastian Küpers
> Account Director
>
> Publicis Pixelpark
> Leibnizstrasse 65, 10629 Berlin
> T +49 30 5058 1838
> M +49 172 389 28 52
> sebastian.kuep...@publicispixelpark.de
> Web: publicispixelpark.de, Twitter: @pubpxp
> Facebook: publicispixelpark.de/facebook
> Publicis Pixelpark - eine Marke der Pixelpark AG
> Vorstand: Horst Wagner (Vorsitzender), Dirk Kedrowitsch
> Aufsichtsratsvorsitzender: Pedro Simko
> Amtsgericht Charlottenburg: HRB 72163
>
>
>
>
>
> 
> Disclaimer The information in this email and any attachments may contain
> proprietary and confidential information that is intended for the
> addressee(s) only. If you are not the intended recipient, you are hereby
> notified that any disclosure, copying, distribution, retention or use of
> the contents of this information is prohibited. When addressed to our
> clients or vendors, any information contained in this e-mail or any
> attachments is subject to the terms and conditions in any governing
> contract. If you have received this e-mail in error, please immediately
> contact the sender and delete the e-mail.
>


Re: How to subtract two RDDs with same size

2015-09-23 Thread Sujit Pal
Hi Zhiliang,

How about doing something like this?

val rdd3 = rdd1.zip(rdd2).map(p =>
p._1.zip(p._2).map(z => z._1 - z._2))

The first zip will join the two RDDs and produce an RDD of (Array[Float],
Array[Float]) pairs. On each pair, we zip the two Array[Float] components
together to form an Array[(Float, Float)] and then we subtract the second
element from the first in the inner map (the inner map is a Scala map, not
a Spark one).

I tried this out on a notebook:

val rdd1 = sc.parallelize(List(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0),
Array(7.0, 8.0, 9.0)))
val rdd2 = sc.parallelize(List(Array(1.0, 4.0, 3.0), Array(4.0, 10.0, 6.0),
Array(7.0, 16.0, 9.0)))
val rdd3 = rdd1.zip(rdd2).map(p => p._1.zip(p._2).map(z => z._1 - z._2))
rdd3.collect()

gives me:
res0: Array[Array[Double]] = Array(Array(0.0, -2.0, 0.0), Array(0.0, -5.0,
0.0), Array(0.0, -8.0, 0.0))

-sujit


On Wed, Sep 23, 2015 at 12:23 AM, Zhiliang Zhu  wrote:

> there is matrix add API, might map rdd2 each row element to be negative ,
> then make rdd1 and rdd2 and call add ?
>
> Or some more ways ...
>
>
>
> On Wednesday, September 23, 2015 3:11 PM, Zhiliang Zhu <
> zchl.j...@yahoo.com> wrote:
>
>
> Hi All,
>
> There are two RDDs :  RDD rdd1, and RDD rdd2,
> that is to say, rdd1 and rdd2 are similar with DataFrame, or Matrix with
> same row number and column number.
>
> I would like to get RDD rdd3,  each element in rdd3 is the
> subtract between rdd1 and rdd2 of the
> same position, which is similar Matrix subtract:
> rdd3 = rdd1 - rdd2 ...
>
> It seemed very difficult to operate this kinds of matrix  arithmetic, even
> is about add, subtract, multiple , diff etc...
>
> I shall  appreciate your help very much~~
> Zhiliang
>
>
>
>
>


Re: Calling a method parallel

2015-09-23 Thread Sujit Pal
Hi Tapan,

Perhaps this may work? It takes a range of 0..100 and creates an RDD out of
them, then calls X(i) on each. The X(i) should be executed on the workers
in parallel.

Scala:
val results = sc.parallelize(0 until 100).map(idx => X(idx))

Python:
results = sc.parallelize(range(100)).map(lambda idx: X(idx))

-sujit


On Wed, Sep 23, 2015 at 6:46 AM, Tapan Sharma 
wrote:

> Hi All,
>
> I want to call a method X(int i) from my Spark program for different values
> of i.
> This means.
> X(1), X(2).. X(n)..
> Each time it returns the one object.
> Currently I am doing this sequentially.
>
> Is there any way to run these in parallel and I get back the list of
> objects?
> Sorry for this basic question.
>
> Regards
> Tapan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Calling-a-method-parallel-tp24786.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Sujit Pal
Hi Zhiliang,

Would something like this work?

val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
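
Note: to make sliding available on the RDD, I believe you also need the MLLib
implicit conversions in scope (based on the 1.3 API docs, so treat this as an
assumption):

import org.apache.spark.mllib.rdd.RDDFunctions._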

-sujit


On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu 
wrote:

> Hi Romi,
>
> Thanks very much for your kind help comment~~
>
> In fact there is some valid backgroud of the application, it is about R
> data analysis.
> ...
> #fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is each
> daily fund return, row is the daily date
> #fund_return_daily needs to count the each fund's daily return subtracted
> the previous day's return
> fund_return_daily <- diff(log(fund_nav_daily))
>
> #the first row would be all 0, since there is no previous row ahead first
> row
> fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
> fund_return_daily)
> ...
>
> I need to exactly code the R program by way of spark, then RDD/DataFrame
> is used to replace R data.frame,
> however, I just found that it is VERY MUCH diffcult to make the spark
> program to flexibly descript & transform R backgroud applications.
> I think I have seriously lost myself into risk about this...
>
> Would you help direct me some about the above coding issue... and my risk
> about practice in spark/R application...
>
> I must show all my sincere thanks torwards your kind help.
>
> P.S. currently sparkR in spark 1.4.1 , there is many bug in the API
> createDataFrame/except/unionAll, and it seems
> that spark Java has more functions than sparkR.
> Also, no specific R regression algorithmn is including in sparkR .
>
> Best Regards,
> Zhiliang
>
>
> On Monday, September 21, 2015 7:36 PM, Romi Kuntsman 
> wrote:
>
>
> RDD is a set of data rows (in your case numbers), there is no meaning for
> the order of the items.
> What exactly are you trying to accomplish?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  > wrote:
>
> Dear ,
>
> I have took lots of days to think into this issue, however, without any
> success...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, I would like get a new RDD rdd2, each row
> in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
> What kinds of API or function would I use...
>
>
> Thanks very much!
> John
>
>
>
>
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Sujit Pal
Hi Zhiliang,

Haven't used the Java API, but I found this Javadoc page, which may be helpful to
you.

https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html

I think the equivalent Java code snippet might go something like this:

RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)

(the second parameter of fromRDD comes from this discussion thread).
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-td6768.html

There is also the SlidingRDD decorator:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/SlidingRDD.html

So maybe something like this:

new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))

-sujit

On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu <zchl.j...@yahoo.com> wrote:

> Hi Sujit,
>
> I must appreciate your kind help very much~
>
> It seems to be OK, however, do you know the corresponding spark Java API
> achievement...
> Is there any java API as scala sliding, and it seemed that I do not find
> spark scala's doc about sliding ...
>
> Thank you very much~
> Zhiliang
>
>
>
> On Monday, September 21, 2015 11:48 PM, Sujit Pal <sujitatgt...@gmail.com>
> wrote:
>
>
> Hi Zhiliang,
>
> Would something like this work?
>
> val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
>
> -sujit
>
>
> On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu <zchl.j...@yahoo.com.invalid
> > wrote:
>
> Hi Romi,
>
> Thanks very much for your kind help comment~~
>
> In fact there is some valid backgroud of the application, it is about R
> data analysis.
> ...
> #fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is each
> daily fund return, row is the daily date
> #fund_return_daily needs to count the each fund's daily return subtracted
> the previous day's return
> fund_return_daily <- diff(log(fund_nav_daily))
>
> #the first row would be all 0, since there is no previous row ahead first
> row
> fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
> fund_return_daily)
> ...
>
> I need to exactly code the R program by way of spark, then RDD/DataFrame
> is used to replace R data.frame,
> however, I just found that it is VERY MUCH diffcult to make the spark
> program to flexibly descript & transform R backgroud applications.
> I think I have seriously lost myself into risk about this...
>
> Would you help direct me some about the above coding issue... and my risk
> about practice in spark/R application...
>
> I must show all my sincere thanks torwards your kind help.
>
> P.S. currently sparkR in spark 1.4.1 , there is many bug in the API
> createDataFrame/except/unionAll, and it seems
> that spark Java has more functions than sparkR.
> Also, no specific R regression algorithmn is including in sparkR .
>
> Best Regards,
> Zhiliang
>
>
> On Monday, September 21, 2015 7:36 PM, Romi Kuntsman <r...@totango.com>
> wrote:
>
>
> RDD is a set of data rows (in your case numbers), there is no meaning for
> the order of the items.
> What exactly are you trying to accomplish?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu <zchl.j...@yahoo.com.invalid
> > wrote:
>
> Dear ,
>
> I have took lots of days to think into this issue, however, without any
> success...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, I would like get a new RDD rdd2, each row
> in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
> What kinds of API or function would I use...
>
>
> Thanks very much!
> John
>
>
>
>
>
>
>
>


Re: Scala: How to match a java object????

2015-08-18 Thread Sujit Pal
Hi Saif,

Would this work?

import scala.collection.JavaConversions._

new java.math.BigDecimal(5) match { case x: java.math.BigDecimal =>
x.doubleValue }

It gives me on the scala console.

res9: Double = 5.0

Assuming you had a stream of BigDecimals, you could just call map on it.

myBigDecimals.map(_.doubleValue)

to get your Seq of Doubles. You will need the JavaConversions._ import to
allow Java Doubles to be treated by Scala as Scala Doubles.
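
For the Seq[Any] case you describe, a rough sketch would be to match on the
runtime type of each element (mySeq is just a placeholder name here):

val doubles: Seq[Double] = mySeq.map {
  case bd: java.math.BigDecimal => bd.doubleValue
  case d: java.lang.Double => d.doubleValue
  case other => sys.error("unexpected type: " + other.getClass)
}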

-sujit

On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:

 Hi, thank you for further assistance



 you can reproduce this by simply running



 *5 match { case java.math.BigDecimal => 2 }*



 In my personal case, I am applying a map acton to a Seq[Any], so the
 elements inside are of type any, to which I need to apply a proper
 .asInstanceOf[WhoYouShouldBe].



 Saif



 *From:* William Briggs [mailto:wrbri...@gmail.com]
 *Sent:* Tuesday, August 18, 2015 4:46 PM
 *To:* Ellafi, Saif A.; user@spark.apache.org
 *Subject:* Re: Scala: How to match a java object



 Could you share your pattern matching expression that is failing?



 On Tue, Aug 18, 2015, 3:38 PM  saif.a.ell...@wellsfargo.com wrote:

 Hi all,



 I am trying to run a spark job, in which I receive *java.math.BigDecimal 
 *objects,
 instead of the scala equivalents, and I am trying to convert them into
 Doubles.

 If I try to match-case this object class, I get: *“error: object
 java.math.BigDecimal is not a value”*



 How could I get around matching java objects? I would like to avoid a
 multiple try-catch on ClassCastExceptions for all my checks.



 Thank you,

 Saif






Re: How to increase parallelism of a Spark cluster?

2015-08-03 Thread Sujit Pal
@Silvio: the mapPartitions instantiates a HttpSolrServer, then for each
query string in the partition, sends the query to Solr using SolrJ, and
gets back the top N results. It then reformats the result data into one
long string and returns the key value pair as (query string, result string).

@Igor: Thanks for the parameter suggestions. I will check the
--num-executors and if there is a way to set the number of cores/executor
with my Databricks admin and update here if I find it, but from the
Databricks console, it appears that the number of executors per box is 1.
This seems normal though, per the diagram on this page:

http://spark.apache.org/docs/latest/cluster-overview.html

where it seems that there is 1 executor per box, and each executor can
spawn multiple threads to take care of multiple tasks (see bullet #1 copied
below).

 Each application gets its own executor processes, which stay up for the
 duration of the whole application and run tasks in multiple threads. This
 has the benefit of isolating applications from each other, on both the
 scheduling side (each driver schedules its own tasks) and executor side
 (tasks from different applications run in different JVMs).


Regarding hitting the max number of requests, thanks for the link. I am
using the default client. Just peeked at the Solr code, and the default
settings (if no HttpClient instance is supplied in the ctor) is to use
DefaultHttpClient (from HttpComponents) whose settings are as follows:


- Version: HttpVersion.HTTP_1_1
- ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
- NoTcpDelay: true
- SocketBufferSize: 8192
- UserAgent: Apache-HttpClient/release (java 1.5)

 In addition, the Solr code sets the following additional config parameters
on the DefaultHttpClient.

  params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
   params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
   params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);

Since all my connections are coming out of 2 worker boxes, it looks like I
could get 32x2 = 64 clients hitting Solr, right?

@Steve: Thanks for the link to the HttpClient config. I was thinking about
using a thread pool (or better using a PoolingHttpClientManager per the
docs), but it probably won't help since it's still being fed one request at
a time.

@Abhishek: my observations agree with what you said. In the past I have had
success with repartition to reduce the partition size especially when
groupBy operations were involved. But I believe an executor should be able
to handle multiple tasks in parallel from what I understand about Akka on
which Spark is built - the worker is essentially an ActorSystem which can
contain multiple Actors, each actor works on a queue of tasks. Within an
Actor everything is sequential, but the ActorSystem is responsible for
farming out tasks it gets to each of its Actors. Although it is possible I
could be generalizing incorrectly from my limited experience with Akka.

Thanks again for all your help. Please let me know if something jumps out
and/or if there is some configuration I should check.

-sujit



On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:

 I don't know if (your assertion/expectation that) workers will process
 things (multiple partitions) in parallel is really valid. Or if having more
 partitions than workers will necessarily help (unless you are memory bound
 - so partitions is essentially helping your work size rather than execution
 parallelism).

 [Disclaimer: I am no authority on Spark, but wanted to throw my spin based
 my own understanding].

 Nothing official about it :)

 -abhishek-

 On Jul 31, 2015, at 1:03 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hello,

 I am trying to run a Spark job that hits an external webservice to get
 back some information. The cluster is 1 master + 4 workers, each worker has
 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
 and is accessed using code similar to that shown below.

 def getResults(keyValues: Iterator[(String, Array[String])]):
 Iterator[(String, String)] = {
 val solr = new HttpSolrClient()
 initializeSolrParameters(solr)
 keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
 }
 myRDD.repartition(10)

  .mapPartitions(keyValues => getResults(keyValues))


 The mapPartitions does some initialization to the SolrJ client per
 partition and then hits it for each record in the partition via the
 getResults() call.

 I repartitioned in the hope that this will result in 10 clients hitting
 Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
 clients if I can). However, I counted the number of open connections using
 netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
 observed that Solr has a constant 4 clients (ie, equal to the number of
 workers) over the lifetime of the run.

 My observation leads me

Re: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Sujit Pal
No one has any ideas?

Is there some more information I should provide?

I am looking for ways to increase the parallelism among workers. Currently
I just see number of simultaneous connections to Solr equal to the number
of workers. My number of partitions is (2.5x) larger than number of
workers, and the workers seem to be large enough to handle more than one
task at a time.

I am creating a single client per partition in my mapPartition call. Not
sure if that is creating the gating situation? Perhaps I should use a Pool
of clients instead?

Would really appreciate some pointers.

Thanks in advance for any help you can provide.

-sujit


On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hello,

 I am trying to run a Spark job that hits an external webservice to get
 back some information. The cluster is 1 master + 4 workers, each worker has
 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
 and is accessed using code similar to that shown below.

 def getResults(keyValues: Iterator[(String, Array[String])]):
 Iterator[(String, String)] = {
 val solr = new HttpSolrClient()
 initializeSolrParameters(solr)
 keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
 }
 myRDD.repartition(10)

  .mapPartitions(keyValues => getResults(keyValues))


 The mapPartitions does some initialization to the SolrJ client per
 partition and then hits it for each record in the partition via the
 getResults() call.

 I repartitioned in the hope that this will result in 10 clients hitting
 Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
 clients if I can). However, I counted the number of open connections using
 netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
 observed that Solr has a constant 4 clients (ie, equal to the number of
 workers) over the lifetime of the run.

 My observation leads me to believe that each worker processes a single
 stream of work sequentially. However, from what I understand about how
 Spark works, each worker should be able to process number of tasks
 parallelly, and that repartition() is a hint for it to do so.

 Is there some SparkConf environment variable I should set to increase
 parallelism in these workers, or should I just configure a cluster with
 multiple workers per machine? Or is there something I am doing wrong?

 Thank you in advance for any pointers you can provide.

 -sujit




Re: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Sujit Pal
Hi Igor,

The cluster is a Databricks Spark cluster. It consists of 1 master + 4
workers, each worker has 60GB RAM and 4 CPUs. The original mail has some
more details (also the reference to the HttpSolrClient in there should be
HttpSolrServer, sorry about that, mistake while writing the email).

There is no additional configuration on the external Solr host from my
code, I am using the default HttpClient provided by HttpSolrServer.
According to the Javadocs, you can pass in a HttpClient object as well. Is
there some specific configuration you would suggest to get past any limits?

On another project, I faced a similar problem but I had more leeway (was
using a Spark cluster from EC2) and less time, my workaround was to use
python multiprocessing to create a program that started up 30 python
JSON/HTTP clients and wrote output into 30 output files, which were then
processed by Spark. Reason I mention this is that I was using default
configurations there as well, just needed to increase the number of
connections against Solr to a higher number.

This time round, I would like to do this through Spark because it makes the
pipeline less complex.

-sujit


On Sun, Aug 2, 2015 at 10:52 AM, Igor Berman igor.ber...@gmail.com wrote:

 What kind of cluster? How many cores on each worker? Is there config for
 http solr client? I remember standard httpclient has limit per route/host.
 On Aug 2, 2015 8:17 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 No one has any ideas?

 Is there some more information I should provide?

 I am looking for ways to increase the parallelism among workers.
 Currently I just see number of simultaneous connections to Solr equal to
 the number of workers. My number of partitions is (2.5x) larger than number
 of workers, and the workers seem to be large enough to handle more than one
 task at a time.

 I am creating a single client per partition in my mapPartition call. Not
 sure if that is creating the gating situation? Perhaps I should use a Pool
 of clients instead?

 Would really appreciate some pointers.

 Thanks in advance for any help you can provide.

 -sujit


 On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 Hello,

 I am trying to run a Spark job that hits an external webservice to get
 back some information. The cluster is 1 master + 4 workers, each worker has
 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
 and is accessed using code similar to that shown below.

 def getResults(keyValues: Iterator[(String, Array[String])]):
 Iterator[(String, String)] = {
 val solr = new HttpSolrClient()
 initializeSolrParameters(solr)
 keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
 }
 myRDD.repartition(10)

  .mapPartitions(keyValues => getResults(keyValues))


 The mapPartitions does some initialization to the SolrJ client per
 partition and then hits it for each record in the partition via the
 getResults() call.

 I repartitioned in the hope that this will result in 10 clients hitting
 Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
 clients if I can). However, I counted the number of open connections using
 netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
 observed that Solr has a constant 4 clients (ie, equal to the number of
 workers) over the lifetime of the run.

 My observation leads me to believe that each worker processes a single
 stream of work sequentially. However, from what I understand about how
 Spark works, each worker should be able to process number of tasks
 parallelly, and that repartition() is a hint for it to do so.

 Is there some SparkConf environment variable I should set to increase
 parallelism in these workers, or should I just configure a cluster with
 multiple workers per machine? Or is there something I am doing wrong?

 Thank you in advance for any pointers you can provide.

 -sujit





How to increase parallelism of a Spark cluster?

2015-07-31 Thread Sujit Pal
Hello,

I am trying to run a Spark job that hits an external webservice to get back
some information. The cluster is 1 master + 4 workers, each worker has 60GB
RAM and 4 CPUs. The external webservice is a standalone Solr server, and is
accessed using code similar to that shown below.

def getResults(keyValues: Iterator[(String, Array[String])]):
 Iterator[(String, String)] = {
 val solr = new HttpSolrClient()
 initializeSolrParameters(solr)
 keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
 }
 myRDD.repartition(10)

 .mapPartitions(keyValues => getResults(keyValues))


The mapPartitions does some initialization to the SolrJ client per
partition and then hits it for each record in the partition via the
getResults() call.

I repartitioned in the hope that this will result in 10 clients hitting
Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
clients if I can). However, I counted the number of open connections using
netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
observed that Solr has a constant 4 clients (ie, equal to the number of
workers) over the lifetime of the run.

My observation leads me to believe that each worker processes a single
stream of work sequentially. However, from what I understand about how
Spark works, each worker should be able to process number of tasks
parallelly, and that repartition() is a hint for it to do so.

Is there some SparkConf environment variable I should set to increase
parallelism in these workers, or should I just configure a cluster with
multiple workers per machine? Or is there something I am doing wrong?

Thank you in advance for any pointers you can provide.

-sujit


Re: use S3-Compatible Storage with spark

2015-07-17 Thread Sujit Pal
Hi Schmirr,

The part after the s3n:// is your bucket name and folder name, ie
s3n://${bucket_name}/${folder_name}[/${subfolder_name}]*. Bucket names are
unique across S3, so the resulting path is also unique. There is no concept
of hostname in s3 urls as far as I know.

-sujit


On Fri, Jul 17, 2015 at 1:36 AM, Schmirr Wurst schmirrwu...@gmail.com
wrote:

 Hi,

 I wonder how to use S3 compatible Storage in Spark ?
 If I'm using s3n:// url schema, the it will point to amazon, is there
 a way I can specify the host somewhere ?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark on EMR with S3 example (Python)

2015-07-15 Thread Sujit Pal
Hi Roberto,

I think you would need to as Akhil said. Just checked from this page:

http://aws.amazon.com/public-data-sets/

and clicking through to a few dataset links, all of them are available on
s3 (some are available via http and ftp, but I think the point of these
datasets is that they are usually very large, so having them on s3 makes it
easier to take your code to the data than to bring the data to your code).

-sujit


On Tue, Jul 14, 2015 at 1:56 PM, Pagliari, Roberto rpagli...@appcomsci.com
wrote:

 Hi Sujit,

 I just wanted to access public datasets on Amazon. Do I still need to
 provide the keys?



 Thank you,





 *From:* Sujit Pal [mailto:sujitatgt...@gmail.com]
 *Sent:* Tuesday, July 14, 2015 3:14 PM
 *To:* Pagliari, Roberto
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark on EMR with S3 example (Python)



 Hi Roberto,



 I have written PySpark code that reads from private S3 buckets, it should
 be similar for public S3 buckets as well. You need to set the AWS access
 and secret keys into the SparkContext, then you can access the S3 folders
 and files with their s3n:// paths. Something like this:



 sc = SparkContext()

 sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key)

 sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
 aws_secret_key)



 mydata = sc.textFile("s3n://mybucket/my_input_folder") \

 .map(lambda x: do_something(x)) \

 .saveAsTextFile("s3://mybucket/my_output_folder")

 ...



 You can read and write sequence files as well - these are the only 2
 formats I have tried, but I'm sure the other ones like JSON would work
 also. Another approach is to embed the AWS access key and secret key into
 the s3n:// path.



 I wasn't able to use the s3 protocol, but s3n is equivalent (I believe its
 an older version but not sure) but it works for access.



 Hope this helps,

 Sujit





 On Tue, Jul 14, 2015 at 10:50 AM, Pagliari, Roberto 
 rpagli...@appcomsci.com wrote:

 Is there an example about how to load data from a public S3 bucket in
 Python? I haven’t found any.



 Thank you,







Re: Efficiency of leftOuterJoin a cassandra rdd

2015-07-15 Thread Sujit Pal
Hi Wush,

One option may be to try a replicated join. Since your rdd1 is small, read
it into a collection and broadcast it to the workers, then filter your
larger rdd2 against the collection on the workers.

-sujit


On Tue, Jul 14, 2015 at 11:33 PM, Deepak Jain deepuj...@gmail.com wrote:

 Leftouterjoin and join apis are super slow in spark. 100x slower than
 hadoop

 Sent from my iPhone

  On 14-Jul-2015, at 10:59 PM, Wush Wu wush...@gmail.com wrote:
 
  I don't understand.
 
  By the way, the `joinWithCassandraTable` does improve my query time
  from 40 mins to 3 mins.
 
 
  2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
  I have explored spark joins for last few months (you can search my
 posts)
  and its frustrating useless.
 
  On Tue, Jul 14, 2015 at 9:35 PM, Wush Wu wush...@gmail.com wrote:
 
  Dear all,
 
  I have found a post discussing the same thing:
 
 
 https://groups.google.com/a/lists.datastax.com/forum/#!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J
 
  The solution is using joinWithCassandraTable and the documentation
  is here:
 
 https://github.com/datastax/spark-cassandra-connector/blob/v1.3.0-M2/doc/2_loading.md
 
  Wush
 
  2015-07-15 12:15 GMT+08:00 Wush Wu wush...@gmail.com:
  Dear all,
 
  I am trying to join two RDDs, named rdd1 and rdd2.
 
  rdd1 is loaded from a textfile with about 33000 records.
 
  rdd2 is loaded from a table in cassandra which has about 3 billions
  records.
 
  I tried the following code:
 
  ```scala
 
  val rdd1 : (String, XXX) = sc.textFile(...).map(...)
  import org.apache.spark.sql.cassandra.CassandraSQLContext
   cc.setKeyspace("xxx")
   val rdd2 : (String, String) = cc.sql("SELECT x, y FROM xxx").map(r =>
  ...)
 
  val result = rdd1.leftOuterJoin(rdd2)
  result.take(20)
 
  ```
 
  However, the log shows that the spark loaded 3 billions records from
  cassandra and only 33000 records left at the end.
 
  Is there a way to query the cassandra based on the key in rdd1?
 
  Here is some information of our system:
 
  - The spark version is 1.3.1
  - The cassandra version is 2.0.14
  - The key of joining is the primary key of the cassandra table.
 
  Best,
  Wush
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
  --
  Deepak
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark on EMR with S3 example (Python)

2015-07-14 Thread Sujit Pal
Hi Roberto,

I have written PySpark code that reads from private S3 buckets, it should
be similar for public S3 buckets as well. You need to set the AWS access
and secret keys into the SparkContext, then you can access the S3 folders
and files with their s3n:// paths. Something like this:

sc = SparkContext()
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
aws_secret_key)

mydata = sc.textFile("s3n://mybucket/my_input_folder") \
.map(lambda x: do_something(x)) \
.saveAsTextFile("s3://mybucket/my_output_folder")
...

You can read and write sequence files as well - these are the only two
formats I have tried, but I'm sure other formats like JSON would work
too. Another approach is to embed the AWS access key and secret key into
the s3n:// path.
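
For example, the embedded-credentials form looks roughly like this (the
bucket name and keys below are placeholders, and note that a secret key
containing a "/" will break this form):

# Placeholder access key, secret key and bucket - substitute your own values.
mydata = sc.textFile("s3n://MY_ACCESS_KEY:MY_SECRET_KEY@mybucket/my_input_folder")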

I wasn't able to use the s3 protocol, but s3n is equivalent (I believe it's
an older version, though I'm not sure) and it works for access.

Hope this helps,
Sujit


On Tue, Jul 14, 2015 at 10:50 AM, Pagliari, Roberto rpagli...@appcomsci.com
 wrote:

 Is there an example about how to load data from a public S3 bucket in
 Python? I haven’t found any.



 Thank you,





Re: PySpark without PySpark

2015-07-10 Thread Sujit Pal
Hi Ashish,

Cool. glad it worked out. I have only used Spark clusters on EC2, which I
spin up using the spark-ec2 scripts (part of the Spark downloads). So don't
have any experience setting up inhouse clusters like you want to do. But I
found some documentation here that may be helpful.
https://docs.sigmoidanalytics.com/index.php/Installing_Spark_and_Setting_Up_Your_Cluster#Deploying_set_of_machines_over_SSH

There are other options as well in this document that will require you to
know some other tools like Chef (previous sections).

Good luck,
Sujit


On Thu, Jul 9, 2015 at 10:25 PM, Ashish Dutt ashish.du...@gmail.com wrote:

 Hi Sujit,
 Thank you for your time to help me out. And special thank you for your
 elaborate steps.
 I corrected SPARK_HOME to be c:\spark-1.3.0
 2) I installed py4j from anaconda command prompt and the command you gave
 executed successfully.
 3) I replaced python27 with python in the 00-setup script.
 I now give the Path variables as defined and the PATH.

 SPARK_HOME   C:\Spark-1.3.0
 JAVA_HOME   C:\Program Files\Java\jdk1.7.0_79
 PYTHONPATH C:\Users\Ashish Dutt\Anaconda
 MAVEN_HOME   C:\Maven\bin
 SBT_HOME C:\SBT
 PATH   %JAVA_HOME%\BIN; %PYTHON_PATH%; %HADOOP_HOME%\BIN;
 %SPARK_HOME%; %M2_HOME%\BIN %MAVEN_HOME%\BIN;%SBT_HOME%;

 4) This time, I grabbed my baseball bat (you do know why..) invoked
 ipython notebook again and with the other free hand I slowly typed the
 command
 print SPARK_HOME -- it worked. Then another command, from pyspark import
 SparkContext, and it worked too!!!
 The baseball bat dropped to the ground and I quickly jabbed the other
 commands given in the post. Attached is the screenshot and it all worked...
 EUREKA...

 Sujit, a quintal of thanks for your persistence in helping me resolve this
 problem. You have been very helpful and I wish you luck and success in all
 your endeavors.
 Next milestone is to get this to work in a cluster environment.

 I am confused that do I need to install spark-1.3.0 on all the 4 linux
 machines that make my cluster?
 The goal is to use my laptop as a client (from where I will submit spark
 commands to the master server) The master can then distribute the job to
 the three nodes and provide the client with the end result.
 Am i correct in this visualization ?

 Once again, thank you for your efforts.


 Sincerely,
 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

 On Fri, Jul 10, 2015 at 11:48 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 Hi Ashish,

 Julian's approach is probably better, but few observations:

 1) Your SPARK_HOME should be C:\spark-1.3.0 (not C:\spark-1.3.0\bin).

 2) If you have anaconda python installed (I saw that you had set this up
 in a separate thread), py4j should be part of the package - at least I think
 so. To test this, try in your python repl:
  >>> from py4j.java_gateway import JavaGateway
 If it succeeds, you already have it.

 3) In case Py4J is not installed, the best way to install a new package
 is using easy_install or pip. Make sure your path is set up so when you
 call python you are calling the anaconda version (in case you have multiple
 python versions), then if so, do easy_install py4j - this will install
 py4j correctly without any messing around on your part. Install
 instructions for py4j available on their site:
 http://py4j.sourceforge.net/install.html

 4) You should replace the python2 in your 00-setup-script with
 python, so you point to the $SPARK_HOME/python directory
 (C:\spark-1.3.0\python).

 -sujit


 On Thu, Jul 9, 2015 at 8:26 PM, Ashish Dutt ashish.du...@gmail.com
 wrote:

 Hello Sujit,
 Many thanks for your response.
 To answer your questions;
 Q1) Do you have SPARK_HOME set up in your environment?- Yes, I do. It
 is SPARK_HOME=C:/spark-1.3.0/bin
 Q2) Is there a python2 or python subdirectory under the root of your
 Spark installation? - Yes, i do have that too. It is called python. To fix
 this problem this is what I did,
 I downloaded py4j-0.8.2.1-src from here
 https://pypi.python.org/pypi/py4j which was not there initially when
 I downloaded the spark package from the official repository. I then put it
 in the lib directory as C:\spark-1.3.0\python\lib. Note I did not extract
 the zip file. I put it in as it is.
 The pyspark folder of the spark-1.3.0 root folder. What I next did was
 copy this file and put it in the  pythonpath. So my python path now reads
 as PYTHONPATH=C:/Python27/

 I then rebooted the computer and a silent prayer :-) Then I opened the
 command prompt and invoked the command pyspark from the bin directory of
 spark and EUREKA, it worked :-)  Attached is the screenshot for the same.
 Now, the problem is with IPython notebook. I cannot get it to work with
 pySpark.
 I have a cluster with 4 nodes using CDH5.4

 I was able to resolve the problem. Now the next challenge was to
 configure it with IPython. Followed the steps as documented

Re: PySpark without PySpark

2015-07-09 Thread Sujit Pal
Hi Ashish,

Your 00-pyspark-setup file looks very different from mine (and from the one
described in the blog post). Questions:

1) Do you have SPARK_HOME set up in your environment? Because if not, it
sets it to None in your code. You should provide the path to your Spark
installation. In my case I have spark-1.3.1 installed under $HOME/Software
and the code block under # Configure the environment (or yellow highlight
in the code below) reflects that.
2) Is there a python2 or python subdirectory under the root of your Spark
installation? In my case its python not python2. This contains the
Python bindings for spark, so the block under # Add the PySpark/py4j to
the Python path (or green highlight in the code below) adds it to the
Python sys.path so things like pyspark.SparkContext are accessible in your
Python environment.

import os
import sys

# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = "/Users/palsujit/Software/spark-1.3.1"

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

Hope this fixes things for you.

-sujit


On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt ashish.du...@gmail.com wrote:

 Hi Sujit,
 Thanks for your response.

 So i opened a new notebook using the command ipython notebook --profile
 spark and tried the sequence of commands. i am getting errors. Attached is
 the screenshot of the same.
 Also I am attaching the  00-pyspark-setup.py for your reference. Looks
 like, I have written something wrong here. Cannot seem to figure out, what
 is it?

 Thank you for your help


 Sincerely,
 Ashish Dutt

 On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hi Ashish,

  Nice post.
 Agreed, kudos to the author of the post, Benjamin Benfort of District
 Labs.

  Following your post, I get this problem;
 Again, not my post.

 I did try setting up IPython with the Spark profile for the edX Intro to
 Spark course (because I didn't want to use the Vagrant container) and it
 worked flawlessly with the instructions provided (on OSX). I haven't used
 the IPython/PySpark environment beyond very basic tasks since then though,
 because my employer has a Databricks license which we were already using
 for other stuff and we ended up doing the labs on Databricks.

 Looking at your screenshot though, I don't see why you think it's picking
 up the default profile. One simple way of checking to see if things are
 working is to open a new notebook and try this sequence of commands:

 from pyspark import SparkContext
 sc = SparkContext("local", "pyspark")
 sc

 You should see something like this after a little while:
 <pyspark.context.SparkContext at 0x1093c9b10>

 While the context is being instantiated, you should also see lots of log
 lines scroll by on the terminal where you started the ipython notebook
 --profile spark command - these log lines are from Spark.

 Hope this helps,
 Sujit


 On Wed, Jul 8, 2015 at 6:04 PM, Ashish Dutt ashish.du...@gmail.com
 wrote:

 Hi Sujit,
 Nice post.. Exactly what I had been looking for.
 I am relatively a beginner with Spark and real time data processing.
 We have a server with CDH5.4 with 4 nodes. The spark version in our
 server is 1.3.0
 On my laptop I have spark 1.3.0 too and its using Windows 7 environment.
 As per point 5 of your post I am able to invoke pyspark locally as in a
 standalone mode.

 Following your post, I get this problem;

 1. In section Using Ipython notebook with spark I cannot understand
 why it is picking up the default profile and not the pyspark profile. I am
 sure it is because of the path variables. Attached is the screenshot. Can
 you suggest how to solve this.

 Current the path variables for my laptop are like
 SPARK_HOME=C:\SPARK-1.3.0\BIN, JAVA_HOME=C:\PROGRAM
 FILES\JAVA\JDK1.7.0_79, HADOOP_HOME=D:\WINUTILS, M2_HOME=D:\MAVEN\BIN,
 MAVEN_HOME=D:\MAVEN\BIN, PYTHON_HOME=C:\PYTHON27\, SBT_HOME=C:\SBT\


 Sincerely,
 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

 On Thu, Jul 9, 2015 at 4:56 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 You are welcome Davies. Just to clarify, I didn't write the post (not
 sure if my earlier post gave that impression, apologize if so), although I
 agree its great :-).

 -sujit


 On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu dav...@databricks.com
 wrote:

 Great post, thanks for sharing with us!


 On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:
  Hi Julian,
 
  I recently built a Python+Spark application to do search relevance
  analytics. I use spark-submit to submit PySpark jobs to a Spark
 cluster on
  EC2 (so I don't use the PySpark shell, hopefully thats what you are
 looking
  for). Can't share the code, but the basic approach is covered in
 this blog
  post

Re: PySpark without PySpark

2015-07-09 Thread Sujit Pal
Hi Ashish,

Julian's approach is probably better, but few observations:

1) Your SPARK_HOME should be C:\spark-1.3.0 (not C:\spark-1.3.0\bin).

2) If you have anaconda python installed (I saw that you had set this up in
a separate thread), py4j should be part of the package - at least I think
so. To test this, try in your python repl:
>>> from py4j.java_gateway import JavaGateway
If it succeeds, you already have it.

3) In case Py4J is not installed, the best way to install a new package is
using easy_install or pip. Make sure your path is set up so when you call
python you are calling the anaconda version (in case you have multiple
python versions), then if so, do easy_install py4j - this will install
py4j correctly without any messing around on your part. Install
instructions for py4j available on their site:
http://py4j.sourceforge.net/install.html

4) You should replace the python2 in your 00-setup-script with python,
so you point to the $SPARK_HOME/python directory (C:\spark-1.3.0\python).

-sujit


On Thu, Jul 9, 2015 at 8:26 PM, Ashish Dutt ashish.du...@gmail.com wrote:

 Hello Sujit,
 Many thanks for your response.
 To answer your questions;
 Q1) Do you have SPARK_HOME set up in your environment?- Yes, I do. It is
 SPARK_HOME=C:/spark-1.3.0/bin
 Q2) Is there a python2 or python subdirectory under the root of your
 Spark installation? - Yes, i do have that too. It is called python. To fix
 this problem this is what I did,
 I downloaded py4j-0.8.2.1-src from here
 https://pypi.python.org/pypi/py4j which was not there initially when I
 downloaded the spark package from the official repository. I then put it in
 the lib directory as C:\spark-1.3.0\python\lib. Note I did not extract the
 zip file. I put it in as it is.
 The pyspark folder of the spark-1.3.0 root folder. What I next did was
 copy this file and put it in the  pythonpath. So my python path now reads
 as PYTHONPATH=C:/Python27/

 I then rebooted the computer and a silent prayer :-) Then I opened the
 command prompt and invoked the command pyspark from the bin directory of
 spark and EUREKA, it worked :-)  Attached is the screenshot for the same.
 Now, the problem is with IPython notebook. I cannot get it to work with
 pySpark.
 I have a cluster with 4 nodes using CDH5.4

 I was able to resolve the problem. Now the next challenge was to configure
 it with IPython. Followed the steps as documented in the blog. And I get
 the errors, attached is the screenshot

 @Julian, I tried your method too. Attached is the screenshot of the error
 message 7.png

 Hope you can help me out to fix this problem.
 Thank you for your time.

 Sincerely,
 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

 On Fri, Jul 10, 2015 at 12:02 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 Hi Ashish,

 Your 00-pyspark-setup file looks very different from mine (and from the
 one described in the blog post). Questions:

 1) Do you have SPARK_HOME set up in your environment? Because if not, it
 sets it to None in your code. You should provide the path to your Spark
 installation. In my case I have spark-1.3.1 installed under $HOME/Software
 and the code block under # Configure the environment (or yellow highlight
 in the code below) reflects that.
 2) Is there a python2 or python subdirectory under the root of your Spark
 installation? In my case it's "python", not "python2". This contains the
 Python bindings for spark, so the block under # Add the PySpark/py4j to
 the Python path (or green highlight in the code below) adds it to the
 Python sys.path so things like pyspark.SparkContext are accessible in your
 Python environment.

 import os
 import sys

 # Configure the environment
 if 'SPARK_HOME' not in os.environ:
     os.environ['SPARK_HOME'] = "/Users/palsujit/Software/spark-1.3.1"

 # Create a variable for our root path
 SPARK_HOME = os.environ['SPARK_HOME']

 # Add the PySpark/py4j to the Python Path
 sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
 sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

 Hope this fixes things for you.

 -sujit


 On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt ashish.du...@gmail.com
 wrote:

 Hi Sujit,
 Thanks for your response.

 So i opened a new notebook using the command ipython notebook --profile
 spark and tried the sequence of commands. i am getting errors. Attached is
 the screenshot of the same.
 Also I am attaching the  00-pyspark-setup.py for your reference. Looks
 like, I have written something wrong here. Cannot seem to figure out, what
 is it?

 Thank you for your help


 Sincerely,
 Ashish Dutt

 On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 Hi Ashish,

  Nice post.
 Agreed, kudos to the author of the post, Benjamin Benfort of District
 Labs.

  Following your post, I get this problem;
 Again, not my post.

 I did try setting up IPython with the Spark profile for the edX Intro
 to Spark course (because I didn't

Re: PySpark without PySpark

2015-07-08 Thread Sujit Pal
Hi Julian,

I recently built a Python+Spark application to do search relevance
analytics. I use spark-submit to submit PySpark jobs to a Spark cluster on
EC2 (so I don't use the PySpark shell, hopefully that's what you are looking
for). Can't share the code, but the basic approach is covered in this blog
post - scroll down to the section "Writing a Spark Application".

https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python
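
For reference, the skeleton of such a spark-submit job is small. This is only
an illustrative sketch (the file name, app name and transformation are made
up), not the actual application:

# my_job.py - run with: bin/spark-submit my_job.py <input_path> <output_path>
import sys
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("example-job")
    sc = SparkContext(conf=conf)
    in_path, out_path = sys.argv[1], sys.argv[2]
    # Replace this with the real per-record logic.
    sc.textFile(in_path) \
        .map(lambda line: line.strip().lower()) \
        .saveAsTextFile(out_path)
    sc.stop()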

Hope this helps,

-sujit


On Wed, Jul 8, 2015 at 7:46 AM, Julian julian+sp...@magnetic.com wrote:

 Hey.

 Is there a resource that has written up what the necessary steps are for
 running PySpark without using the PySpark shell?

 I can reverse engineer (by following the tracebacks and reading the shell
 source) what the relevant Java imports needed are, but I would assume
 someone has attempted this before and just published something I can either
 follow or install? If not, I have something that pretty much works and can
 publish it, but I'm not a heavy Spark user, so there may be some things
 I've
 left out that I haven't hit because of how little of pyspark I'm playing
 with.

 Thanks,
 Julian



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-without-PySpark-tp23719.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: PySpark without PySpark

2015-07-08 Thread Sujit Pal
Hi Ashish,

 Nice post.
Agreed, kudos to the author of the post, Benjamin Benfort of District Labs.

 Following your post, I get this problem;
Again, not my post.

I did try setting up IPython with the Spark profile for the edX Intro to
Spark course (because I didn't want to use the Vagrant container) and it
worked flawlessly with the instructions provided (on OSX). I haven't used
the IPython/PySpark environment beyond very basic tasks since then though,
because my employer has a Databricks license which we were already using
for other stuff and we ended up doing the labs on Databricks.

Looking at your screenshot though, I don't see why you think it's picking up
the default profile. One simple way of checking to see if things are
working is to open a new notebook and try this sequence of commands:

from pyspark import SparkContext
sc = SparkContext("local", "pyspark")
sc

You should see something like this after a little while:
<pyspark.context.SparkContext at 0x1093c9b10>

While the context is being instantiated, you should also see lots of log
lines scroll by on the terminal where you started the ipython notebook
--profile spark command - these log lines are from Spark.

Hope this helps,
Sujit


On Wed, Jul 8, 2015 at 6:04 PM, Ashish Dutt ashish.du...@gmail.com wrote:

 Hi Sujit,
 Nice post.. Exactly what I had been looking for.
 I am relatively a beginner with Spark and real time data processing.
 We have a server with CDH5.4 with 4 nodes. The spark version in our server
 is 1.3.0
 On my laptop I have spark 1.3.0 too and its using Windows 7 environment.
 As per point 5 of your post I am able to invoke pyspark locally as in a
 standalone mode.

 Following your post, I get this problem;

 1. In section Using Ipython notebook with spark I cannot understand why
 it is picking up the default profile and not the pyspark profile. I am sure
 it is because of the path variables. Attached is the screenshot. Can you
 suggest how to solve this.

 Current the path variables for my laptop are like
 SPARK_HOME=C:\SPARK-1.3.0\BIN, JAVA_HOME=C:\PROGRAM
 FILES\JAVA\JDK1.7.0_79, HADOOP_HOME=D:\WINUTILS, M2_HOME=D:\MAVEN\BIN,
 MAVEN_HOME=D:\MAVEN\BIN, PYTHON_HOME=C:\PYTHON27\, SBT_HOME=C:\SBT\


 Sincerely,
 Ashish Dutt
 PhD Candidate
 Department of Information Systems
 University of Malaya, Lembah Pantai,
 50603 Kuala Lumpur, Malaysia

 On Thu, Jul 9, 2015 at 4:56 AM, Sujit Pal sujitatgt...@gmail.com wrote:

 You are welcome Davies. Just to clarify, I didn't write the post (not
 sure if my earlier post gave that impression, apologize if so), although I
 agree its great :-).

 -sujit


 On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu dav...@databricks.com
 wrote:

 Great post, thanks for sharing with us!

 On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com
 wrote:
  Hi Julian,
 
  I recently built a Python+Spark application to do search relevance
  analytics. I use spark-submit to submit PySpark jobs to a Spark
 cluster on
  EC2 (so I don't use the PySpark shell, hopefully thats what you are
 looking
  for). Can't share the code, but the basic approach is covered in this
 blog
  post - scroll down to the section Writing a Spark Application.
 
 
 https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python
 
  Hope this helps,
 
  -sujit
 
 
  On Wed, Jul 8, 2015 at 7:46 AM, Julian julian+sp...@magnetic.com
 wrote:
 
  Hey.
 
  Is there a resource that has written up what the necessary steps are
 for
  running PySpark without using the PySpark shell?
 
  I can reverse engineer (by following the tracebacks and reading the
 shell
  source) what the relevant Java imports needed are, but I would assume
  someone has attempted this before and just published something I can
  either
  follow or install? If not, I have something that pretty much works
 and can
  publish it, but I'm not a heavy Spark user, so there may be some
 things
  I've
  left out that I haven't hit because of how little of pyspark I'm
 playing
  with.
 
  Thanks,
  Julian
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-without-PySpark-tp23719.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 






Re: PySpark without PySpark

2015-07-08 Thread Sujit Pal
You are welcome Davies. Just to clarify, I didn't write the post (not sure
if my earlier post gave that impression, apologies if so), although I agree
it's great :-).

-sujit


On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu dav...@databricks.com wrote:

 Great post, thanks for sharing with us!

 On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com wrote:
  Hi Julian,
 
  I recently built a Python+Spark application to do search relevance
  analytics. I use spark-submit to submit PySpark jobs to a Spark cluster
 on
  EC2 (so I don't use the PySpark shell, hopefully thats what you are
 looking
  for). Can't share the code, but the basic approach is covered in this
 blog
  post - scroll down to the section Writing a Spark Application.
 
 
 https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python
 
  Hope this helps,
 
  -sujit
 
 
  On Wed, Jul 8, 2015 at 7:46 AM, Julian julian+sp...@magnetic.com
 wrote:
 
  Hey.
 
  Is there a resource that has written up what the necessary steps are for
  running PySpark without using the PySpark shell?
 
  I can reverse engineer (by following the tracebacks and reading the
 shell
  source) what the relevant Java imports needed are, but I would assume
  someone has attempted this before and just published something I can
  either
  follow or install? If not, I have something that pretty much works and
 can
  publish it, but I'm not a heavy Spark user, so there may be some things
  I've
  left out that I haven't hit because of how little of pyspark I'm playing
  with.
 
  Thanks,
  Julian
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-without-PySpark-tp23719.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: HOw to concatenate two csv files into one RDD?

2015-06-26 Thread Sujit Pal
Hi Rex,

If the CSV files are in the same folder and there are no other files,
specifying the directory to sc.textFile() (or equivalent) will pull in all
the files. If there are other files, you can pass in a pattern that would
capture just the two files you care about (if that's possible). If neither of
these works for you, you can create individual RDDs for each file and union
them, along the lines of the sketch below.
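
A minimal PySpark sketch (the paths below are placeholders):

# Option 1: point textFile at the folder that holds only the CSV files.
all_rows = sc.textFile("hdfs:///data/csv_folder/")

# Option 2: read each file into its own RDD and union them.
csv_paths = ["hdfs:///data/file1.csv", "hdfs:///data/file2.csv"]
combined = sc.union([sc.textFile(p) for p in csv_paths])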

-sujit


On Fri, Jun 26, 2015 at 11:00 AM, Rex X dnsr...@gmail.com wrote:

 With Python Pandas, it is easy to do concatenation of dataframes
 by combining  pandas.concat
 http://pandas.pydata.org/pandas-docs/stable/generated/pandas.concat.html
 and pandas.read_csv

 pd.concat([pd.read_csv(os.path.join(Path_to_csv_files, f)) for f in
 csvfiles])

 where csvfiles is the list of csv files


 HOw can we do this in Spark?





Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?

2015-06-16 Thread Sujit Pal
Hi Rexx,

In general (i.e., not Spark specific), it's best to convert categorical data to
1-hot encoding rather than integers - that way the algorithm doesn't pick up
the ordering implicit in the integer representation. A small illustration is
below.
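
As a quick sketch of what 1-hot encoding means (plain Python, with made-up
category values):

categories = ["red", "green", "blue"]              # hypothetical categorical values
index = {c: i for i, c in enumerate(categories)}

def one_hot(value):
    # One slot per category; exactly one slot is 1, the rest stay 0.
    vec = [0] * len(categories)
    vec[index[value]] = 1
    return vec

one_hot("green")   # -> [0, 1, 0]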

-sujit


On Tue, Jun 16, 2015 at 1:17 PM, Rex X dnsr...@gmail.com wrote:

 Is it necessary to convert categorical data into integers?

 Any tips would be greatly appreciated!

 -Rex

 On Sun, Jun 14, 2015 at 10:05 AM, Rex X dnsr...@gmail.com wrote:

 For clustering analysis, we need a way to measure distances.

 When the data contains different levels of measurement -
 *binary / categorical (nominal), counts (ordinal), and ratio (scale)*

 To be concrete, for example, working with attributes of
 *city, zip, satisfaction_level, price*

 In the meanwhile, the real data usually also contains string attributes,
 for example, book titles. The distance between two strings can be measured
 by minimum-edit-distance.


 In SPSS, it provides Two-Step Cluster, which can handle both ratio scale
 and ordinal numbers.


 What is right algorithm to do hierarchical clustering analysis with all
 these four-kind attributes above with *MLlib*?


 If we cannot find a right metric to measure the distance, an alternative
 solution is to do a topological data analysis (e.g. linkage, and etc).
 Can we do such kind of analysis with *GraphX*?


 -Rex





Re: Access several s3 buckets, with credentials containing /

2015-06-06 Thread Sujit Pal
Hi Pierre,

One way is to recreate your credentials until AWS generates one without a
slash character in it. Another way I've been using is to pass these
credentials outside the S3 file path by setting the following (where sc is
the SparkContext).

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", ACCESS_KEY)

sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
SECRET_KEY)

After that you can define the RDDs more simply:

val c1 = sc.textFile("s3n://bucket1/file.csv")

-sujit



On Fri, Jun 5, 2015 at 3:55 AM, Steve Loughran ste...@hortonworks.com
wrote:


  On 5 Jun 2015, at 08:03, Pierre B 
 pierre.borckm...@realimpactanalytics.com wrote:
 
  Hi list!
 
  My problem is quite simple.
  I need to access several S3 buckets, using different credentials.:
  ```
  val c1 =
 
 sc.textFile("s3n://[ACCESS_KEY_ID1:SECRET_ACCESS_KEY1]@bucket1/file.csv").count
  val c2 =
 
 sc.textFile("s3n://[ACCESS_KEY_ID2:SECRET_ACCESS_KEY2]@bucket2/file.csv").count
  val c3 =
 
 sc.textFile("s3n://[ACCESS_KEY_ID3:SECRET_ACCESS_KEY3]@bucket3/file.csv").count
  ...
  ```
 
  One/several of those AWS credentials might contain / in the private
 access
  key.
  This is a known problem and from my research, the only ways to deal with
  these / are:
  1/ use environment variables to set the AWS credentials, then access the
 s3
  buckets without specifying the credentials
  2/ set the hadoop configuration to contain the the credentials.
 
  However, none of these solutions allow me to access different buckets,
 with
  different credentials.
 
  Can anyone help me on this?
 
  Thanks
 
  Pierre

 long known outstanding bug in Hadoop s3n, nobody has ever sat down to fix.
 One subtlety is its really hard to test -as you need credentials with a /
 in.

 The general best practise is recreate your credentials

 Now, if you can get the patch to work against hadoop trunk, I promise I
 will commit it
 https://issues.apache.org/jira/browse/HADOOP-3733

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Not able to run SparkPi locally

2015-05-23 Thread Sujit Pal
Hello all,

This is probably me doing something obviously wrong, would really
appreciate some pointers on how to fix this.

I installed spark-1.3.1-bin-hadoop2.6.tgz from the Spark download page [
https://spark.apache.org/downloads.html] and just untarred it on a local
drive. I am on Mac OSX 10.9.5 and the JDK is 1.8.0_40.

I ran the following commands (the first 3 run successfully, I mention it
here to rule out any possibility of it being an obviously bad install).

1) laptop$ bin/spark-shell

scala> sc.parallelize(1 to 100).count()

res0: Long = 100

scala> exit

2) laptop$ bin/pyspark

>>> sc.parallelize(range(100)).count()

100

>>> quit()

3) laptop$ bin/spark-submit examples/src/main/python/pi.py

Pi is roughly 3.142800

4) laptop$ bin/run-example SparkPi

This hangs at this line (full stack trace is provided at the end of this
mail)

15/05/23 07:52:10 INFO Executor: Fetching
http://10.0.0.5:51575/jars/spark-examples-1.3.1-hadoop2.6.0.jar with
timestamp 1432392670140

15/05/23 07:52:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

java.net.SocketTimeoutException: connect timed out

...

and finally dies with this message:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost):
java.net.SocketTimeoutException: connect timed out


I checked with ifconfig -a on my box, 10.0.0.5 is my IP address on my local
network.

en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500

ether 34:36:3b:d2:b0:f4

inet 10.0.0.5 netmask 0xff00 broadcast 10.0.0.255

media: autoselect

 status: active


I think perhaps there may be some configuration I am missing. Being able to
run jobs locally (without HDFS or creating a cluster) is essential for
development, and the examples come from the Spark 1.3.1 Quick Start page [
https://spark.apache.org/docs/latest/quick-start.html], so this is probably
something to do with my environment.


Thanks in advance for any help you can provide.

-sujit

=

Full output of SparkPi run (including stack trace) follows:

Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

15/05/23 08:08:55 INFO SparkContext: Running Spark version 1.3.1

15/05/23 08:08:57 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

15/05/23 08:08:57 INFO SecurityManager: Changing view acls to: palsujit

15/05/23 08:08:57 INFO SecurityManager: Changing modify acls to: palsujit

15/05/23 08:08:57 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(palsujit);
users with modify permissions: Set(palsujit)

15/05/23 08:08:57 INFO Slf4jLogger: Slf4jLogger started

15/05/23 08:08:57 INFO Remoting: Starting remoting

15/05/23 08:08:58 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@10.0.0.5:52008]

15/05/23 08:08:58 INFO Utils: Successfully started service 'sparkDriver' on
port 52008.

15/05/23 08:08:58 INFO SparkEnv: Registering MapOutputTracker

15/05/23 08:08:58 INFO SparkEnv: Registering BlockManagerMaster

15/05/23 08:08:58 INFO DiskBlockManager: Created local directory at
/var/folders/z8/s_crq_2j2rqb9mv_4j8djsjnx359l2/T/spark-d97baddf-1b6f-41db-92bb-f82ab5184cb7/blockmgr-4ef3a194-1929-4dd3-a0e5-215175d8e41a

15/05/23 08:08:58 INFO MemoryStore: MemoryStore started with capacity 265.1
MB

15/05/23 08:08:58 INFO HttpFileServer: HTTP File server directory is
/var/folders/z8/s_crq_2j2rqb9mv_4j8djsjnx359l2/T/spark-fdf36480-def0-44b7-9942-098d9ef3e2b4/httpd-e494852a-7d61-4441-8b80-566d9f820afb

15/05/23 08:08:58 INFO HttpServer: Starting HTTP Server

15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT

15/05/23 08:08:58 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:52009

15/05/23 08:08:58 INFO Utils: Successfully started service 'HTTP file
server' on port 52009.

15/05/23 08:08:58 INFO SparkEnv: Registering OutputCommitCoordinator

15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT

15/05/23 08:08:58 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040

15/05/23 08:08:58 INFO Utils: Successfully started service 'SparkUI' on
port 4040.

15/05/23 08:08:58 INFO SparkUI: Started SparkUI at http://10.0.0.5:4040

15/05/23 08:08:58 INFO SparkContext: Added JAR
file:/Users/palsujit/Software/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar
at http://10.0.0.5:52009/jars/spark-examples-1.3.1-hadoop2.6.0.jar with
timestamp 1432393738514

15/05/23 08:08:58 INFO Executor: Starting executor ID driver on host
localhost

15/05/23 08:08:58 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@10.0.0.5:52008/user/HeartbeatReceiver

15/05/23 08:08:58 INFO NettyBlockTransferService: Server created on 52010

15/05/23 08:08:58 INFO BlockManagerMaster: Trying to register BlockManager

15/05/23 08:08:58 INFO 

Re: Not able to run SparkPi locally

2015-05-23 Thread Sujit Pal
Replying to my own email in case someone has the same or similar issue.

On a hunch I ran this against my Linux (Ubuntu 14.04 with JDK 8) box. Not
only did bin/run-example SparkPi run without any problems, it also
provided a very helpful message in the output.

15/05/23 08:35:15 WARN Utils: Your hostname, tsunami resolves to a loopback
address: 127.0.1.1; using 10.0.0.10 instead (on interface wlan0)

15/05/23 08:35:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address

So I went back to my Mac, set SPARK_LOCAL_IP=127.0.0.1 and everything runs
fine now. To make this permanent I put this in conf/spark-env.sh.

-sujit




On Sat, May 23, 2015 at 8:14 AM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hello all,

 This is probably me doing something obviously wrong, would really
 appreciate some pointers on how to fix this.

 I installed spark-1.3.1-bin-hadoop2.6.tgz from the Spark download page [
 https://spark.apache.org/downloads.html] and just untarred it on a local
 drive. I am on Mac OSX 10.9.5 and the JDK is 1.8.0_40.

 I ran the following commands (the first 3 run successfully, I mention it
 here to rule out any possibility of it being an obviously bad install).

 1) laptop$ bin/spark-shell

 scala> sc.parallelize(1 to 100).count()

 res0: Long = 100

 scala> exit

 2) laptop$ bin/pyspark

 >>> sc.parallelize(range(100)).count()

 100

 >>> quit()

 3) laptop$ bin/spark-submit examples/src/main/python/pi.py

 Pi is roughly 3.142800

 4) laptop$ bin/run-example SparkPi

 This hangs at this line (full stack trace is provided at the end of this
 mail)

 15/05/23 07:52:10 INFO Executor: Fetching
 http://10.0.0.5:51575/jars/spark-examples-1.3.1-hadoop2.6.0.jar with
 timestamp 1432392670140

 15/05/23 07:52:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
 0)

 java.net.SocketTimeoutException: connect timed out

 ...

 and finally dies with this message:

 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent
 failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):
 java.net.SocketTimeoutException: connect timed out


 I checked with ifconfig -a on my box, 10.0.0.5 is my IP address on my
 local network.

 en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500

 ether 34:36:3b:d2:b0:f4

 inet 10.0.0.5 netmask 0xff00 broadcast 10.0.0.255

 media: autoselect

  status: active


 I think perhaps there may be some configuration I am missing. Being able
 to run jobs locally (without HDFS or creating a cluster) is essential for
 development, and the examples come from the Spark 1.3.1 Quick Start page [
 https://spark.apache.org/docs/latest/quick-start.html], so this is
 probably something to do with my environment.


 Thanks in advance for any help you can provide.

 -sujit

 =

 Full output of SparkPi run (including stack trace) follows:

 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties

 15/05/23 08:08:55 INFO SparkContext: Running Spark version 1.3.1

 15/05/23 08:08:57 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable

 15/05/23 08:08:57 INFO SecurityManager: Changing view acls to: palsujit

 15/05/23 08:08:57 INFO SecurityManager: Changing modify acls to: palsujit

 15/05/23 08:08:57 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(palsujit);
 users with modify permissions: Set(palsujit)

 15/05/23 08:08:57 INFO Slf4jLogger: Slf4jLogger started

 15/05/23 08:08:57 INFO Remoting: Starting remoting

 15/05/23 08:08:58 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@10.0.0.5:52008]

 15/05/23 08:08:58 INFO Utils: Successfully started service 'sparkDriver'
 on port 52008.

 15/05/23 08:08:58 INFO SparkEnv: Registering MapOutputTracker

 15/05/23 08:08:58 INFO SparkEnv: Registering BlockManagerMaster

 15/05/23 08:08:58 INFO DiskBlockManager: Created local directory at
 /var/folders/z8/s_crq_2j2rqb9mv_4j8djsjnx359l2/T/spark-d97baddf-1b6f-41db-92bb-f82ab5184cb7/blockmgr-4ef3a194-1929-4dd3-a0e5-215175d8e41a

 15/05/23 08:08:58 INFO MemoryStore: MemoryStore started with capacity
 265.1 MB

 15/05/23 08:08:58 INFO HttpFileServer: HTTP File server directory is
 /var/folders/z8/s_crq_2j2rqb9mv_4j8djsjnx359l2/T/spark-fdf36480-def0-44b7-9942-098d9ef3e2b4/httpd-e494852a-7d61-4441-8b80-566d9f820afb

 15/05/23 08:08:58 INFO HttpServer: Starting HTTP Server

 15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT

 15/05/23 08:08:58 INFO AbstractConnector: Started
 SocketConnector@0.0.0.0:52009

 15/05/23 08:08:58 INFO Utils: Successfully started service 'HTTP file
 server' on port 52009.

 15/05/23 08:08:58 INFO SparkEnv: Registering OutputCommitCoordinator

 15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT

 15/05/23 08:08:58 INFO AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040