Re: Lemmatization using StanfordNLP in ML 2.0
Hi Janardhan,

You need the classifier "models" attribute on the second stanford-corenlp entry to indicate that you want the models JAR, as shown below. Right now you are importing two identical copies of the stanford-corenlp JAR.

libraryDependencies ++= {
  val sparkVersion = "2.0.0"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
    "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
    "com.google.protobuf" % "protobuf-java" % "2.6.1",
    "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" classifier "models",
    "org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )
}

-sujit

On Sun, Sep 18, 2016 at 5:12 PM, janardhan shetty <janardhan...@gmail.com> wrote:
> Hi Sujit,
>
> Tried that option but same error:
>
> java version "1.8.0_51"
>
> libraryDependencies ++= {
>   val sparkVersion = "2.0.0"
>   Seq(
>     "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>     "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
>     "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
>     "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
>     "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>     "com.google.protobuf" % "protobuf-java" % "2.6.1",
>     "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>     "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>   )
> }
>
> Error:
>
> Exception in thread "main" java.lang.NoClassDefFoundError: edu/stanford/nlp/pipeline/StanfordCoreNLP
>     at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.apply(Lemmatizer.scala:37)
>     at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.apply(Lemmatizer.scala:33)
>     at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88)
>     at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:87)
>     at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1060)
>     at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
>     at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:45)
>     at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:29)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>
> On Sun, Sep 18, 2016 at 2:21 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>> Hi Janardhan,
>>
>> Maybe try removing the string "test" from this line in your build.sbt?
>> IIRC, this restricts the models JAR to being used only from tests.
>>
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier "models",
>>
>> -sujit
>>
>> On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty <janardhan...@gmail.com> wrote:
>>> Hi,
>>>
>>> I am trying to use lemmatization as a transformer and added below to the build.sbt
>>>
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier "models",
>>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>>
>>> Error:
>>> *Exception in thread "main" java.lang.NoClassDefFoundError: edu/stanford/nlp/pipeline/StanfordCoreNLP*
>>>
>>> I have tried other versions of this spark package.
>>>
>>> Any help is appreciated..
Re: Lemmatization using StanfordNLP in ML 2.0
Hi Janardhan,

Maybe try removing the string "test" from this line in your build.sbt? IIRC, this restricts the models JAR to being used only from tests.

"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier "models",

-sujit

On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty wrote:
> Hi,
>
> I am trying to use lemmatization as a transformer and added below to the build.sbt
>
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "com.google.protobuf" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier "models",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>
> Error:
> *Exception in thread "main" java.lang.NoClassDefFoundError: edu/stanford/nlp/pipeline/StanfordCoreNLP*
>
> I have tried other versions of this spark package.
>
> Any help is appreciated..
Re: pyspark mapPartitions()
I built this recently using the accepted answer on this SO page:
http://stackoverflow.com/questions/26741714/how-does-the-pyspark-mappartitions-function-work/26745371

-sujit

On Sat, May 14, 2016 at 7:00 AM, Mathieu Longtin wrote:
> From memory:
>
> def processor(iterator):
>     for item in iterator:
>         newitem = do_whatever(item)
>         yield newitem
>
> newdata = data.mapPartitions(processor)
>
> Basically, your function takes an iterator as an argument, and must either
> be an iterator or return one.
>
> On Sat, May 14, 2016 at 12:39 AM Abi wrote:
>> On Tue, May 10, 2016 at 2:20 PM, Abi wrote:
>>> Is there any example of this? I want to see how you write the
>>> iterable example
>
> --
> Mathieu Longtin
> 1-514-803-8977
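Outside Spark, the iterator-in/iterator-out contract Mathieu describes can be exercised with plain Python: the partition function receives an iterator over one partition's items and must itself yield (or return) an iterator. A minimal sketch, where do_whatever is a hypothetical per-item transform simulated on an ordinary list:

```python
def do_whatever(item):
    # stand-in for real per-item work
    return item * 2

def processor(iterator):
    # same shape as the mapPartitions function: iterator in, generator out
    for item in iterator:
        yield do_whatever(item)

partition = [1, 2, 3]   # one partition's worth of data
result = list(processor(iter(partition)))
print(result)   # [2, 4, 6]
```

The key point is that processor never materializes the whole partition; it streams items through one at a time, which is exactly why mapPartitions can process partitions larger than memory.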
Re: since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?
Hi Charles,

I tried this with dummied-out functions which just sum transformations of a list of integers; maybe they could be replaced by algorithms in your case. The idea is to call them through a "god" function that takes an additional type parameter and delegates out to the appropriate function. Here's my code, maybe it helps...

def f0(xs):
    return len(xs)

def f1(xs):
    return sum(xs)

def f2(xs):
    return sum([x**2 for x in xs])

def f_god(n, xs):
    if n == 1:
        return f1(xs)
    elif n == 2:
        return f2(xs)
    else:
        return f0(xs)

xs = [x for x in range(0, 5)]
xs_b = sc.broadcast(xs)
ns = sc.parallelize([x for x in range(0, 3)])
results = ns.map(lambda n: f_god(n, xs_b.value))
print results.take(10)

gives me: [5, 10, 30]

-sujit

On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau wrote:
> You probably want to look at the map transformation, and the many more
> defined on RDDs. The function you pass in to map is serialized and the
> computation is distributed.
>
> On Monday, March 28, 2016, charles li wrote:
>> use case: have a dataset, and want to use different algorithms on that,
>> and fetch the result.
>>
>> for making this, I think I should distribute my algorithms, and run these
>> algorithms on the dataset at the same time, am I right?
>>
>> but it seems that spark can not parallelize/serialize
>> algorithms/functions, then how to make it?
>>
>> *here is the test code*:
>>
>> def test():
>>     pass
>> function_list = [test] * 10
>>
>> sc.parallelize([test] * 10).take(1)
>>
>> *error message:*
>> Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.runJob.
>> >> : org.apache.spark.SparkException: Job aborted due to stage failure: Task >> 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage >> 9.0 (TID 105, sh-demo-hadoop-07): >> org.apache.spark.api.python.PythonException: Traceback (most recent call >> last): >> >> File >> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", >> line 111, in main >> >> process() >> >> File >> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", >> line 106, in process >> >> serializer.dump_stream(func(split_index, iterator), outfile) >> >> File >> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", >> line 263, in dump_stream >> >> vs = list(itertools.islice(iterator, batch)) >> >> File >> "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line >> 1293, in takeUpToNumLeft >> >> File >> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", >> line 139, in load_stream >> >> yield self._read_with_length(stream) >> >> File >> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", >> line 164, in _read_with_length >> >> return self.loads(obj) >> >> File >> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", >> line 422, in loads >> >> return pickle.loads(obj) >> >> AttributeError: 'module' object has no attribute 'test' >> >> >> at >> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) >> >> at >> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207) >> >> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) >> >> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> >> at 
org.apache.spark.scheduler.Task.run(Task.scala:89)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> what's interesting is that *when I run sc.parallelize([test] * 10).collect(), it works
>> fine*, returning a list of ten <function test at ...> reprs.
>>
>> --
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
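The dispatch trick above can be checked without Spark at all: instead of parallelizing function objects (which pickle by name and fail on the workers, as the AttributeError shows), you parallelize plain integers and dispatch to the right algorithm inside the task. A plain-Python sketch of the same f0/f1/f2/f_god code, with a list comprehension standing in for the ns.map call:

```python
def f0(xs):
    return len(xs)

def f1(xs):
    return sum(xs)

def f2(xs):
    return sum(x ** 2 for x in xs)

def f_god(n, xs):
    # delegate to the algorithm selected by the integer id n
    if n == 1:
        return f1(xs)
    elif n == 2:
        return f2(xs)
    return f0(xs)

xs = list(range(5))                          # the broadcast data
results = [f_god(n, xs) for n in range(3)]   # stands in for ns.map(...)
print(results)   # [5, 10, 30]
```

The integers serialize trivially, and every worker already has the function definitions because they ship with the closure of the lambda passed to map.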
Re: How to create dataframe from SQL Server SQL query
Hi Ningjun,

Haven't done this myself; saw your question, was curious about the answer, and found this article which you might find useful:
http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/

According to this article, you can pass your SQL statement in as the "dbtable" mapping, i.e., something like:

val jdbcDF = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:postgresql:dbserver",
    "dbtable" -> "(select docid, title, docText from dbo.document where docid between 10 and 1000) tmp"
  )).load()

(note the subquery gets an alias, since it is used where a table name is expected)

-sujit

On Mon, Dec 7, 2015 at 8:26 AM, Wang, Ningjun (LNG-NPV) <ningjun.w...@lexisnexis.com> wrote:
> How can I create a RDD from a SQL query against SQLServer database? Here
> is the example of dataframe
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
>
> val jdbcDF = sqlContext.read.format("jdbc").options(
>   Map("url" -> "jdbc:postgresql:dbserver",
>       "dbtable" -> "schema.tablename")).load()
>
> This code creates a dataframe from a table. How can I create a dataframe from a
> query, e.g. “select docid, title, docText from dbo.document where docid
> between 10 and 1000”?
>
> Ningjun
Re: Please add us to the Powered by Spark page
Thank you Sean, much appreciated. And yes, perhaps "email dev" is a better option since the traffic is (probably) lighter and these sorts of requests are more likely to get noticed. Although one would need to subscribe to the dev list to do that... -sujit On Tue, Nov 24, 2015 at 1:16 AM, Sean Owen <so...@cloudera.com> wrote: > Not sure who generally handles that, but I just made the edit. > > On Mon, Nov 23, 2015 at 6:26 PM, Sujit Pal <sujitatgt...@gmail.com> wrote: > > Sorry to be a nag, I realize folks with edit rights on the Powered by > Spark > > page are very busy people, but its been 10 days since my original > request, > > was wondering if maybe it just fell through the cracks. If I should > submit > > via some other channel that will make sure it is looked at (or better > yet, a > > self service option), please let me know and I will do so. > > > > Here is the information again. > > > > Organization Name: Elsevier Labs > > URL: http://labs.elsevier.com > > Spark components: Spark Core, Spark SQL, MLLib, GraphX. > > Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content > as a > > Service, Content and Event Analytics, Content/Event based Predictive > Models > > and Big Data Processing. We use Scala and Python over Databricks > Notebooks > > for most of our work. > > > > Thanks very much, > > Sujit > > > > On Fri, Nov 13, 2015 at 9:21 AM, Sujit Pal <sujitatgt...@gmail.com> > wrote: > >> > >> Hello, > >> > >> We have been using Spark at Elsevier Labs for a while now. Would love to > >> be added to the “Powered By Spark” page. > >> > >> Organization Name: Elsevier Labs > >> URL: http://labs.elsevier.com > >> Spark components: Spark Core, Spark SQL, MLLib, GraphX. > >> Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content > as > >> a Service, Content and Event Analytics, Content/Event based Predictive > >> Models and Big Data Processing. We use Scala and Python over Databricks > >> Notebooks for most of our work. 
> >> > >> Thanks very much, > >> > >> Sujit Pal > >> Technical Research Director > >> Elsevier Labs > >> sujit@elsevier.com > >> > >> > > >
Re: Please add us to the Powered by Spark page
Sorry to be a nag. I realize folks with edit rights on the Powered by Spark page are very busy people, but it's been 10 days since my original request, and I was wondering if maybe it just fell through the cracks. If I should submit via some other channel that will make sure it is looked at (or, better yet, a self-service option), please let me know and I will do so.

Here is the information again.

Organization Name: Elsevier Labs
URL: http://labs.elsevier.com
Spark components: Spark Core, Spark SQL, MLLib, GraphX.
Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as a Service, Content and Event Analytics, Content/Event based Predictive Models and Big Data Processing. We use Scala and Python over Databricks Notebooks for most of our work.

Thanks very much,
Sujit

On Fri, Nov 13, 2015 at 9:21 AM, Sujit Pal <sujitatgt...@gmail.com> wrote:
> Hello,
>
> We have been using Spark at Elsevier Labs for a while now. Would love to
> be added to the “Powered By Spark” page.
>
> Organization Name: Elsevier Labs
> URL: http://labs.elsevier.com
> Spark components: Spark Core, Spark SQL, MLLib, GraphX.
> Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as
> a Service, Content and Event Analytics, Content/Event based Predictive
> Models and Big Data Processing. We use Scala and Python over Databricks
> Notebooks for most of our work.
>
> Thanks very much,
>
> Sujit Pal
> Technical Research Director
> Elsevier Labs
> sujit@elsevier.com
Please add us to the Powered by Spark page
Hello,

We have been using Spark at Elsevier Labs for a while now. Would love to be added to the “Powered By Spark” page.

Organization Name: Elsevier Labs
URL: http://labs.elsevier.com
Spark components: Spark Core, Spark SQL, MLLib, GraphX.
Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as a Service, Content and Event Analytics, Content/Event based Predictive Models and Big Data Processing. We use Scala and Python over Databricks Notebooks for most of our work.

Thanks very much,

Sujit Pal
Technical Research Director
Elsevier Labs
sujit@elsevier.com
Re: Prevent possible out of memory when using read/union
Hi Alexander,

You may want to try the wholeTextFiles() method of SparkContext. Using that, you could just do something like this:

sc.wholeTextFiles("hdfs://input_dir")
  .saveAsSequenceFile("hdfs://output_dir")

wholeTextFiles returns an RDD of (filename, content) pairs:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

You will not have to worry about managing memory as much with this approach.

-sujit

On Wed, Nov 4, 2015 at 2:12 AM, Alexander Lenz wrote:
> Hi colleagues,
>
> In Hadoop I have a lot of folders containing small files. Therefore I am
> reading the content of all folders, unioning the small files, and writing the
> unioned data into a single folder containing one file. Afterwards I delete
> the small files and the corresponding folders.
>
> I see two possible emerging problems on which I would like to get your opinion:
>
> 1. When reading all the files inside the folders into the master
> program, I think it might happen that there is such an amount of files
> that the master program will run out of memory.
> To prevent this I thought about checking the file size of the folders and
> only reading folders in as long as there is enough memory to handle the amount.
> Do you think that this is a possible solution, or is there a better
> way to handle this problem?
>
> 2. The other problem is: I am doing a UnionAll to merge all the
> content of the files. In my opinion this will cause the data to
> be brought to a single master, where it will be unioned.
> So there might be the same problem, that the application runs out of memory.
> My proposed solution would also be to union only if the size does not
> exceed the available memory. Any better solution?
>
> For a better understanding you can have a look at my code at the bottom of
> the mail. Would be glad to hear from your experience, as I would assume that this
> problem is a general one.
>
> Thanks & Best, Alex
>
> val sqlContext = new SQLContext(sc)
>
> //get filesystem
> val conf = new Configuration()
> val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf)
>
> //get relevant folders
> val directoryStatus = fs.listStatus(new Path("hdfs://sandbox.hortonworks.com/demo/parquet/staging/"))
> val latestFolder = directoryStatus.maxBy(x => x.getModificationTime)
> val toWorkFolders = directoryStatus.filter(x => x.getModificationTime < latestFolder.getModificationTime)
>
> //aggregate folder content
> val parquetFiles = toWorkFolders.map(folder => {
>   sqlContext.read.parquet(folder.getPath.toString)
> })
>
> val mergedParquet = parquetFiles.reduce((x, y) => x.unionAll(y))
>
> mergedParquet.coalesce(1) //Assemble part-files into one partition
>   .write.mode(SaveMode.Append)
>   .partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*)
>   .parquet("hdfs://sandbox.hortonworks.com/demo/parquet/consolidated/")
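The reduce-over-unionAll shape in Alexander's snippet can be sketched on plain Python lists, with reduce standing in for DataFrame.unionAll and list concatenation for the union itself (worth noting that in Spark, unionAll is a lazy transformation, so it does not by itself pull the data to the driver):

```python
from functools import reduce

# Each inner list stands in for one small folder's DataFrame;
# concatenation plays the role of unionAll.
folders = [[1, 2], [3], [4, 5, 6]]

merged = reduce(lambda x, y: x + y, folders)
print(merged)   # [1, 2, 3, 4, 5, 6]
```

The reduce produces a single logical collection out of many small ones, which is exactly what the Parquet consolidation job above does before coalescing to one partition.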
Re: How to close connection in mapPartitions?
Hi Bin,

Very likely the RedisClientPool is being closed too quickly, before map has a chance to get to it. One way to verify would be to comment out the .close line and see what happens.

FWIW I saw a similar problem writing to Solr, where I put a commit where you have a close, and noticed that the commit was happening before the actual data insertion (in the .map line) happened (and no data showing up in the index until the next time I ran the code :-)). At the time I got around it by doing a zipWithIndex on the Iterator, then doing a partial commit every n records, and finally doing a commit from the driver code.

However, this won't work for you, and there is a better way outlined on this page (look for Tobias Pfeiffer; it's the code block immediately following):
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
where you test for hasNext on the iterator and call close if it's the last element, within the scope of the .map call.

-sujit

On Thu, Oct 22, 2015 at 11:32 PM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
> Are you sure RedisClientPool is being initialized properly in the
> constructor of RedisCache? Can you please copy paste the code that you use
> to initialize RedisClientPool inside the constructor of RedisCache?
>
> Thanks,
> Aniket
>
> On Fri, Oct 23, 2015 at 11:47 AM Bin Wang wrote:
>> BTW, "lines" is a DStream.
>>
>> Bin Wang 于2015年10月23日周五 下午2:16写道:
>>> I use mapPartitions to open connections to Redis, I write it like this:
>>>
>>> val seqs = lines.mapPartitions { lines =>
>>>   val cache = new RedisCache(redisUrl, redisPort)
>>>   val result = lines.map(line => Parser.parseBody(line, cache))
>>>   cache.redisPool.close
>>>   result
>>> }
>>>
>>> But it seems the pool is closed before I use it. Am I doing anything wrong?
Here is the error: >>> >>> java.lang.IllegalStateException: Pool not open >>> at >>> org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140) >>> at >>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166) >>> at com.redis.RedisClientPool.withClient(Pool.scala:34) >>> at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17) >>> at >>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29) >>> at >>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26) >>> at >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >>> at >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >>> at scala.collection.immutable.List.foreach(List.scala:318) >>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >>> at scala.collection.AbstractTraversable.map(Traversable.scala:105) >>> at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26) >>> at >>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33) >>> at >>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33) >>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) >>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >>> at >>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209) >>> at >>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) >>> at >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) >>> at >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >>> at org.apache.spark.scheduler.Task.run(Task.scala:88) >>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>> at >>> 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>> at java.lang.Thread.run(Thread.java:745) >>> >>>
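The "close after the last element" idea is especially natural in Python, where code after a generator's yield loop runs only once the iterator is exhausted; the Scala version on the Cloudera page needs the explicit hasNext test because the inner map is evaluated lazily, element by element. A plain-Python sketch of the pattern (FakePool is a hypothetical stand-in for the Redis connection pool):

```python
closed_log = []

class FakePool:
    """Hypothetical stand-in for the Redis connection pool."""
    def __init__(self):
        self.closed = False
    def lookup(self, x):
        # would fail, like the "Pool not open" error, if used after close
        assert not self.closed, "pool used after close"
        return x * 10
    def close(self):
        self.closed = True
        closed_log.append(True)

def process_partition(lines):
    # open the pool once per partition, as in the mapPartitions snippet
    pool = FakePool()
    for line in lines:
        yield pool.lookup(line)
    # reached only after the last element has been consumed, so the pool
    # is closed after (not before) the records are actually processed
    pool.close()

out = list(process_partition(iter([1, 2, 3])))
print(out, closed_log)   # [10, 20, 30] [True]
```

Contrast this with the original snippet, where cache.redisPool.close runs eagerly while lines.map is still an unevaluated lazy view, so the pool is gone by the time any record is parsed.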
Re: How to get inverse Matrix / RDD or how to solve linear system of equations
Hi Zhiliang,

For a system of equations AX = y, Linear Regression will give you a best-fit estimate for A (coefficient vector) for a matrix of feature variables X and corresponding target variable y over a subset of your data. OTOH, what you are looking for here is to solve for x in a system of equations Ax = b, where A and b are known and you want the vector x.

This Math Stackexchange page [2] explains the math in more detail, but basically A * x = b can be rewritten as x = A.I * b. You can get the pseudo-inverse of A using SVD (Spark MLlib supports SVD [1]). The SVD decomposition writes A as a product of three other matrices:

A = U * S * V.T

and the pseudo-inverse can be written as:

A.I = V * S.I * U.T

(where S.I just takes reciprocals of the non-zero singular values on the diagonal of S). Then x can be found by multiplying A.I with b.

-sujit

[1] https://spark.apache.org/docs/1.2.0/mllib-dimensionality-reduction.html
[2] http://math.stackexchange.com/questions/458404/how-can-we-compute-pseudoinverse-for-any-matrix

On Fri, Oct 23, 2015 at 2:19 AM, Zhiliang Zhu wrote:
> Hi Sujit, and All,
>
> Currently I am in large difficulty, and I am eager to get some help from you.
>
> There is some big linear system of equations:
> Ax = b, with A having N rows and N columns, N very large, and b = [0, 0, ..., 0, 1]T.
> Then, I will solve it to get x = [x1, x2, ..., xn]T.
>
> The simple solution would be to get inverse(A), and then x = inverse(A) * b.
> A would be some JavaRDD; however, for RDD/matrix there
> are add/multiply/transpose APIs, but no inverse API!
>
> Then, how would one conveniently get inverse(A), or just solve the linear
> system of equations some other way...
> In Spark MLlib, there is linear regression; the training process
> solves for the coefficients of some specific linear model, that is,
> Ax = y, training on (x, y) to get A. Might that be used to solve the
> linear system of equations? I could not decide.
>
> I must show my deep appreciation towards all your help.
>
> Thank you very much!
> Zhiliang
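The SVD pseudo-inverse route above needs a linear algebra library; as a small self-contained illustration of what "solving Ax = b" means, here is plain Gaussian elimination with partial pivoting on a dense 3x3 system. This is a sketch for tiny in-memory matrices only, standing in for the SVD-based approach, and is not the distributed method:

```python
def solve(A, b):
    """Solve A x = b for a small dense square system by Gaussian elimination."""
    n = len(A)
    # build the augmented matrix [A | b]
    M = [row[:] + [b[i]] for i, row in enumerate(A)]
    for col in range(n):
        # partial pivoting: pick the row with the largest entry in this column
        pivot = max(range(col, n), key=lambda r: abs(M[r][col]))
        M[col], M[pivot] = M[pivot], M[col]
        # eliminate entries below the pivot
        for r in range(col + 1, n):
            f = M[r][col] / M[col][col]
            for c in range(col, n + 1):
                M[r][c] -= f * M[col][c]
    # back substitution
    x = [0.0] * n
    for r in range(n - 1, -1, -1):
        x[r] = (M[r][n] - sum(M[r][c] * x[c] for c in range(r + 1, n))) / M[r][r]
    return x

# b = [0, ..., 0, 1]T as in the question, on a small invertible A
A = [[2.0, 1.0, 0.0], [1.0, 3.0, 1.0], [0.0, 1.0, 2.0]]
b = [0.0, 0.0, 1.0]
print(solve(A, b))
```

For the distributed case, the SVD factors U, S, V from MLlib play the role of this elimination: x = V * S.I * U.T * b requires only matrix-vector multiplies, which the RDD APIs do support.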
Re: Save RandomForest Model from ML package
Hi Sebastian,

You can save models to disk and load them back up. In the snippet below (copied out of a working Databricks notebook), I train a model, then save it to disk, then retrieve it back into model2 from disk.

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel

val model = RandomForest.trainClassifier(data, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed)
model.save(sc, inputDir + "models/randomForestModel")
val model2 = RandomForestModel.load(sc, inputDir + "models/randomForestModel")

Not sure if there is PMML support. The model saves itself into a directory structure that looks like this:

data/
  _SUCCESS
  _common_metadata
  _metadata
  part-r-*.gz.parquet (multiple files)
metadata/
  _SUCCESS
  part-0

HTH,
-sujit

On Thu, Oct 22, 2015 at 5:33 AM, Sebastian Kuepers <sebastian.kuep...@publicispixelpark.de> wrote:
> Hey,
>
> I am trying to figure out the best practice for saving and loading models which
> have been fitted with the ML package, i.e. with the RandomForest classifier.
>
> There is PMML support in the MLlib package afaik, but not in ML. Is that correct?
>
> How do you approach this, so that you do not have to fit your model before
> every prediction job?
>
> Thanks,
> Sebastian
>
> Sebastian Küpers
> Account Director
>
> Publicis Pixelpark
> Leibnizstrasse 65, 10629 Berlin
> T +49 30 5058 1838
> M +49 172 389 28 52
> sebastian.kuep...@publicispixelpark.de
> Web: publicispixelpark.de, Twitter: @pubpxp
> Facebook: publicispixelpark.de/facebook
> Publicis Pixelpark - eine Marke der Pixelpark AG
> Vorstand: Horst Wagner (Vorsitzender), Dirk Kedrowitsch
> Aufsichtsratsvorsitzender: Pedro Simko
> Amtsgericht Charlottenburg: HRB 72163
>
> Disclaimer The information in this email and any attachments may contain
> proprietary and confidential information that is intended for the
> addressee(s) only.
If you are not the intended recipient, you are hereby > notified that any disclosure, copying, distribution, retention or use of > the contents of this information is prohibited. When addressed to our > clients or vendors, any information contained in this e-mail or any > attachments is subject to the terms and conditions in any governing > contract. If you have received this e-mail in error, please immediately > contact the sender and delete the e-mail. >
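The save-then-reload round trip that model.save / RandomForestModel.load provides has a direct analogue in plain Python, sketched here with pickle and a trivial hypothetical stand-in "model" (this is not MLlib; it just shows the fit-once, persist, reload-before-predicting workflow):

```python
import os
import pickle
import tempfile

# A trivial stand-in "model": just a threshold, "fitted" once up front.
model = {"threshold": 0.5}

def predict(model, x):
    return 1 if x >= model["threshold"] else 0

# analogue of model.save(...): persist the fitted model to disk
path = os.path.join(tempfile.mkdtemp(), "model.pkl")
with open(path, "wb") as f:
    pickle.dump(model, f)

# analogue of RandomForestModel.load(...): reload it in the prediction job
with open(path, "rb") as f:
    model2 = pickle.load(f)

print(predict(model2, 0.7), predict(model2, 0.2))   # 1 0
```

The point in both cases is the same: the expensive fit happens once, and every prediction job only pays the cost of deserializing the stored model.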
Re: How to subtract two RDDs with same size
Hi Zhiliang,

How about doing something like this?

val rdd3 = rdd1.zip(rdd2).map(p => p._1.zip(p._2).map(z => z._1 - z._2))

The first zip will join the two RDDs and produce an RDD of (Array[Float], Array[Float]) pairs. On each pair, we zip the two Array[Float] components together to form an Array[(Float, Float)], and then we subtract the second element from the first in the inner map (the inner map is a Scala map, not a Spark one).

I tried this out on a notebook:

val rdd1 = sc.parallelize(List(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0), Array(7.0, 8.0, 9.0)))
val rdd2 = sc.parallelize(List(Array(1.0, 4.0, 3.0), Array(4.0, 10.0, 6.0), Array(7.0, 16.0, 9.0)))
val rdd3 = rdd1.zip(rdd2).map(p => p._1.zip(p._2).map(z => z._1 - z._2))
rdd3.collect()

which gives me:

res0: Array[Array[Double]] = Array(Array(0.0, -2.0, 0.0), Array(0.0, -5.0, 0.0), Array(0.0, -8.0, 0.0))

-sujit

On Wed, Sep 23, 2015 at 12:23 AM, Zhiliang Zhu wrote:
> there is a matrix add API; might one map each row element of rdd2 to be negative,
> then take rdd1 and rdd2 and call add?
>
> Or some other way...
>
> On Wednesday, September 23, 2015 3:11 PM, Zhiliang Zhu <zchl.j...@yahoo.com> wrote:
>
> Hi All,
>
> There are two RDDs: RDD rdd1, and RDD rdd2;
> that is to say, rdd1 and rdd2 are like DataFrames, or Matrices with the
> same row count and column count.
>
> I would like to get RDD rdd3, where each element in rdd3 is the
> difference between rdd1 and rdd2 at the same position, similar to matrix subtraction:
>
> rdd3 = rdd1 - rdd2 ...
>
> It seems very difficult to do this kind of matrix arithmetic, even
> add, subtract, multiply, diff etc...
>
> I shall appreciate your help very much~~
> Zhiliang
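The zip-then-zip element-wise subtraction above can be sketched on plain Python lists, each inner list standing in for one RDD row (the same data as the notebook example):

```python
rdd1 = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
rdd2 = [[1.0, 4.0, 3.0], [4.0, 10.0, 6.0], [7.0, 16.0, 9.0]]

# outer zip pairs up corresponding rows; inner zip pairs up corresponding
# elements within each row, which are then subtracted
rdd3 = [[a - b for a, b in zip(row1, row2)]
        for row1, row2 in zip(rdd1, rdd2)]
print(rdd3)   # [[0.0, -2.0, 0.0], [0.0, -5.0, 0.0], [0.0, -8.0, 0.0]]
```

Note that RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition, which holds here since both matrices have identical shape.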
Re: Calling a method parallel
Hi Tapan,

Perhaps this may work? It takes the range 0..100 and creates an RDD out of it, then calls X(i) on each element. The X(i) calls will be executed on the workers in parallel.

Scala:
val results = sc.parallelize(0 until 100).map(idx => X(idx))

Python:
results = sc.parallelize(range(100)).map(lambda idx: X(idx))

-sujit

On Wed, Sep 23, 2015 at 6:46 AM, Tapan Sharma wrote:
> Hi All,
>
> I want to call a method X(int i) from my Spark program for different values of i.
> That is: X(1), X(2), ..., X(n).
> Each call returns one object.
> Currently I am doing this sequentially.
>
> Is there any way to run these in parallel and get back the list of objects?
> Sorry for this basic question.
>
> Regards
> Tapan
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Calling-a-method-parallel-tp24786.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
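The same parallelize-and-map shape can be tried without a cluster, using Python's standard-library thread pool in place of Spark (X here is a hypothetical stand-in for the real method; a thread pool helps when X is I/O-bound, while Spark or a process pool is the right tool for CPU-bound work):

```python
from concurrent.futures import ThreadPoolExecutor

def X(i):
    # stand-in for the real X(int i); returns one object per call
    return i * i

# pool.map applies X to each index in parallel and preserves input order
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(X, range(10)))

print(results)   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

As with the Spark version, the caller gets back the list of result objects in the order of the inputs, even though the individual calls ran concurrently.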
Re: How to get a new RDD by ordinarily subtract its adjacent rows
Hi Zhiliang,

Would something like this work?

val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))

-sujit

On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu wrote:
> Hi Romi,
>
> Thanks very much for your kind comment~~
>
> In fact there is some valid background to the application; it is about R data analysis.
> ...
> #fund_nav_daily is an M X N (or M X 1) matrix or data.frame; each col is a
> daily fund return, each row is a daily date
> #fund_return_daily needs to compute each fund's daily return minus the previous day's return
> fund_return_daily <- diff(log(fund_nav_daily))
>
> #the first row would be all 0, since there is no previous row ahead of the first row
> fund_return_daily <- rbind(matrix(0, ncol = ncol(fund_return_daily)), fund_return_daily)
> ...
>
> I need to port the R program exactly to Spark, with RDD/DataFrame
> used to replace the R data.frame;
> however, I just found that it is VERY difficult to make the Spark
> program flexibly describe & transform the R background applications.
> I think I have seriously put myself at risk over this...
>
> Would you help direct me some about the above coding issue... and my risk
> about practice in Spark/R application...
>
> I must show all my sincere thanks towards your kind help.
>
> P.S. currently, in sparkR in Spark 1.4.1, there are many bugs in the APIs
> createDataFrame/except/unionAll, and it seems
> that Spark's Java API has more functions than sparkR.
> Also, no specific R regression algorithm is included in sparkR.
>
> Best Regards,
> Zhiliang
>
> On Monday, September 21, 2015 7:36 PM, Romi Kuntsman wrote:
>
> RDD is a set of data rows (in your case numbers); there is no meaning to
> the order of the items.
> What exactly are you trying to accomplish?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu wrote:
>
> Dear all,
>
> I have taken lots of days to think about this issue, however, without any success...
> I shall appreciate all your kind help.
>
> There is an RDD rdd1; I would like to get a new RDD rdd2, where each row
> satisfies rdd2[i] = rdd1[i] - rdd1[i - 1].
> What kind of API or function would I use...
>
> Thanks very much!
> John
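The sliding(2) difference suggested above can be sketched on a plain Python list, pairing each element with its successor and subtracting:

```python
rdd1 = [3.0, 5.0, 10.0, 12.0]

# zip the list against itself shifted by one: sliding windows of size 2
windows = list(zip(rdd1, rdd1[1:]))   # [(3.0, 5.0), (5.0, 10.0), (10.0, 12.0)]

# v(1) - v(0) for each window, i.e. rdd2[i] = rdd1[i+1] - rdd1[i]
rdd2 = [b - a for a, b in windows]
print(rdd2)   # [2.0, 5.0, 2.0]
```

Note the result has one fewer element than the input, just as sliding(2) yields n-1 windows; the R snippet earlier in the thread handles this by prepending a row of zeros after diff().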
Re: How to get a new RDD by ordinarily subtract its adjacent rows
Hi Zhiliang, I haven't used the Java API, but I found this Javadoc page which may be helpful to you: https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html I think the equivalent Java code snippet might go something like this: RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2) (the second parameter of fromRDD comes from this discussion thread: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-td6768.html). There is also the SlidingRDD decorator: https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/SlidingRDD.html So maybe something like this: new SlidingRDD(rdd1, 2, ClassTag$.apply(Class)) -sujit On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu <zchl.j...@yahoo.com> wrote: > Hi Sujit, > > I appreciate your kind help very much~ > > It seems to be OK; however, do you know the corresponding Spark Java API > for this... > Is there any Java API like Scala's sliding? It also seems that I cannot find > the Spark Scala docs on sliding... > > Thank you very much~ > Zhiliang > > > > On Monday, September 21, 2015 11:48 PM, Sujit Pal <sujitatgt...@gmail.com> > wrote: > > > Hi Zhiliang, > > Would something like this work? > > val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0)) > > -sujit > > > On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu <zchl.j...@yahoo.com.invalid > > wrote: > > Hi Romi, > > Thanks very much for your kind help and comments~~ > > In fact there is some valid background to the application; it is about R > data analysis. > ...
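For readers following along outside Spark, the adjacent-row subtraction discussed in this thread can be sketched in plain Python over a local list. This mirrors the logic of the Scala one-liner rdd1.sliding(2).map(v => v(1) - v(0)), but it is not Spark code; the function name and sample numbers are illustrative only.

```python
def adjacent_diffs(rows):
    # Pair each element with its successor, like sliding(2), then subtract:
    # element i of the result is rows[i + 1] - rows[i].
    return [b - a for a, b in zip(rows, rows[1:])]

print(adjacent_diffs([10, 12, 15, 11]))  # -> [2, 3, -4]
```

Note that, as in the Scala version, the result has one fewer element than the input; if you need the R-style leading zero row from the quoted code, prepend it yourself.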
Re: Scala: How to match a java object????
Hi Saif, Would this work? import scala.collection.JavaConversions._ new java.math.BigDecimal(5) match { case x: java.math.BigDecimal => x.doubleValue } It gives me the following on the Scala console: res9: Double = 5.0 Assuming you had a stream of BigDecimals, you could just call map on it: myBigDecimals.map(_.doubleValue) to get your Seq of Doubles. You will need the JavaConversions._ import to allow Java Doubles to be treated by Scala as Scala Doubles. -sujit On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote: Hi, thank you for the further assistance. You can reproduce this by simply running *5 match { case java.math.BigDecimal => 2 }* In my personal case, I am applying a map action to a Seq[Any], so the elements inside are of type Any, to which I need to apply a proper .asInstanceOf[WhoYouShouldBe]. Saif *From:* William Briggs [mailto:wrbri...@gmail.com] *Sent:* Tuesday, August 18, 2015 4:46 PM *To:* Ellafi, Saif A.; user@spark.apache.org *Subject:* Re: Scala: How to match a java object Could you share your pattern matching expression that is failing? On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote: Hi all, I am trying to run a Spark job in which I receive *java.math.BigDecimal* objects instead of the Scala equivalents, and I am trying to convert them into Doubles. If I try to match-case this object class, I get: *“error: object java.math.BigDecimal is not a value”* How could I get around matching Java objects? I would like to avoid multiple try-catches on ClassCastExceptions for all my checks. Thank you, Saif
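The type-dispatch idea in this thread (match on the runtime type, then convert to a double) can be sketched in plain Python, with decimal.Decimal standing in for java.math.BigDecimal and isinstance() playing the role of the typed pattern `case x: java.math.BigDecimal => x.doubleValue`. This is an analogue for illustration, not Spark or Scala code.

```python
from decimal import Decimal

def to_double(value):
    if isinstance(value, Decimal):
        return float(value)            # the BigDecimal -> Double branch
    if isinstance(value, (int, float)):
        return float(value)            # plain numbers pass straight through
    raise TypeError("unsupported type: %s" % type(value).__name__)

print([to_double(v) for v in [Decimal(5), 2, 3.5]])  # -> [5.0, 2.0, 3.5]
```

Like the Scala match, this converts a heterogeneous sequence element by element and fails loudly on an unexpected type instead of relying on scattered casts.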
Re: How to increase parallelism of a Spark cluster?
@Silvio: the mapPartitions instantiates a HttpSolrServer, then for each query string in the partition, sends the query to Solr using SolrJ and gets back the top N results. It then reformats the result data into one long string and returns the key-value pair as (query string, result string). @Igor: Thanks for the parameter suggestions. I will check with my Databricks admin about --num-executors and whether there is a way to set the number of cores per executor, and will update here if I find anything, but from the Databricks console it appears that the number of executors per box is 1. This seems normal though, per the diagram on this page: http://spark.apache.org/docs/latest/cluster-overview.html where it seems that there is 1 executor per box, and each executor can spawn multiple threads to take care of multiple tasks (see bullet #1 copied below). "Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs)." Regarding hitting the max number of requests, thanks for the link. I am using the default client. Just peeked at the Solr code, and the default settings (if no HttpClient instance is supplied in the ctor) are to use DefaultHttpClient (from HttpComponents), whose settings are as follows: - Version: HttpVersion.HTTP_1_1 - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET - NoTcpDelay: true - SocketBufferSize: 8192 - UserAgent: Apache-HttpClient/release (java 1.5) In addition, the Solr code sets the following additional config parameters on the DefaultHttpClient.
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128); params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32); params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects); Since all my connections are coming out of 2 worker boxes, it looks like I could get 32x2 = 64 clients hitting Solr, right? @Steve: Thanks for the link to the HttpClient config. I was thinking about using a thread pool (or better, using a PoolingHttpClientManager per the docs), but it probably won't help since it's still being fed one request at a time. @Abhishek: my observations agree with what you said. In the past I have had success with repartition to reduce the partition size, especially when groupBy operations were involved. But I believe an executor should be able to handle multiple tasks in parallel, from what I understand about Akka, on which Spark is built - the worker is essentially an ActorSystem which can contain multiple Actors, each actor working on a queue of tasks. Within an Actor everything is sequential, but the ActorSystem is responsible for farming out the tasks it gets to its Actors. Although it is possible I am generalizing incorrectly from my limited experience with Akka. Thanks again for all your help. Please let me know if something jumps out and/or if there is some configuration I should check. -sujit On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: I don't know if (your assertion/expectation that) workers will process things (multiple partitions) in parallel is really valid. Or if having more partitions than workers will necessarily help (unless you are memory bound - so partitions are essentially helping your work size rather than execution parallelism). [Disclaimer: I am no authority on Spark, but wanted to throw in my spin based on my own understanding].
Nothing official about it :) -abhishek- On Jul 31, 2015, at 1:03 PM, Sujit Pal sujitatgt...@gmail.com wrote: Hello, I am trying to run a Spark job that hits an external webservice to get back some information. The cluster is 1 master + 4 workers; each worker has 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server, and is accessed using code similar to that shown below. def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = { val solr = new HttpSolrClient() initializeSolrParameters(solr) keyValues.map(keyValue => (keyValue._1, process(solr, keyValue))) } myRDD.repartition(10) .mapPartitions(keyValues => getResults(keyValues)) The mapPartitions does some initialization of the SolrJ client per partition and then hits it for each record in the partition via the getResults() call. I repartitioned in the hope that this would result in 10 clients hitting Solr simultaneously (I would like to go up to maybe 30-40 simultaneous clients if I can). However, I counted the number of open connections using netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and observed that Solr has a constant 4 clients (i.e., equal to the number of workers) over the lifetime of the run. My observation leads me
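One alternative raised in this thread (a pool of clients per partition instead of a single sequential one) can be sketched in plain Python with a small thread pool. This is a hedged illustration of the idea, not Spark code: query_solr is a stand-in stub for the real SolrJ/HTTP call, and the names are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor

def query_solr(q):
    # Stub for the real network call; returns a fake result string.
    return "results-for-" + q

def get_results(partition, n_threads=4):
    # Fan one partition's queries out over n_threads concurrent workers;
    # pool.map preserves input order, like the sequential keyValues.map.
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        return list(zip(partition, pool.map(query_solr, partition)))

print(get_results(["q1", "q2", "q3"]))
# -> [('q1', 'results-for-q1'), ('q2', 'results-for-q2'), ('q3', 'results-for-q3')]
```

With a real HTTP client this would multiply the number of simultaneous connections per task by n_threads, independent of how many tasks Spark schedules per executor.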
Re: How to increase parallelism of a Spark cluster?
No one has any ideas? Is there some more information I should provide? I am looking for ways to increase the parallelism among workers. Currently I just see the number of simultaneous connections to Solr equal to the number of workers. My number of partitions is (2.5x) larger than the number of workers, and the workers seem to be large enough to handle more than one task at a time. I am creating a single client per partition in my mapPartitions call. Not sure if that is creating the gating situation? Perhaps I should use a pool of clients instead? Would really appreciate some pointers. Thanks in advance for any help you can provide. -sujit On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal sujitatgt...@gmail.com wrote: Hello, I am trying to run a Spark job that hits an external webservice to get back some information. The cluster is 1 master + 4 workers; each worker has 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server, and is accessed using code similar to that shown below. def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = { val solr = new HttpSolrClient() initializeSolrParameters(solr) keyValues.map(keyValue => (keyValue._1, process(solr, keyValue))) } myRDD.repartition(10) .mapPartitions(keyValues => getResults(keyValues)) The mapPartitions does some initialization of the SolrJ client per partition and then hits it for each record in the partition via the getResults() call. I repartitioned in the hope that this would result in 10 clients hitting Solr simultaneously (I would like to go up to maybe 30-40 simultaneous clients if I can). However, I counted the number of open connections using netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and observed that Solr has a constant 4 clients (i.e., equal to the number of workers) over the lifetime of the run. My observation leads me to believe that each worker processes a single stream of work sequentially.
However, from what I understand about how Spark works, each worker should be able to process a number of tasks in parallel, and repartition() is a hint for it to do so. Is there some SparkConf environment variable I should set to increase parallelism in these workers, or should I just configure a cluster with multiple workers per machine? Or is there something I am doing wrong? Thank you in advance for any pointers you can provide. -sujit
Re: How to increase parallelism of a Spark cluster?
Hi Igor, The cluster is a Databricks Spark cluster. It consists of 1 master + 4 workers, each worker has 60GB RAM and 4 CPUs. The original mail has some more details (also the reference to the HttpSolrClient in there should be HttpSolrServer, sorry about that, mistake while writing the email). There is no additional configuration on the external Solr host from my code, I am using the default HttpClient provided by HttpSolrServer. According to the Javadocs, you can pass in a HttpClient object as well. Is there some specific configuration you would suggest to get past any limits? On another project, I faced a similar problem but I had more leeway (was using a Spark cluster from EC2) and less time, my workaround was to use python multiprocessing to create a program that started up 30 python JSON/HTTP clients and wrote output into 30 output files, which were then processed by Spark. Reason I mention this is that I was using default configurations there as well, just needed to increase the number of connections against Solr to a higher number. This time round, I would like to do this through Spark because it makes the pipeline less complex. -sujit On Sun, Aug 2, 2015 at 10:52 AM, Igor Berman igor.ber...@gmail.com wrote: What kind of cluster? How many cores on each worker? Is there config for http solr client? I remember standard httpclient has limit per route/host. On Aug 2, 2015 8:17 PM, Sujit Pal sujitatgt...@gmail.com wrote: No one has any ideas? Is there some more information I should provide? I am looking for ways to increase the parallelism among workers. Currently I just see number of simultaneous connections to Solr equal to the number of workers. My number of partitions is (2.5x) larger than number of workers, and the workers seem to be large enough to handle more than one task at a time. I am creating a single client per partition in my mapPartition call. Not sure if that is creating the gating situation? Perhaps I should use a Pool of clients instead? 
How to increase parallelism of a Spark cluster?
Hello, I am trying to run a Spark job that hits an external webservice to get back some information. The cluster is 1 master + 4 workers; each worker has 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server, and is accessed using code similar to that shown below. def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = { val solr = new HttpSolrClient() initializeSolrParameters(solr) keyValues.map(keyValue => (keyValue._1, process(solr, keyValue))) } myRDD.repartition(10) .mapPartitions(keyValues => getResults(keyValues)) The mapPartitions does some initialization of the SolrJ client per partition and then hits it for each record in the partition via the getResults() call. I repartitioned in the hope that this would result in 10 clients hitting Solr simultaneously (I would like to go up to maybe 30-40 simultaneous clients if I can). However, I counted the number of open connections using netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and observed that Solr has a constant 4 clients (i.e., equal to the number of workers) over the lifetime of the run. My observation leads me to believe that each worker processes a single stream of work sequentially. However, from what I understand about how Spark works, each worker should be able to process a number of tasks in parallel, and repartition() is a hint for it to do so. Is there some SparkConf environment variable I should set to increase parallelism in these workers, or should I just configure a cluster with multiple workers per machine? Or is there something I am doing wrong? Thank you in advance for any pointers you can provide. -sujit
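A back-of-the-envelope way to frame the sizing question in this thread: if each running task holds one client connection, the number of simultaneous external connections is bounded by min(total cores across executors, number of partitions). The helper below is an illustrative sketch with made-up numbers, not a Spark API.

```python
def max_simultaneous_clients(executors, cores_per_executor, partitions):
    # Tasks run at most one per core; each task holds one client,
    # and there can never be more concurrent tasks than partitions.
    return min(executors * cores_per_executor, partitions)

# 4 workers x 4 CPUs with 10 partitions gives a ceiling of 10 connections;
# observing only 4 suggests tasks were running one per worker.
print(max_simultaneous_clients(4, 4, 10))  # -> 10
```

By this arithmetic, seeing exactly as many connections as workers points at tasks-per-executor (cores or executor config), not at the partition count.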
Re: use S3-Compatible Storage with spark
Hi Schmirr, The part after the s3n:// is your bucket name and folder name, i.e. s3n://${bucket_name}/${folder_name}[/${subfolder_name}]*. Bucket names are unique across S3, so the resulting path is also unique. There is no concept of a hostname in s3 urls as far as I know. -sujit On Fri, Jul 17, 2015 at 1:36 AM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3-compatible storage in Spark? If I'm using the s3n:// url schema, then it will point to Amazon; is there a way I can specify the host somewhere? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
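The URL anatomy described above can be checked with Python's standard urlparse, which splits any scheme://authority/path URL the same way: for s3n:// the authority slot holds the bucket name rather than a hostname. The bucket and key names below are illustrative.

```python
from urllib.parse import urlparse

# Split an s3n URL: scheme, then bucket (in the authority slot), then key path.
url = urlparse("s3n://mybucket/my_folder/part-00000")
print(url.scheme, url.netloc, url.path)  # -> s3n mybucket /my_folder/part-00000
```

This is why there is nowhere to put a custom host in the URL itself; pointing s3n at a non-Amazon endpoint has to happen in the client/filesystem configuration instead.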
Re: Spark on EMR with S3 example (Python)
Hi Roberto, I think you would need to, as Akhil said. Just checked from this page: http://aws.amazon.com/public-data-sets/ and clicking through to a few dataset links, all of them are available on S3 (some are available via http and ftp as well), but I think the point of these datasets is that they are usually very large, so having them on S3 ensures it is easier to take your code to the data than to bring the data to your code. -sujit On Tue, Jul 14, 2015 at 1:56 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: Hi Sujit, I just wanted to access public datasets on Amazon. Do I still need to provide the keys? Thank you, *From:* Sujit Pal [mailto:sujitatgt...@gmail.com] *Sent:* Tuesday, July 14, 2015 3:14 PM *To:* Pagliari, Roberto *Cc:* user@spark.apache.org *Subject:* Re: Spark on EMR with S3 example (Python) Hi Roberto, I have written PySpark code that reads from private S3 buckets; it should be similar for public S3 buckets as well. You need to set the AWS access and secret keys into the SparkContext, then you can access the S3 folders and files with their s3n:// paths. Something like this: sc = SparkContext() sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key) sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_secret_key) mydata = sc.textFile("s3n://mybucket/my_input_folder") \ .map(lambda x: do_something(x)) \ .saveAsTextFile("s3n://mybucket/my_output_folder") ... You can read and write sequence files as well - these are the only 2 formats I have tried, but I'm sure other ones like JSON would work also. Another approach is to embed the AWS access key and secret key into the s3n:// path. I wasn't able to use the s3 protocol, but s3n is equivalent (I believe it's an older version, but not sure) and it works for access. Hope this helps, Sujit On Tue, Jul 14, 2015 at 10:50 AM, Pagliari, Roberto rpagli...@appcomsci.com wrote: Is there an example about how to load data from a public S3 bucket in Python? I haven't found any. Thank you,
Re: Efficiency of leftOuterJoin a cassandra rdd
Hi Wush, One option may be to try a replicated join. Since your rdd1 is small, read it into a collection and broadcast it to the workers, then filter your larger rdd2 against the collection on the workers. -sujit On Tue, Jul 14, 2015 at 11:33 PM, Deepak Jain deepuj...@gmail.com wrote: leftOuterJoin and join APIs are super slow in spark. 100x slower than hadoop Sent from my iPhone On 14-Jul-2015, at 10:59 PM, Wush Wu wush...@gmail.com wrote: I don't understand. By the way, the `joinWithCassandraTable` does improve my query time from 40 mins to 3 mins. 2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: I have explored spark joins for the last few months (you can search my posts) and it's frustratingly useless. On Tue, Jul 14, 2015 at 9:35 PM, Wush Wu wush...@gmail.com wrote: Dear all, I have found a post discussing the same thing: https://groups.google.com/a/lists.datastax.com/forum/#!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J The solution is to use joinWithCassandraTable, and the documentation is here: https://github.com/datastax/spark-cassandra-connector/blob/v1.3.0-M2/doc/2_loading.md Wush 2015-07-15 12:15 GMT+08:00 Wush Wu wush...@gmail.com: Dear all, I am trying to join two RDDs, named rdd1 and rdd2. rdd1 is loaded from a textfile with about 33000 records. rdd2 is loaded from a table in cassandra which has about 3 billion records. I tried the following code: ```scala val rdd1 : (String, XXX) = sc.textFile(...).map(...) import org.apache.spark.sql.cassandra.CassandraSQLContext cc.setKeyspace("xxx") val rdd2 : (String, String) = cc.sql("SELECT x, y FROM xxx").map(r => ...) val result = rdd1.leftOuterJoin(rdd2) result.take(20) ``` However, the log shows that Spark loaded 3 billion records from cassandra and only 33000 records were left at the end. Is there a way to query cassandra based on the keys in rdd1?
Here is some information about our system: - The spark version is 1.3.1 - The cassandra version is 2.0.14 - The key used for joining is the primary key of the cassandra table. Best, Wush -- Deepak
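The replicated-join suggestion above can be sketched in plain Python: load the small side (rdd1, ~33000 rows) into a dict and join the large side against it locally, instead of shuffling billions of Cassandra rows. In Spark the dict would be shipped to the workers with sc.broadcast(); the data below is made up for illustration.

```python
# Small side: keyed lookup table (in Spark, wrap this in sc.broadcast()).
small = {"k1": "a", "k2": "b"}

# Large side: stand-in for the big RDD of (key, value) records.
large = [("k1", 10), ("k3", 30), ("k2", 20)]

# Map-side join: keep only keys present in the small side, no shuffle needed.
joined = [(k, (v, small[k])) for k, v in large if k in small]
print(joined)  # -> [('k1', (10, 'a')), ('k2', (20, 'b'))]
```

Inside Spark this filter/lookup would run per partition (e.g. in a mapPartitions over rdd2), so only the 33000-row table moves over the network, not the 3-billion-row one.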
Re: Spark on EMR with S3 example (Python)
Hi Roberto, I have written PySpark code that reads from private S3 buckets; it should be similar for public S3 buckets as well. You need to set the AWS access and secret keys into the SparkContext, then you can access the S3 folders and files with their s3n:// paths. Something like this: sc = SparkContext() sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key) sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_secret_key) mydata = sc.textFile("s3n://mybucket/my_input_folder") \ .map(lambda x: do_something(x)) \ .saveAsTextFile("s3n://mybucket/my_output_folder") ... You can read and write sequence files as well - these are the only 2 formats I have tried, but I'm sure other ones like JSON would work also. Another approach is to embed the AWS access key and secret key into the s3n:// path. I wasn't able to use the s3 protocol, but s3n is equivalent (I believe it's an older version, but not sure) and it works for access. Hope this helps, Sujit On Tue, Jul 14, 2015 at 10:50 AM, Pagliari, Roberto rpagli...@appcomsci.com wrote: Is there an example about how to load data from a public S3 bucket in Python? I haven't found any. Thank you,
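For the "embed the keys into the s3n:// path" approach mentioned above, one gotcha worth knowing: the credentials must be URL-escaped, since AWS secret keys can contain characters such as '/' that would otherwise break the path. The helper below is a hypothetical sketch (the function name and key values are made up), using only the standard library.

```python
from urllib.parse import quote

def s3n_url_with_keys(access_key, secret_key, bucket, folder):
    # safe="" forces '/' in the credentials to be percent-encoded too.
    return "s3n://%s:%s@%s/%s" % (
        quote(access_key, safe=""), quote(secret_key, safe=""), bucket, folder)

print(s3n_url_with_keys("AKIAXXXX", "abc/def", "mybucket", "my_input_folder"))
# -> s3n://AKIAXXXX:abc%2Fdef@mybucket/my_input_folder
```

Setting the keys on the Hadoop configuration, as in the snippet above, avoids this escaping issue entirely and keeps credentials out of logged paths, so it is usually the safer of the two approaches.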
Re: PySpark without PySpark
Hi Ashish, Cool, glad it worked out. I have only used Spark clusters on EC2, which I spin up using the spark-ec2 scripts (part of the Spark downloads), so I don't have any experience setting up in-house clusters like you want to do. But I found some documentation here that may be helpful: https://docs.sigmoidanalytics.com/index.php/Installing_Spark_and_Setting_Up_Your_Cluster#Deploying_set_of_machines_over_SSH There are other options as well in this document that will require you to know some other tools like Chef (previous sections). Good luck, Sujit On Thu, Jul 9, 2015 at 10:25 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi Sujit, Thank you for your time to help me out. And a special thank you for your elaborate steps. 1) I corrected SPARK_HOME to be C:\spark-1.3.0 2) I installed py4j from the anaconda command prompt and the command you gave executed successfully. 3) I replaced python27 with python in the 00-setup script. I now give the path variables as defined, and the PATH: SPARK_HOME=C:\Spark-1.3.0 JAVA_HOME=C:\Program Files\Java\jdk1.7.0_79 PYTHONPATH=C:\Users\Ashish Dutt\Anaconda MAVEN_HOME=C:\Maven\bin SBT_HOME=C:\SBT PATH=%JAVA_HOME%\BIN;%PYTHON_PATH%;%HADOOP_HOME%\BIN;%SPARK_HOME%;%M2_HOME%\BIN;%MAVEN_HOME%\BIN;%SBT_HOME%; 4) This time, I grabbed my baseball bat (you do know why..), invoked ipython notebook again, and with the other free hand I slowly typed the command print SPARK_HOME -- it worked. Then another command, from pyspark import SparkContext, and it worked too!!! The baseball bat dropped to the ground and I quickly jabbed in the other commands given in the post. Attached is the screenshot, and it all worked... EUREKA... Sujit, a quintal of thanks for your persistence in helping me resolve this problem. You have been very helpful and I wish you luck and success in all your endeavors. The next milestone is to get this to work in a cluster environment. I am confused: do I need to install spark-1.3.0 on all the 4 linux machines that make up my cluster?
The goal is to use my laptop as a client (from where I will submit spark commands to the master server). The master can then distribute the job to the three nodes and provide the client with the end result. Am I correct in this visualization? Once again, thank you for your efforts. Sincerely, Ashish Dutt PhD Candidate Department of Information Systems University of Malaya, Lembah Pantai, 50603 Kuala Lumpur, Malaysia On Fri, Jul 10, 2015 at 11:48 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Ashish, Julian's approach is probably better, but a few observations: 1) Your SPARK_HOME should be C:\spark-1.3.0 (not C:\spark-1.3.0\bin). 2) If you have anaconda python installed (I saw that you had set this up in a separate thread), py4j should be part of the package - at least I think so. To test this, try in your python repl: from py4j.java_gateway import JavaGateway; if it succeeds, you already have it. 3) In case Py4J is not installed, the best way to install a new package is using easy_install or pip. Make sure your path is set up so that when you call python you are calling the anaconda version (in case you have multiple python versions); then do easy_install py4j - this will install py4j correctly without any messing around on your part. Install instructions for py4j are available on their site: http://py4j.sourceforge.net/install.html 4) You should replace the python2 in your 00-setup-script with python, so you point to the $SPARK_HOME/python directory (C:\spark-1.3.0\python). -sujit On Thu, Jul 9, 2015 at 8:26 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hello Sujit, Many thanks for your response. To answer your questions: Q1) Do you have SPARK_HOME set up in your environment? - Yes, I do. It is SPARK_HOME=C:/spark-1.3.0/bin Q2) Is there a python2 or python subdirectory under the root of your Spark installation? - Yes, I do have that too. It is called python.
To fix this problem, this is what I did: I downloaded py4j-0.8.2.1-src from here: https://pypi.python.org/pypi/py4j, which was not there initially when I downloaded the Spark package from the official repository. I then put it in the lib directory as C:\spark-1.3.0\python\lib. Note I did not extract the zip file; I put it in as-is, in the pyspark folder of the spark-1.3.0 root folder. What I did next was copy this file and put it on the pythonpath, so my python path now reads as PYTHONPATH=C:/Python27/ I then rebooted the computer and said a silent prayer :-) Then I opened the command prompt, invoked the command pyspark from the bin directory of spark, and EUREKA, it worked :-) Attached is the screenshot for the same. Now, the problem is with the IPython notebook: I cannot get it to work with PySpark. I have a cluster with 4 nodes using CDH5.4. I was able to resolve the earlier problem; now the next challenge was to configure it with IPython. Followed the steps as documented
Re: PySpark without PySpark
Hi Ashish, Your 00-pyspark-setup file looks very different from mine (and from the one described in the blog post). Questions: 1) Do you have SPARK_HOME set up in your environment? Because if not, it sets it to None in your code. You should provide the path to your Spark installation. In my case I have spark-1.3.1 installed under $HOME/Software, and the code block under # Configure the environment (the yellow highlight in the code below) reflects that. 2) Is there a python2 or python subdirectory under the root of your Spark installation? In my case it's python, not python2. This contains the Python bindings for Spark, so the block under # Add the PySpark/py4j to the Python path (the green highlight in the code below) adds it to the Python sys.path so things like pyspark.SparkContext are accessible in your Python environment. import os import sys # Configure the environment if 'SPARK_HOME' not in os.environ: os.environ['SPARK_HOME'] = "/Users/palsujit/Software/spark-1.3.1" # Create a variable for our root path SPARK_HOME = os.environ['SPARK_HOME'] # Add the PySpark/py4j to the Python Path sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build")) sys.path.insert(0, os.path.join(SPARK_HOME, "python")) Hope this fixes things for you. -sujit On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi Sujit, Thanks for your response. So I opened a new notebook using the command ipython notebook --profile spark and tried the sequence of commands, but I am getting errors. Attached is the screenshot of the same. Also I am attaching the 00-pyspark-setup.py for your reference. Looks like I have written something wrong here, but I cannot seem to figure out what it is. Thank you for your help Sincerely, Ashish Dutt On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Ashish, Nice post. Agreed, kudos to the author of the post, Benjamin Bengfort of District Data Labs. Following your post, I get this problem; Again, not my post.
I did try setting up IPython with the Spark profile for the edX Intro to Spark course (because I didn't want to use the Vagrant container), and it worked flawlessly with the instructions provided (on OSX). I haven't used the IPython/PySpark environment beyond very basic tasks since then, though, because my employer has a Databricks license which we were already using for other stuff, and we ended up doing the labs on Databricks. Looking at your screenshot, though, I don't see why you think it's picking up the default profile. One simple way of checking to see if things are working is to open a new notebook and try this sequence of commands: from pyspark import SparkContext sc = SparkContext("local", "pyspark") sc You should see something like this after a little while: <pyspark.context.SparkContext at 0x1093c9b10> While the context is being instantiated, you should also see lots of log lines scroll by on the terminal where you started the ipython notebook --profile spark command - these log lines are from Spark. Hope this helps, Sujit On Wed, Jul 8, 2015 at 6:04 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi Sujit, Nice post.. Exactly what I had been looking for. I am relatively a beginner with Spark and real-time data processing. We have a server with CDH5.4 and 4 nodes. The Spark version on our server is 1.3.0. On my laptop I have Spark 1.3.0 too, in a Windows 7 environment. As per point 5 of your post I am able to invoke pyspark locally in standalone mode. Following your post, I get this problem: 1. In the section "Using Ipython notebook with spark" I cannot understand why it is picking up the default profile and not the pyspark profile. I am sure it is because of the path variables. Attached is the screenshot. Can you suggest how to solve this?
Currently, the path variables for my laptop are: SPARK_HOME=C:\SPARK-1.3.0\BIN, JAVA_HOME=C:\PROGRAM FILES\JAVA\JDK1.7.0_79, HADOOP_HOME=D:\WINUTILS, M2_HOME=D:\MAVEN\BIN, MAVEN_HOME=D:\MAVEN\BIN, PYTHON_HOME=C:\PYTHON27\, SBT_HOME=C:\SBT\

Sincerely, Ashish Dutt PhD Candidate Department of Information Systems University of Malaya, Lembah Pantai, 50603 Kuala Lumpur, Malaysia

On Thu, Jul 9, 2015 at 4:56 AM, Sujit Pal sujitatgt...@gmail.com wrote:

You are welcome, Davies. Just to clarify, I didn't write the post (not sure if my earlier post gave that impression; apologies if so), although I agree it's great :-). -sujit

On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu dav...@databricks.com wrote: Great post, thanks for sharing with us!

On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Julian, I recently built a Python+Spark application to do search relevance analytics. I use spark-submit to submit PySpark jobs to a Spark cluster on EC2 (so I don't use the PySpark shell; hopefully that's what you are looking for). Can't share the code, but the basic approach is covered in this blog post
Re: PySpark without PySpark
Hi Ashish,

Julian's approach is probably better, but a few observations:

1) Your SPARK_HOME should be C:\spark-1.3.0 (not C:\spark-1.3.0\bin).

2) If you have Anaconda Python installed (I saw that you had set this up in a separate thread), py4j should be part of the package - at least I think so. To test this, try in your Python REPL: from py4j.java_gateway import JavaGateway - if it succeeds, you already have it.

3) In case py4j is not installed, the best way to install a new package is using easy_install or pip. Make sure your path is set up so that when you call python you are calling the Anaconda version (in case you have multiple Python versions); if so, do easy_install py4j - this will install py4j correctly without any messing around on your part. Install instructions for py4j are available on their site: http://py4j.sourceforge.net/install.html

4) You should replace the python2 in your 00-setup-script with python, so you point to the $SPARK_HOME/python directory (C:\spark-1.3.0\python).

-sujit

On Thu, Jul 9, 2015 at 8:26 PM, Ashish Dutt ashish.du...@gmail.com wrote:

Hello Sujit, Many thanks for your response. To answer your questions:

Q1) Do you have SPARK_HOME set up in your environment? - Yes, I do. It is SPARK_HOME=C:/spark-1.3.0/bin

Q2) Is there a python2 or python subdirectory under the root of your Spark installation? - Yes, I do have that too. It is called python.

To fix this problem, this is what I did: I downloaded py4j-0.8.2.1-src from https://pypi.python.org/pypi/py4j, which was not there initially when I downloaded the Spark package from the official repository. I then put it in the lib directory as C:\spark-1.3.0\python\lib. Note I did not extract the zip file; I put it in as-is, under the python folder of the spark-1.3.0 root folder. What I did next was add this file to the PYTHONPATH. 
So my PYTHONPATH now reads as PYTHONPATH=C:/Python27/ I then rebooted the computer, said a silent prayer :-) Then I opened the command prompt, invoked the command pyspark from the bin directory of Spark, and EUREKA, it worked :-) Attached is the screenshot for the same.

Now the problem is with the IPython notebook; I cannot get it to work with PySpark. I have a cluster with 4 nodes using CDH5.4. I was able to resolve the problem. Now the next challenge was to configure it with IPython. I followed the steps as documented in the blog, and I get errors; attached is the screenshot. @Julian, I tried your method too; attached is the screenshot of the error message, 7.png. Hope you can help me out to fix this problem. Thank you for your time.

Sincerely, Ashish Dutt PhD Candidate Department of Information Systems University of Malaya, Lembah Pantai, 50603 Kuala Lumpur, Malaysia

On Fri, Jul 10, 2015 at 12:02 AM, Sujit Pal sujitatgt...@gmail.com wrote:

Hi Ashish, Your 00-pyspark-setup file looks very different from mine (and from the one described in the blog post). Questions:

1) Do you have SPARK_HOME set up in your environment? Because if not, it sets it to None in your code. You should provide the path to your Spark installation. In my case I have spark-1.3.1 installed under $HOME/Software, and the code block under # Configure the environment (or yellow highlight in the code below) reflects that.

2) Is there a python2 or python subdirectory under the root of your Spark installation? In my case it's python, not python2. This contains the Python bindings for Spark, so the block under # Add the PySpark/py4j to the Python path (or green highlight in the code below) adds it to the Python sys.path so things like pyspark.SparkContext are accessible in your Python environment. 
import os
import sys

# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/Users/palsujit/Software/spark-1.3.1'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, 'python', 'build'))
sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))

Hope this fixes things for you.

-sujit

On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt ashish.du...@gmail.com wrote:

Hi Sujit, Thanks for your response. So I opened a new notebook using the command ipython notebook --profile spark and tried the sequence of commands, but I am getting errors. Attached is the screenshot of the same. Also I am attaching the 00-pyspark-setup.py for your reference. Looks like I have written something wrong here; cannot seem to figure out what it is. Thank you for your help. Sincerely, Ashish Dutt

On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal sujitatgt...@gmail.com wrote:

Hi Ashish, "Nice post." Agreed, kudos to the author of the post, Benjamin Bengfort of District Data Labs. "Following your post, I get this problem;" Again, not my post. I did try setting up IPython with the Spark profile for the edX Intro to Spark course (because I didn't
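[Editor's note: observations 2) through 4) above can be combined into a quick self-check script. This is a sketch, not from the thread; the Windows path is the one Ashish reported and is only a fallback example.]

```python
import os
import sys

# Point sys.path at the Python bindings under the Spark *root*
# (per observation 1, SPARK_HOME is the root, not the bin subdirectory).
# The fallback path below is an example from this thread; adjust for your install.
SPARK_HOME = os.environ.get("SPARK_HOME", r"C:\spark-1.3.0")
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

# Per observations 2 and 3: test whether py4j is already importable.
try:
    from py4j.java_gateway import JavaGateway
    have_py4j = True
except ImportError:
    # Not found: install it with easy_install py4j (or pip install py4j).
    have_py4j = False

print("py4j available:", have_py4j)
```

If this prints False, install py4j first; if True, the remaining failures are path-related rather than missing packages.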
Re: PySpark without PySpark
Hi Julian, I recently built a Python+Spark application to do search relevance analytics. I use spark-submit to submit PySpark jobs to a Spark cluster on EC2 (so I don't use the PySpark shell; hopefully that's what you are looking for). Can't share the code, but the basic approach is covered in this blog post - scroll down to the section "Writing a Spark Application": https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

Hope this helps, -sujit

On Wed, Jul 8, 2015 at 7:46 AM, Julian julian+sp...@magnetic.com wrote:

Hey. Is there a resource that has written up the necessary steps for running PySpark without using the PySpark shell? I can reverse-engineer (by following the tracebacks and reading the shell source) what the relevant Java imports are, but I would assume someone has attempted this before and just published something I can either follow or install. If not, I have something that pretty much works and can publish it, but I'm not a heavy Spark user, so there may be some things I've left out that I haven't hit because of how little of pyspark I'm playing with. Thanks, Julian

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-without-PySpark-tp23719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: PySpark without PySpark
Hi Ashish, "Nice post." Agreed, kudos to the author of the post, Benjamin Bengfort of District Data Labs. "Following your post, I get this problem;" Again, not my post.

I did try setting up IPython with the Spark profile for the edX Intro to Spark course (because I didn't want to use the Vagrant container), and it worked flawlessly with the instructions provided (on OSX). I haven't used the IPython/PySpark environment beyond very basic tasks since then, though, because my employer has a Databricks license which we were already using for other stuff, and we ended up doing the labs on Databricks.

Looking at your screenshot, though, I don't see why you think it's picking up the default profile. One simple way of checking whether things are working is to open a new notebook and try this sequence of commands:

from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')
sc

You should see something like this after a little while:

<pyspark.context.SparkContext at 0x1093c9b10>

While the context is being instantiated, you should also see lots of log lines scroll by on the terminal where you started the ipython notebook --profile spark command - these log lines are from Spark.

Hope this helps,
Sujit

On Wed, Jul 8, 2015 at 6:04 PM, Ashish Dutt ashish.du...@gmail.com wrote:

Hi Sujit, Nice post. Exactly what I had been looking for. I am relatively a beginner with Spark and real-time data processing. We have a server with CDH5.4 with 4 nodes. The Spark version on our server is 1.3.0. On my laptop I have Spark 1.3.0 too, and it's using a Windows 7 environment. As per point 5 of your post I am able to invoke pyspark locally in standalone mode.

Following your post, I get this problem:

1. In the section "Using IPython notebook with Spark" I cannot understand why it is picking up the default profile and not the pyspark profile. I am sure it is because of the path variables. Attached is the screenshot. Can you suggest how to solve this. 
Currently, the path variables for my laptop are: SPARK_HOME=C:\SPARK-1.3.0\BIN, JAVA_HOME=C:\PROGRAM FILES\JAVA\JDK1.7.0_79, HADOOP_HOME=D:\WINUTILS, M2_HOME=D:\MAVEN\BIN, MAVEN_HOME=D:\MAVEN\BIN, PYTHON_HOME=C:\PYTHON27\, SBT_HOME=C:\SBT\

Sincerely, Ashish Dutt PhD Candidate Department of Information Systems University of Malaya, Lembah Pantai, 50603 Kuala Lumpur, Malaysia

On Thu, Jul 9, 2015 at 4:56 AM, Sujit Pal sujitatgt...@gmail.com wrote:

You are welcome, Davies. Just to clarify, I didn't write the post (not sure if my earlier post gave that impression; apologies if so), although I agree it's great :-). -sujit

On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu dav...@databricks.com wrote: Great post, thanks for sharing with us!

On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Julian, I recently built a Python+Spark application to do search relevance analytics. I use spark-submit to submit PySpark jobs to a Spark cluster on EC2 (so I don't use the PySpark shell; hopefully that's what you are looking for). Can't share the code, but the basic approach is covered in this blog post - scroll down to the section "Writing a Spark Application": https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python Hope this helps, -sujit

On Wed, Jul 8, 2015 at 7:46 AM, Julian julian+sp...@magnetic.com wrote: Hey. Is there a resource that has written up the necessary steps for running PySpark without using the PySpark shell? I can reverse-engineer (by following the tracebacks and reading the shell source) what the relevant Java imports are, but I would assume someone has attempted this before and just published something I can either follow or install. If not, I have something that pretty much works and can publish it, but I'm not a heavy Spark user, so there may be some things I've left out that I haven't hit because of how little of pyspark I'm playing with. 
Thanks, Julian
Re: PySpark without PySpark
You are welcome, Davies. Just to clarify, I didn't write the post (not sure if my earlier post gave that impression; apologies if so), although I agree it's great :-). -sujit

On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu dav...@databricks.com wrote: Great post, thanks for sharing with us!

On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Julian, I recently built a Python+Spark application to do search relevance analytics. I use spark-submit to submit PySpark jobs to a Spark cluster on EC2 (so I don't use the PySpark shell; hopefully that's what you are looking for). Can't share the code, but the basic approach is covered in this blog post - scroll down to the section "Writing a Spark Application": https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python Hope this helps, -sujit

On Wed, Jul 8, 2015 at 7:46 AM, Julian julian+sp...@magnetic.com wrote: Hey. Is there a resource that has written up the necessary steps for running PySpark without using the PySpark shell? I can reverse-engineer (by following the tracebacks and reading the shell source) what the relevant Java imports are, but I would assume someone has attempted this before and just published something I can either follow or install. If not, I have something that pretty much works and can publish it, but I'm not a heavy Spark user, so there may be some things I've left out that I haven't hit because of how little of pyspark I'm playing with. Thanks, Julian
Re: How to concatenate two csv files into one RDD?
Hi Rex,

If the CSV files are in the same folder and there are no other files, specifying the directory to sc.textFile() (or equivalent) will pull in all the files. If there are other files, you can pass in a pattern that would capture the two files you care about (if that's possible). If neither of these works for you, you can create individual RDDs for each file and union them.

-sujit

On Fri, Jun 26, 2015 at 11:00 AM, Rex X dnsr...@gmail.com wrote:

With Python Pandas, it is easy to concatenate dataframes by combining pandas.concat http://pandas.pydata.org/pandas-docs/stable/generated/pandas.concat.html and pandas.read_csv:

pd.concat([pd.read_csv(os.path.join(Path_to_csv_files, f)) for f in csvfiles])

where csvfiles is the list of csv files. How can we do this in Spark?
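[Editor's note: the three options above can be sketched in PySpark roughly as follows. This is a sketch, not code from the thread; `sc` is assumed to be a live SparkContext and the file paths are hypothetical, so the Spark calls are shown as comments.]

```python
# Option 1: point sc.textFile at a directory; it reads every file inside it.
# rdd = sc.textFile("hdfs:///data/csvdir")

# Option 2: a glob pattern, or a comma-separated list of paths -
# sc.textFile accepts both forms directly.
paths = ["data/file1.csv", "data/file2.csv"]
combined = ",".join(paths)
# rdd = sc.textFile(combined)
# rdd = sc.textFile("data/file*.csv")   # glob form

# Option 3: build one RDD per file and union them.
# rdd = sc.textFile(paths[0]).union(sc.textFile(paths[1]))

print(combined)  # data/file1.csv,data/file2.csv
```

Note that all three produce an RDD of raw text lines; the CSV header of each file (if any) arrives as an ordinary line and must be filtered out separately.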
Re: What is the right algorithm to do cluster analysis with mixed numeric, categorical, and string value attributes?
Hi Rexx,

In general (i.e., not Spark-specific), it's best to convert categorical data to 1-hot encoding rather than integers - that way the algorithm doesn't use the ordering implicit in the integer representation.

-sujit

On Tue, Jun 16, 2015 at 1:17 PM, Rex X dnsr...@gmail.com wrote:

Is it necessary to convert categorical data into integers? Any tips would be greatly appreciated! -Rex

On Sun, Jun 14, 2015 at 10:05 AM, Rex X dnsr...@gmail.com wrote:

For clustering analysis, we need a way to measure distances. The data contains different levels of measurement - *binary / categorical (nominal), counts (ordinal), and ratio (scale)*. To be concrete, consider, for example, the attributes *city, zip, satisfaction_level, price*. Meanwhile, real data usually also contains string attributes, for example, book titles; the distance between two strings can be measured by minimum edit distance. SPSS provides Two-Step Cluster, which can handle both ratio-scale and ordinal numbers. What is the right algorithm to do hierarchical clustering analysis with all four kinds of attributes above using *MLlib*? If we cannot find a right metric to measure the distance, an alternative is topological data analysis (e.g., linkage). Can we do that kind of analysis with *GraphX*? -Rex
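[Editor's note: a concrete illustration of the 1-hot encoding suggested above. This is a plain-Python sketch with made-up category values, not tied to any Spark API.]

```python
def one_hot(value, categories):
    """Encode a categorical value as a 1-hot vector over a fixed category list."""
    return [1 if value == c else 0 for c in categories]

cities = ["NYC", "SF", "LA"]
print(one_hot("SF", cities))  # [0, 1, 0]

# Integer encoding (NYC=0, SF=1, LA=2) would imply NYC < SF < LA, and a
# distance metric would treat NYC as "closer" to SF than to LA. With 1-hot
# vectors every pair of distinct categories is equidistant, so no spurious
# ordering leaks into the clustering.
```

In MLlib the same idea applies; the point is simply that the feature vector should not encode an artificial ordering among nominal values.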
Re: Access several s3 buckets, with credentials containing /
Hi Pierre,

One way is to recreate your credentials until AWS generates a pair without a slash character in it. Another way I've been using is to pass the credentials outside the S3 file path by setting the following (where sc is the SparkContext):

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", ACCESS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", SECRET_KEY)

After that you can define the RDDs more simply:

val c1 = sc.textFile("s3n://bucket1/file.csv")

-sujit

On Fri, Jun 5, 2015 at 3:55 AM, Steve Loughran ste...@hortonworks.com wrote:

On 5 Jun 2015, at 08:03, Pierre B pierre.borckm...@realimpactanalytics.com wrote:

Hi list! My problem is quite simple. I need to access several S3 buckets, using different credentials:

```
val c1 = sc.textFile("s3n://ACCESS_KEY_ID1:SECRET_ACCESS_KEY1@bucket1/file.csv").count
val c2 = sc.textFile("s3n://ACCESS_KEY_ID2:SECRET_ACCESS_KEY2@bucket2/file.csv").count
val c3 = sc.textFile("s3n://ACCESS_KEY_ID3:SECRET_ACCESS_KEY3@bucket3/file.csv").count
...
```

One or several of those AWS credentials might contain / in the secret access key. This is a known problem, and from my research the only ways to deal with these / are: 1/ use environment variables to set the AWS credentials, then access the S3 buckets without specifying the credentials; 2/ set the Hadoop configuration to contain the credentials. However, neither of these solutions allows me to access different buckets with different credentials. Can anyone help me on this? Thanks, Pierre

[Steve:] Long-known outstanding bug in Hadoop s3n; nobody has ever sat down to fix it. One subtlety is that it's really hard to test - you need credentials with a / in them. The general best practice is to recreate your credentials. Now, if you can get the patch to work against Hadoop trunk, I promise I will commit it: https://issues.apache.org/jira/browse/HADOOP-3733
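[Editor's note: the Hadoop-configuration workaround above can be packaged as a small PySpark helper. This is a sketch, not code from the thread; `sc` is assumed to be a live SparkContext, the key/secret values are fake, and the helper only assembles the property pairs - applying them is shown in the comments.]

```python
def s3_credential_settings(access_key, secret_key):
    """Build the Hadoop property pairs for s3n credentials, so keys
    containing '/' never have to be URL-embedded in the s3n:// path."""
    return [
        ("fs.s3n.awsAccessKeyId", access_key),
        ("fs.s3n.awsSecretAccessKey", secret_key),
    ]

# Applying them to a live SparkContext would look like:
# for key, value in s3_credential_settings(ACCESS_KEY, SECRET_KEY):
#     sc._jsc.hadoopConfiguration().set(key, value)
# rdd = sc.textFile("s3n://bucket1/file.csv")

settings = s3_credential_settings("AKIAEXAMPLE", "secret/with/slashes")
print(settings[0][0])  # fs.s3n.awsAccessKeyId
```

The limitation Pierre raised still applies: the configuration is global to the context, so reading several buckets with different credentials requires resetting the properties between reads (or materializing one RDD before switching).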
Not able to run SparkPi locally
Hello all, This is probably me doing something obviously wrong; would really appreciate some pointers on how to fix this. I installed spark-1.3.1-bin-hadoop2.6.tgz from the Spark download page [https://spark.apache.org/downloads.html] and just untarred it on a local drive. I am on Mac OSX 10.9.5 and the JDK is 1.8.0_40. I ran the following commands (the first 3 run successfully; I mention them here to rule out any possibility of an obviously bad install).

1) laptop$ bin/spark-shell
scala> sc.parallelize(1 to 100).count()
res0: Long = 100
scala> exit

2) laptop$ bin/pyspark
>>> sc.parallelize(range(100)).count()
100
>>> quit()

3) laptop$ bin/spark-submit examples/src/main/python/pi.py
Pi is roughly 3.142800

4) laptop$ bin/run-example SparkPi

This hangs at this line (full stack trace is provided at the end of this mail):

15/05/23 07:52:10 INFO Executor: Fetching http://10.0.0.5:51575/jars/spark-examples-1.3.1-hadoop2.6.0.jar with timestamp 1432392670140
15/05/23 07:52:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.net.SocketTimeoutException: connect timed out ...

and finally dies with this message:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketTimeoutException: connect timed out

I checked with ifconfig -a on my box; 10.0.0.5 is my IP address on my local network.

en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500 ether 34:36:3b:d2:b0:f4 inet 10.0.0.5 netmask 0xff00 broadcast 10.0.0.255 media: autoselect status: active

I think perhaps there is some configuration I am missing. Being able to run jobs locally (without HDFS or creating a cluster) is essential for development, and the examples come from the Spark 1.3.1 Quick Start page [https://spark.apache.org/docs/latest/quick-start.html], so this is probably something to do with my environment. 
Thanks in advance for any help you can provide. -sujit = Full output of SparkPi run (including stack trace) follows: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/05/23 08:08:55 INFO SparkContext: Running Spark version 1.3.1 15/05/23 08:08:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/05/23 08:08:57 INFO SecurityManager: Changing view acls to: palsujit 15/05/23 08:08:57 INFO SecurityManager: Changing modify acls to: palsujit 15/05/23 08:08:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(palsujit); users with modify permissions: Set(palsujit) 15/05/23 08:08:57 INFO Slf4jLogger: Slf4jLogger started 15/05/23 08:08:57 INFO Remoting: Starting remoting 15/05/23 08:08:58 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.0.5:52008] 15/05/23 08:08:58 INFO Utils: Successfully started service 'sparkDriver' on port 52008. 15/05/23 08:08:58 INFO SparkEnv: Registering MapOutputTracker 15/05/23 08:08:58 INFO SparkEnv: Registering BlockManagerMaster 15/05/23 08:08:58 INFO DiskBlockManager: Created local directory at /var/folders/z8/s_crq_2j2rqb9mv_4j8djsjnx359l2/T/spark-d97baddf-1b6f-41db-92bb-f82ab5184cb7/blockmgr-4ef3a194-1929-4dd3-a0e5-215175d8e41a 15/05/23 08:08:58 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 15/05/23 08:08:58 INFO HttpFileServer: HTTP File server directory is /var/folders/z8/s_crq_2j2rqb9mv_4j8djsjnx359l2/T/spark-fdf36480-def0-44b7-9942-098d9ef3e2b4/httpd-e494852a-7d61-4441-8b80-566d9f820afb 15/05/23 08:08:58 INFO HttpServer: Starting HTTP Server 15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT 15/05/23 08:08:58 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52009 15/05/23 08:08:58 INFO Utils: Successfully started service 'HTTP file server' on port 52009. 
15/05/23 08:08:58 INFO SparkEnv: Registering OutputCommitCoordinator 15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT 15/05/23 08:08:58 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/05/23 08:08:58 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/05/23 08:08:58 INFO SparkUI: Started SparkUI at http://10.0.0.5:4040 15/05/23 08:08:58 INFO SparkContext: Added JAR file:/Users/palsujit/Software/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar at http://10.0.0.5:52009/jars/spark-examples-1.3.1-hadoop2.6.0.jar with timestamp 1432393738514 15/05/23 08:08:58 INFO Executor: Starting executor ID driver on host localhost 15/05/23 08:08:58 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.0.0.5:52008/user/HeartbeatReceiver 15/05/23 08:08:58 INFO NettyBlockTransferService: Server created on 52010 15/05/23 08:08:58 INFO BlockManagerMaster: Trying to register BlockManager 15/05/23 08:08:58 INFO
Re: Not able to run SparkPi locally
Replying to my own email in case someone has the same or a similar issue. On a hunch I ran this against my Linux (Ubuntu 14.04 with JDK 8) box. Not only did bin/run-example SparkPi run without any problems, it also provided a very helpful message in the output:

15/05/23 08:35:15 WARN Utils: Your hostname, tsunami resolves to a loopback address: 127.0.1.1; using 10.0.0.10 instead (on interface wlan0)
15/05/23 08:35:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

So I went back to my Mac, set SPARK_LOCAL_IP=127.0.0.1, and everything runs fine now. To make this permanent I put the setting in conf/spark-env.sh.

-sujit

On Sat, May 23, 2015 at 8:14 AM, Sujit Pal sujitatgt...@gmail.com wrote:

Hello all, This is probably me doing something obviously wrong; would really appreciate some pointers on how to fix this. I installed spark-1.3.1-bin-hadoop2.6.tgz from the Spark download page [https://spark.apache.org/downloads.html] and just untarred it on a local drive. I am on Mac OSX 10.9.5 and the JDK is 1.8.0_40. I ran the following commands (the first 3 run successfully; I mention them here to rule out any possibility of an obviously bad install).

1) laptop$ bin/spark-shell
scala> sc.parallelize(1 to 100).count()
res0: Long = 100
scala> exit

2) laptop$ bin/pyspark
>>> sc.parallelize(range(100)).count()
100
>>> quit()

3) laptop$ bin/spark-submit examples/src/main/python/pi.py
Pi is roughly 3.142800

4) laptop$ bin/run-example SparkPi

This hangs at this line (full stack trace is provided at the end of this mail):

15/05/23 07:52:10 INFO Executor: Fetching http://10.0.0.5:51575/jars/spark-examples-1.3.1-hadoop2.6.0.jar with timestamp 1432392670140
15/05/23 07:52:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.net.SocketTimeoutException: connect timed out ... 
and finally dies with this message:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketTimeoutException: connect timed out

I checked with ifconfig -a on my box; 10.0.0.5 is my IP address on my local network.

en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500 ether 34:36:3b:d2:b0:f4 inet 10.0.0.5 netmask 0xff00 broadcast 10.0.0.255 media: autoselect status: active

I think perhaps there is some configuration I am missing. Being able to run jobs locally (without HDFS or creating a cluster) is essential for development, and the examples come from the Spark 1.3.1 Quick Start page [https://spark.apache.org/docs/latest/quick-start.html], so this is probably something to do with my environment. Thanks in advance for any help you can provide. -sujit

= Full output of SparkPi run (including stack trace) follows:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/05/23 08:08:55 INFO SparkContext: Running Spark version 1.3.1 15/05/23 08:08:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/05/23 08:08:57 INFO SecurityManager: Changing view acls to: palsujit 15/05/23 08:08:57 INFO SecurityManager: Changing modify acls to: palsujit 15/05/23 08:08:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(palsujit); users with modify permissions: Set(palsujit) 15/05/23 08:08:57 INFO Slf4jLogger: Slf4jLogger started 15/05/23 08:08:57 INFO Remoting: Starting remoting 15/05/23 08:08:58 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.0.5:52008] 15/05/23 08:08:58 INFO Utils: Successfully started service 'sparkDriver' on port 52008. 
15/05/23 08:08:58 INFO SparkEnv: Registering MapOutputTracker 15/05/23 08:08:58 INFO SparkEnv: Registering BlockManagerMaster 15/05/23 08:08:58 INFO DiskBlockManager: Created local directory at /var/folders/z8/s_crq_2j2rqb9mv_4j8djsjnx359l2/T/spark-d97baddf-1b6f-41db-92bb-f82ab5184cb7/blockmgr-4ef3a194-1929-4dd3-a0e5-215175d8e41a 15/05/23 08:08:58 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 15/05/23 08:08:58 INFO HttpFileServer: HTTP File server directory is /var/folders/z8/s_crq_2j2rqb9mv_4j8djsjnx359l2/T/spark-fdf36480-def0-44b7-9942-098d9ef3e2b4/httpd-e494852a-7d61-4441-8b80-566d9f820afb 15/05/23 08:08:58 INFO HttpServer: Starting HTTP Server 15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT 15/05/23 08:08:58 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52009 15/05/23 08:08:58 INFO Utils: Successfully started service 'HTTP file server' on port 52009. 15/05/23 08:08:58 INFO SparkEnv: Registering OutputCommitCoordinator 15/05/23 08:08:58 INFO Server: jetty-8.y.z-SNAPSHOT 15/05/23 08:08:58 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
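[Editor's note: the diagnosis in the thread above - a hostname resolving to a loopback address, fixed by setting SPARK_LOCAL_IP - can be reproduced with a few lines of plain Python. This is a sketch mirroring the check Spark's warning describes, not Spark code.]

```python
import socket

def resolves_to_loopback(hostname):
    """Return True if hostname resolves to a 127.x.x.x loopback address."""
    try:
        addr = socket.gethostbyname(hostname)
    except socket.gaierror:
        # Unresolvable hostnames are a different (DNS) problem entirely.
        return False
    return addr.startswith("127.")

# "localhost" resolves to a loopback address on any normal setup.
# If socket.gethostname() also does (as on the Ubuntu box above, where
# "tsunami" resolved to 127.0.1.1), set SPARK_LOCAL_IP in conf/spark-env.sh.
print(resolves_to_loopback("localhost"))
```

When this prints True for your machine's own hostname, Spark will pick a non-loopback interface instead, and executors may time out connecting back to the driver - exactly the SocketTimeoutException symptom in this thread.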