Re: Broadcast failure with variable size of ~ 500mb with key already cancelled ?

2014-11-11 Thread Davies Liu
There is an open PR [1] to support broadcasts larger than 2G, could you try it? [1] https://github.com/apache/spark/pull/2659 On Tue, Nov 11, 2014 at 6:39 AM, Tom Seddon mr.tom.sed...@gmail.com wrote: Hi, Just wondering if anyone has any advice about this issue, as I am experiencing the same

Re: Pyspark Error when broadcast numpy array

2014-11-11 Thread Davies Liu
This PR fixes the problem: https://github.com/apache/spark/pull/2659 cc @josh Davies On Tue, Nov 11, 2014 at 7:47 PM, bliuab bli...@cse.ust.hk wrote: In spark-1.0.2, I have come across an error when I try to broadcast a quite large numpy array (with 35M dimensions). The error information except

Re: Pyspark Error when broadcast numpy array

2014-11-11 Thread Davies Liu
. On Wed, Nov 12, 2014 at 12:29 PM, Davies Liu-2 [via Apache Spark User List] [hidden email] wrote: This PR fix the problem: https://github.com/apache/spark/pull/2659 cc @josh Davies On Tue, Nov 11, 2014 at 7:47 PM, bliuab [hidden email] wrote: In spark-1.0.2, I have come across an error when I

Re: error when importing HiveContext

2014-11-07 Thread Davies Liu
bin/pyspark will set up the PYTHONPATH for py4j for you; otherwise you need to set it up yourself: export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip On Fri, Nov 7, 2014 at 8:15 AM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I’m getting this error when importing hive context from
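
A minimal sketch of doing the same setup from inside a plain Python script, assuming a stock Spark layout (the SPARK_HOME default and the py4j version here are assumptions; adjust them to your install):

    import os
    import sys

    # Make pyspark and its bundled py4j importable without going through bin/pyspark.
    spark_home = os.environ.get("SPARK_HOME", "/opt/spark")  # assumed install location
    sys.path.insert(0, os.path.join(spark_home, "python"))
    sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.8.2.1-src.zip"))

    from pyspark import SparkContext
    from pyspark.sql import HiveContext

    sc = SparkContext("local", "hive-context-test")
    sqlContext = HiveContext(sc)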

Re: PySpark issue with sortByKey: IndexError: list index out of range

2014-11-07 Thread Davies Liu
Could you tell how large is the data set? It will help us to debug this issue. On Thu, Nov 6, 2014 at 10:39 AM, skane sk...@websense.com wrote: I don't have any insight into this bug, but on Spark version 1.0.0 I ran into the same bug running the 'sort.py' example. On a smaller data set, it

Re: PySpark issue with sortByKey: IndexError: list index out of range

2014-11-06 Thread Davies Liu
It should be fixed in 1.1+. Could you have a script to reproduce it? On Thu, Nov 6, 2014 at 10:39 AM, skane sk...@websense.com wrote: I don't have any insight into this bug, but on Spark version 1.0.0 I ran into the same bug running the 'sort.py' example. On a smaller data set, it worked

Re: SparkContext._lock Error

2014-11-05 Thread Davies Liu
What's the version of Python? 2.4? Davies On Wed, Nov 5, 2014 at 4:21 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I’m using this system Hadoop 1.0.4 Scala 2.9.3 Hive 0.9.0 With spark 1.1.0. When importing pyspark, I’m getting this error: from pyspark.sql import *

Re: Parquet files are only 6-20MB in size?

2014-11-03 Thread Davies Liu
Before saveAsParquetFile(), you can call coalesce(N); then you will have N files, and it will keep the order as before (repartition() will not). On Mon, Nov 3, 2014 at 1:16 AM, ag007 agre...@mac.com wrote: Thanks Akhil, Am I right in saying that the repartition will spread the data randomly so I
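
A sketch of the suggested call order; the SchemaRDD name and output path are illustrative:

    # coalesce(N) merges down to N partitions without a full shuffle, so the
    # existing row order is preserved and N output files are written;
    # repartition() would shuffle and lose the order.
    N = 4
    srdd.coalesce(N).saveAsParquetFile("hdfs:///tmp/output.parquet")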

Re: Spark on Yarn probably trying to load all the data to RAM

2014-11-03 Thread Davies Liu
On Sun, Nov 2, 2014 at 1:35 AM, jan.zi...@centrum.cz wrote: Hi, I am using Spark on Yarn, particularly Spark in Python. I am trying to run: myrdd = sc.textFile(s3n://mybucket/files/*/*/*.json) How many files do you have? and the average size of each file? myrdd.getNumPartitions()

Re: pySpark - convert log/txt files into sequenceFile

2014-10-29 Thread Davies Liu
Without the second line, it will be much faster: infile = sc.wholeTextFiles(sys.argv[1]) infile.saveAsSequenceFile(sys.argv[2]) On Wed, Oct 29, 2014 at 3:31 AM, Csaba Ragany rag...@gmail.com wrote: Thank you Holden, it works! infile = sc.wholeTextFiles(sys.argv[1]) rdd =

Re: sampling in spark

2014-10-28 Thread Davies Liu
_cumm = [p[0]] for i in range(1, len(p)): _cumm.append(_cumm[-1] + p[i]) index = set([bisect(_cumm, random.random()) for i in range(k)]) chosed_x = X.zipWithIndex().filter(lambda (v, i): i in index).map(lambda (v, i): v) chosed_y = [v for i, v

Re: Python code crashing on ReduceByKey if I return custom class object

2014-10-27 Thread Davies Liu
This is a known issue with PySpark: classes and objects of a custom class defined in the current script cannot be serialized by pickle between driver and worker. You can work around this by putting 'testing' in a module and sending that module to the cluster with `sc.addPyFile`. Davies On Sun, Oct 26, 2014 at 11:57 PM,
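
A sketch of the suggested workaround; the module and class names below are hypothetical stand-ins for the original 'testing' class:

    # testing.py -- shipped to every worker, so they can unpickle Testing objects
    class Testing(object):
        def __init__(self, value):
            self.value = value

        def merge(self, other):
            return Testing(self.value + other.value)

    # driver script
    from pyspark import SparkContext

    sc = SparkContext(appName="custom-class-reduce")
    sc.addPyFile("testing.py")            # distribute the module to the cluster
    from testing import Testing

    rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
    result = (rdd.mapValues(lambda v: Testing(v))
                 .reduceByKey(lambda x, y: x.merge(y))
                 .collect())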

Re: spark is running extremely slow with larger data set, like 2G

2014-10-24 Thread Davies Liu
On Thu, Oct 23, 2014 at 3:14 PM, xuhongnever xuhongne...@gmail.com wrote: my code is here: from pyspark import SparkConf, SparkContext def Undirect(edge): vector = edge.strip().split('\t') if(vector[0].isdigit()): return [(vector[0], vector[1])] return [] conf =

Re: spark is running extremely slow with larger data set, like 2G

2014-10-24 Thread Davies Liu
On Fri, Oct 24, 2014 at 1:37 PM, xuhongnever xuhongne...@gmail.com wrote: Thank you very much. Changing to groupByKey works, it runs much faster. By the way, could you give me some explanation of the following configurations? After reading the official explanation, I'm still confused,

Re: Python vs Scala performance

2014-10-22 Thread Davies Liu
In master, you can easily profile your job and find the bottlenecks, see https://github.com/apache/spark/pull/2556 Could you try it and show the stats? Davies On Wed, Oct 22, 2014 at 7:51 AM, Marius Soutier mps@gmail.com wrote: It’s an AWS cluster that is rather small at the moment, 4

Re: Python vs Scala performance

2014-10-22 Thread Davies Liu
available? On 22.10.2014, at 19:01, Davies Liu dav...@databricks.com wrote: In master, you can easily profile your job and find the bottlenecks, see https://github.com/apache/spark/pull/2556 Could you try it and show the stats? Davies

Re: How to aggregate data in Apach Spark

2014-10-20 Thread Davies Liu
You also could use Spark SQL: from pyspark.sql import Row, SQLContext row = Row('id', 'C1', 'C2', 'C3') # convert each data = sc.textFile(test.csv).map(lambda line: line.split(',')) sqlContext = SQLContext(sc) rows = data.map(lambda r: row(*r))
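
A fuller sketch of the same approach under Spark 1.1; the file name, column names, and the aggregation query are illustrative:

    from pyspark import SparkContext
    from pyspark.sql import Row, SQLContext

    sc = SparkContext(appName="aggregate-csv")
    sqlContext = SQLContext(sc)

    row = Row('id', 'C1', 'C2', 'C3')
    data = sc.textFile("test.csv").map(lambda line: line.split(','))
    rows = data.map(lambda r: row(*r))

    srdd = sqlContext.inferSchema(rows)      # SchemaRDD (Spark 1.1 API)
    srdd.registerTempTable("test")
    grouped = sqlContext.sql("SELECT id, COUNT(*) FROM test GROUP BY id").collect()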

Re: How to disable input split

2014-10-18 Thread Davies Liu
You can call coalesce() to merge the small splits into bigger ones. Davies On Fri, Oct 17, 2014 at 5:35 PM, Larry Liu larryli...@gmail.com wrote: Is it possible to disable input split if input is already small? - To

Re: Spark speed performance

2014-10-18 Thread Davies Liu
How many CPUs are on the slave? Because of the overhead between the JVM and Python, a single task will be slower than your local Python scripts, but it's very easy to scale to many CPUs. Even with one CPU, it's not common that PySpark would be 100 times slower. You have many small files, each file will be processed

Re: PySpark joins fail - please help

2014-10-17 Thread Davies Liu
Hey Russell, join() can only work with RDDs of pairs (key, value), such as rdd1: (k, v1) rdd2: (k, v2) rdd1.join(rdd2) will be (k, (v1, v2)) Spark SQL will be more useful for you, see http://spark.apache.org/docs/1.1.0/sql-programming-guide.html Davies On Fri, Oct 17, 2014 at 5:01 PM,
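
A minimal sketch of the pair-RDD requirement (the sample data is made up for illustration):

    rdd1 = sc.parallelize([("k1", 1), ("k2", 2)])
    rdd2 = sc.parallelize([("k1", "a"), ("k3", "c")])

    # Only keys present in both RDDs survive; the two values are paired in a tuple.
    rdd1.join(rdd2).collect()   # [('k1', (1, 'a'))]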

Re: PySpark Error on Windows with sc.wholeTextFiles

2014-10-16 Thread Davies Liu
It's a bug, could you file a JIRA for this? Thanks! Davies On Thu, Oct 16, 2014 at 8:28 AM, Griffiths, Michael (NYC-RPM) michael.griffi...@reprisemedia.com wrote: Hi, I’m running into an error on Windows (x64, 8.1) running Spark 1.1.0 (pre-builet for Hadoop 2.4:

Re: ALS implicit error pyspark

2014-10-16 Thread Davies Liu
It seems to be a bug, could you create a JIRA for it? Thanks! Davies On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote: I tried the same data with scala. It works pretty well. It seems that it is the problem of pyspark. In the console, it shows the following logs: Traceback (most

Re: ALS implicit error pyspark

2014-10-16 Thread Davies Liu
On Thu, Oct 16, 2014 at 9:53 AM, Gen gen.tan...@gmail.com wrote: Hi, I am trying to use ALS.trainImplicit method in the pyspark.mllib.recommendation. However it didn't work. So I tried use the example in the python API documentation such as: /r1 = (1, 1, 1.0) r2 = (1, 2, 2.0) r3 = (2, 1,

Re: ALS implicit error pyspark

2014-10-16 Thread Davies Liu
Could you post the code that has the problem with pyspark? Thanks! Davies On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote: I tried the same data with scala. It works pretty well. It seems that it is the problem of pyspark. In the console, it shows the following logs: Traceback

Re: ALS implicit error pyspark

2014-10-16 Thread Davies Liu
I can run the following code against Spark 1.1 sc = SparkContext() r1 = (1, 1, 1.0) r2 = (1, 2, 2.0) r3 = (2, 1, 2.0) ratings = sc.parallelize([r1, r2, r3]) model = ALS.trainImplicit(ratings, 1) Davies On Thu, Oct 16, 2014 at 2:45 PM, Davies Liu dav...@databricks.com wrote: Could you post
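
The same snippet with its imports, as a quick end-to-end check against Spark 1.1 (a rank of 1 is only for illustration):

    from pyspark import SparkContext
    from pyspark.mllib.recommendation import ALS

    sc = SparkContext(appName="als-implicit-check")
    r1 = (1, 1, 1.0)
    r2 = (1, 2, 2.0)
    r3 = (2, 1, 2.0)
    ratings = sc.parallelize([r1, r2, r3])
    model = ALS.trainImplicit(ratings, 1)      # rank=1, default iterations
    print(model.predict(2, 2))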

Re: pyspark - extract 1 field from string

2014-10-14 Thread Davies Liu
rdd.map(lambda line: int(line.split(',')[3])) On Tue, Oct 14, 2014 at 6:58 PM, Chop thomrog...@att.net wrote: I'm stumped with how to take 1 RDD that has lines like: 4,01012009,00:00,1289,4 5,01012009,00:00,1326,4 6,01012009,00:00,1497,7 and produce a new RDD with just the 4th field

Re: where are my python lambda functions run in yarn-client mode?

2014-10-11 Thread Davies Liu
in cases where I was only interested in the first value, my code was breaking horribly on 1.0.2, but working fine on 1.1. My only suggestion would be to backport 'spark.localExecution.enabled' to the 1.0 line. Thanks for all your help! Evan On Fri, Oct 10, 2014 at 10:40 PM, Davies Liu dav

Re: Help with using combineByKey

2014-10-10 Thread Davies Liu
Maybe this version is easier to use: plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) It has similar behavior to combineByKey() and will be faster than the groupByKey() version. On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA

Re: java.io.IOException Error in task deserialization

2014-10-10 Thread Davies Liu
HttpBroadcastFactory instead of TorrentBroadcast. So far, with HttpBroadcast, I haven't seen this recurring as of yet. I'll keep you posted. On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu dav...@databricks.com wrote: This exception should be caused by another one, could you paste all of them here

Re: where are my python lambda functions run in yarn-client mode?

2014-10-10 Thread Davies Liu
to that end, but I was looking in Spark 1.0.2 documentation, since that was the version I had the problem with. Is this behavior documented in 1.0.2's documentation? Evan On 10/09/2014 04:12 PM, Davies Liu wrote: When you call rdd.take() or rdd.first(), it may [1] execute the job locally

Re: GroupBy Key and then sort values with the group

2014-10-09 Thread Davies Liu
There is a new API called repartitionAndSortWithinPartitions() in master, it may help in this case, then you should do the `groupBy()` by yourself. On Wed, Oct 8, 2014 at 4:03 PM, chinchu chinchu@gmail.com wrote: Sean, I am having a similar issue, but I have a lot of data for a group I

Re: where are my python lambda functions run in yarn-client mode?

2014-10-09 Thread Davies Liu
When you call rdd.take() or rdd.first(), it may [1] execute the job locally (in the driver); otherwise, all the jobs are executed in the cluster. There is a config called `spark.localExecution.enabled` (since 1.1+) to change this; it's not enabled by default, so all the functions will be executed in

Re: java.io.IOException Error in task deserialization

2014-10-09 Thread Davies Liu
This exception should be caused by another one, could you paste all of them here? Also, it would be great if you could provide a script to reproduce this problem. Thanks! On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this error in task

Re: java.io.IOException Error in task deserialization

2014-10-09 Thread Davies Liu
Could you provide a script to reproduce this problem? Thanks! On Wed, Oct 8, 2014 at 9:13 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: This is also happening to me on a regular basis, when the job is large with relatively large serialized objects used in each RDD lineage. A bad thing

Re: Parsing one big multiple line .xml loaded in RDD using Python

2014-10-07 Thread Davies Liu
Maybe sc.wholeTextFiles() is what you want; you can get the whole text and parse it by yourself. On Tue, Oct 7, 2014 at 1:06 AM, jan.zi...@centrum.cz wrote: Hi, I have already unsuccessfully asked a quite similar question at stackoverflow, particularly here:

Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()

2014-10-06 Thread Davies Liu
sc.parallelize() to distribute a list of data into a number of partitions, but a generator cannot be cut and serialized automatically. If you can partition your generator, then you can try this: sc.parallelize(range(N), N).flatMap(lambda x: generate_partiton(x)) such as you want to generate
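
A sketch of that pattern, assuming the data can be produced independently per slice (generate_partition below is a hypothetical stand-in for the user's generator):

    def generate_partition(i):
        # Hypothetical: lazily produce the i-th slice of the data, so no single
        # slice has to fit in the driver's memory at once.
        for j in range(1000000):
            yield (i, j)

    N = 100   # number of slices / partitions
    rdd = sc.parallelize(range(N), N).flatMap(generate_partition)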

Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()

2014-10-06 Thread Davies Liu
in cluster mode. In local mode, maybe you should use absolute paths for the files. Davies __ From: Davies Liu dav...@databricks.com To: jan.zi...@centrum.cz Date: 06.10.2014 18:09 Subject: Re: Spark and Python using generator of data

Re: Trouble getting filtering on field correct

2014-10-03 Thread Davies Liu
rdd.filter(lambda line: int(line.split(' ')[8]) >= 125) On Fri, Oct 3, 2014 at 8:16 PM, Chop thomrog...@att.net wrote: Given an RDD with multiple lines of the form: u'207.86.121.131 207.86.121.131 2012-11-27 13:02:17 titlestring 622592 27 184464' (fields are separated by a ) What pyspark

Re: IPython Notebook Debug Spam

2014-10-01 Thread Davies Liu
On Tue, Sep 30, 2014 at 10:14 PM, Rick Richardson rick.richard...@gmail.com wrote: I am experiencing significant logging spam when running PySpark in IPython Notebok Exhibit A: http://i.imgur.com/BDP0R2U.png I have taken into consideration advice from:

Re: IPython Notebook Debug Spam

2014-10-01 Thread Davies Liu
it entirely, all with the same results. On Wed, Oct 1, 2014 at 1:49 PM, Davies Liu dav...@databricks.com wrote: On Tue, Sep 30, 2014 at 10:14 PM, Rick Richardson rick.richard...@gmail.com wrote: I am experiencing significant logging spam when running PySpark in IPython Notebok Exhibit

Re: java.lang.NegativeArraySizeException in pyspark

2014-09-25 Thread Davies Liu
could use broadcast.unpersist() to release it, also the performance of Python Broadcast was much improved in 1.1. best, -Brad On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote: Or maybe there is a bug related to the base64 in py4j, could you dumps the serialized bytes

Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-25 Thread Davies Liu
Maybe you have Python 2.7 on the master but Python 2.6 in the cluster; you should upgrade Python to 2.7 in the cluster, or use Python 2.6 on the master by setting PYSPARK_PYTHON=python2.6 On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi I am running into trouble using iPython

Re: Null values in pyspark Row

2014-09-24 Thread Davies Liu
Could you create a JIRA and add test cases for it? Thanks! Davies On Wed, Sep 24, 2014 at 11:56 AM, jamborta jambo...@gmail.com wrote: Hi all, I have just updated to spark 1.1.0. The new row representation of the data in spark SQL is very handy. I have noticed that it does not set None for

Re: access javaobject in rdd map

2014-09-23 Thread Davies Liu
You should create a pure Python object (copy the attributes from Java object), then it could be used in map. Davies On Tue, Sep 23, 2014 at 8:48 AM, jamborta jambo...@gmail.com wrote: Hi all, I have a java object that contains a ML model which I would like to use for prediction (in python).
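​
A hypothetical sketch of what that looks like: `java_model` stands for a py4j proxy to the JVM-side model, which cannot be pickled into a map closure, so its numbers are copied into a plain Python object first (the getter names are assumptions):

    # Plain Python object that holds only picklable data.
    class LocalModel(object):
        def __init__(self, weights, intercept):
            self.weights = weights
            self.intercept = intercept

        def predict(self, features):
            return sum(w * x for w, x in zip(self.weights, features)) + self.intercept

    # Copy the attributes out of the Java object on the driver...
    local = LocalModel(list(java_model.weights()), java_model.intercept())

    # ...then the pure Python copy can be shipped to workers inside a closure.
    predictions = features_rdd.map(lambda f: local.predict(f))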

Re: access javaobject in rdd map

2014-09-23 Thread Davies Liu
functions in scala, so I prefer not to reimplement the whole thing in python. thanks, On Tue, Sep 23, 2014 at 5:40 PM, Davies Liu dav...@databricks.com wrote: You should create a pure Python object (copy the attributes from Java object), then it could be used in map. Davies On Tue, Sep 23

Re: java.lang.NegativeArraySizeException in pyspark

2014-09-23 Thread Davies Liu
written a work-around into my code, but if I get a chance I'll switch to broadcast variables and see whether that works. later, -brad On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com wrote: The traceback said that the serialized closure cannot be parsed (base64) correctly

Re: Time difference between Python and Scala

2014-09-19 Thread Davies Liu
I think it's normal. On Fri, Sep 19, 2014 at 12:07 AM, Luis Guerra luispelay...@gmail.com wrote: Hello everyone, What should be the normal time difference between Scala and Python using Spark? I mean running the same program in the same cluster environment. In my case I am using numpy array

Re: schema for schema

2014-09-18 Thread Davies Liu
Thanks for reporting this, it will be fixed by https://github.com/apache/spark/pull/2448 On Thu, Sep 18, 2014 at 12:32 PM, Michael Armbrust mich...@databricks.com wrote: This looks like a bug, we are investigating. On Thu, Sep 18, 2014 at 8:49 AM, Eric Friedman eric.d.fried...@gmail.com

Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD

2014-09-17 Thread Davies Liu
PipelinedRDD is an RDD generated by a Python mapper/reducer; for example, rdd.map(func) will be a PipelinedRDD. PipelinedRDD is a subclass of RDD, so it should have all the APIs which RDD has. sc.parallelize(range(10)).map(lambda x: (x, str(x))).sortByKey().count() 10 I'm wondering that how can you

Re: pyspark on yarn - lost executor

2014-09-17 Thread Davies Liu
Maybe the Python worker uses too much memory during groupByKey(); groupByKey() with a larger numPartitions can help. Also, can you upgrade your cluster to 1.1? It can spill the data to disk if the memory cannot hold all the data during groupByKey(). Also, if there is a hot key with dozens of

Re: Number of partitions when saving (pyspark)

2014-09-17 Thread Davies Liu
On Wed, Sep 17, 2014 at 5:21 AM, Luis Guerra luispelay...@gmail.com wrote: Hi everyone, Is it possible to fix the number of tasks related to a saveAsTextFile in Pyspark? I am loading several files from HDFS, fixing the number of partitions to X (let's say 40 for instance). Then some

Re: Unable to ship external Python libraries in PYSPARK

2014-09-16 Thread Davies Liu
Yes, sc.addFile() is what you want: | addFile(self, path) | Add a file to be downloaded with this Spark job on every node. | The C{path} passed can be either a local file, a file in HDFS | (or other Hadoop-supported filesystems), or an HTTP, HTTPS or | FTP URI. | |

Re: Broadcast error

2014-09-15 Thread Davies Liu
://sparkMaster@hostname:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@ hostname:7077] ?? Any suggestions?? On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu dav...@databricks.com wrote: Hey Chengi, What's the version of Spark you are using

Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
In PySpark, I think you could enumerate all the valid files, create an RDD for each with newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I neglected to specify that I'm using pyspark. Doesn't look like these APIs have been
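
A sketch of the enumerate-and-union idea; the paths are made up, and the input-format/writable class names below assume plain text input (substitute whatever format your files use):

    paths = ["hdfs:///data/2014/09/part-0001", "hdfs:///data/2014/09/part-0003"]

    rdds = [sc.newAPIHadoopFile(
                p,
                "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                "org.apache.hadoop.io.LongWritable",
                "org.apache.hadoop.io.Text")
            for p in paths]

    combined = sc.union(rdds)   # one RDD over just the files that passed the filter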

Re: Write 1 RDD to multiple output paths in one go

2014-09-15 Thread Davies Liu
Maybe we should provide an API like saveTextFilesByKey(path), could you create a JIRA for it? There is one in DPark [1] actually. [1] https://github.com/douban/dpark/blob/master/dpark/rdd.py#L309 On Mon, Sep 15, 2014 at 7:08 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Any tips

Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
for enumerating paths on HDFS. Have I overlooked one? On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote: In PySpark, I think you could enumerate all the valid files, and create RDD by newAPIHadoopFile(), then union them together. On Mon, Sep 15, 2014 at 5:49 AM, Eric

Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
Or maybe you could give this one a try: https://labs.spotify.com/2013/05/07/snakebite/ On Mon, Sep 15, 2014 at 2:51 PM, Davies Liu dav...@databricks.com wrote: There is one way to do it in bash: hadoop fs -ls, maybe you could end up with a bash script to do the things. On Mon, Sep 15

Re: Broadcast error

2014-09-14 Thread Davies Liu
Hey Chengi, What's the version of Spark you are using? There are big improvements to broadcast in 1.1, could you try it? On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu chengi.liu...@gmail.com wrote: Any suggestions.. I am really blocked on this one On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu

Re: coalesce on SchemaRDD in pyspark

2014-09-12 Thread Davies Liu
This is a bug; I have created an issue to track it: https://issues.apache.org/jira/browse/SPARK-3500 Also, there is a PR to fix it: https://github.com/apache/spark/pull/2369 Before the next bugfix release, you can work around it by: srdd = sqlCtx.jsonRDD(rdd) srdd2 =

Re: coalesce on SchemaRDD in pyspark

2014-09-12 Thread Davies Liu
in the web-ui, although haven't tested it any beyond that). Thanks! -Brad On Thu, Sep 11, 2014 at 11:30 PM, Davies Liu dav...@databricks.com wrote: This is a bug, I had create an issue to track this: https://issues.apache.org/jira/browse/SPARK-3500 Also, there is PR to fix this: https

Re: Unable to ship external Python libraries in PYSPARK

2014-09-12 Thread Davies Liu
With SparkContext.addPyFile(xx.zip), the xx.zip will be copied to all the workers and stored in a temporary directory, and the path to xx.zip will be on sys.path on the worker machines, so you can import xx in your jobs; it does not need to be installed on the worker machines. PS: the package or module
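
A sketch of shipping a zipped package this way; the package name, zip path, and input path are illustrative:

    # mylib.zip is built so it contains a top-level package directory, e.g.
    #   mylib/__init__.py
    #   mylib/parser.py
    sc.addPyFile("/path/to/mylib.zip")   # copied to every worker and added to sys.path

    def parse(line):
        # import inside the function so the worker resolves it after addPyFile
        from mylib.parser import parse_line
        return parse_line(line)

    parsed = sc.textFile("hdfs:///logs/*.gz").map(parse)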

Re: Setting up jvm in pyspark from shell

2014-09-11 Thread Davies Liu
The heap size of the JVM cannot be changed dynamically, so you need to configure it before running pyspark. If you run in local mode, you should configure spark.driver.memory (in 1.1 or master). Or, you can use --driver-memory 2G (should work in 1.0+). On Wed, Sep 10, 2014 at 10:43 PM, Mohit Singh

Re: groupBy gives non deterministic results

2014-09-10 Thread Davies Liu
. I didn't receive this message from the user list. I am not in databricks, so I can't answer your other questions. Maybe Davies Liu dav...@databricks.com can answer you? -- Ye Xianjin Sent with Sparrow On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote: Hi, Xianjin I checked

Re: RDD memory questions

2014-09-10 Thread Davies Liu
On Wed, Sep 10, 2014 at 1:05 AM, Boxian Dong box...@indoo.rs wrote: Thank you very much for your kindly help. I rise some another questions: - If the RDD is stored in serialized format, is that means that whenever the RDD is processed, it will be unpacked and packed again from and back to

Re: groupBy gives non deterministic results

2014-09-09 Thread Davies Liu
What's the type of the key? If the hash of the key is different across slaves, then you could get these confusing results. We have seen similar results in Python, because the hash of None is different across machines. Davies On Mon, Sep 8, 2014 at 8:16 AM, redocpot julien19890...@gmail.com wrote:

Re: groupBy gives non deterministic results

2014-09-09 Thread Davies Liu
Which version of Spark are you using? This bug had been fixed in 0.9.2, 1.0.2 and 1.1, could you upgrade to one of these versions to verify it? Davies On Tue, Sep 9, 2014 at 7:03 AM, redocpot julien19890...@gmail.com wrote: Thank you for your replies. More details here: The prog is

Re: RDD memory questions

2014-09-09 Thread Davies Liu
On Tue, Sep 9, 2014 at 10:07 AM, Boxian Dong box...@indoo.rs wrote: I currently working on a machine learning project, which require the RDDs' content to be (mostly partially) updated during each iteration. Because the program will be converted directly from traditional python object-oriented

Re: PySpark on Yarn - how group by data properly

2014-09-09 Thread Davies Liu
On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I came from map/reduce background and try to do quite trivial thing: I have a lot of files ( on hdfs ) - format is : 1 , 2 , 3 2 , 3 , 5 1 , 3, 5 2, 3 , 4 2 , 5, 1 I am actually need

Re: Spark SQL check if query is completed (pyspark)

2014-09-06 Thread Davies Liu
SQLContext.sql() will return a SchemaRDD; you need to call collect() to pull the data in. On Sat, Sep 6, 2014 at 6:02 AM, jamborta jambo...@gmail.com wrote: Hi, I am using Spark SQL to run some administrative queries and joins (e.g. create table, insert overwrite, etc), where the query
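
A short sketch of that point: sql() is lazy and only returns a SchemaRDD, so force execution when you need to know a statement actually ran (the table name is illustrative):

    result = sqlContext.sql("SELECT COUNT(*) FROM my_table")   # SchemaRDD; nothing has run yet
    rows = result.collect()                                    # triggers the query and pulls rows back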

Re: Getting the type of an RDD in spark AND pyspark

2014-09-06 Thread Davies Liu
But you cannot get what you expect in PySpark, because the RDD in Scala is serialized, so it will always be RDD[Array[Byte]], whatever the type of the RDD in Python is. Davies On Sat, Sep 6, 2014 at 4:09 AM, Aaron Davidson ilike...@gmail.com wrote: Pretty easy to do in Scala:

Re: PySpark on Yarn a lot of python scripts project

2014-09-05 Thread Davies Liu
Hi Oleg, In order to simplify packaging and distributing your code, you could deploy shared storage (such as NFS), put your project in it, and mount it on all the slaves as /projects. In the Spark job scripts, you can access your project by putting the path into sys.path, such as:
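
A sketch of that layout; the /projects mount point and the package name are illustrative, and the import is done inside the mapped function so the same path trick works on the workers too:

    def clean_line(line):
        import sys
        if "/projects/myjob" not in sys.path:
            sys.path.insert(0, "/projects/myjob")   # NFS mount visible on every node
        from mypackage import etl                   # hypothetical package under /projects/myjob
        return etl.clean(line)

    cleaned = sc.textFile("hdfs:///raw/*.log").map(clean_line)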

Re: PySpark on Yarn a lot of python scripts project

2014-09-05 Thread Davies Liu
is never is 'trivial' over time). =:). Less is more. On 09/05/2014 01:58 PM, Marcelo Vanzin wrote: On Fri, Sep 5, 2014 at 10:50 AM, Davies Liu dav...@databricks.com wrote: In daily development, it's common to modify your projects and re-run the jobs. If using zip or egg to package your code

Re: spark sql results maintain order (in python)

2014-09-04 Thread Davies Liu
On Thu, Sep 4, 2014 at 3:42 AM, jamborta jambo...@gmail.com wrote: hi, I ran into a problem with spark sql, when run a query like this select count(*), city, industry from table group by hour and I would like to take the results from the shemaRDD 1, I have to parse each line to get the

Re: 2 python installations cause PySpark on Yarn problem

2014-09-04 Thread Davies Liu
Hey Oleg, In pyspark, you MUST have the same version of Python in all the machines of the cluster, which means when you run `python` on these machines, all of them should be the same version ( 2.6 or 2.7). With PYSPARK_PYTHON, you can run pyspark with a specified version of Python. Also, you

Re: Spark on Mesos: Pyspark python libraries

2014-09-02 Thread Davies Liu
PYSPARK_PYTHON may work for you, it's used to specify which Python interpreter should be used in both driver and worker. For example, if anaconda was installed as /anaconda on all the machines, then you can specify PYSPARK_PYTHON=/anaconda/bin/python to use anaconda virtual environment in

Re: u'' notation with pyspark output data

2014-08-29 Thread Davies Liu
u'14.0' means a unicode string, you can convert into str by u'14.0'.encode('utf8'), or you can convert it into float by float(u'14.0') Davies On Thu, Aug 28, 2014 at 11:22 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am working with pyspark and doing simple aggregation def

Re: repartitioning an RDD yielding imbalance

2014-08-28 Thread Davies Liu
On Thu, Aug 28, 2014 at 7:00 AM, Rok Roskar rokros...@gmail.com wrote: I've got an RDD where each element is a long string (a whole document). I'm using pyspark so some of the handy partition-handling functions aren't available, and I count the number of elements in each partition with: def

Re: Python script runs fine in local mode, errors in other modes

2014-08-19 Thread Davies Liu
Could you post the complete stacktrace? On Tue, Aug 19, 2014 at 10:47 AM, Aaron aaron.doss...@target.com wrote: Hello, I have a relatively simple python program that works just fine in local mode (--master local) but produces a strange error when I try to run it via Yarn ( --deploy-mode

Re: Python script runs fine in local mode, errors in other modes

2014-08-19 Thread Davies Liu
This script runs fine without your CSV file. Could you download your CSV file to local disk, and narrow down to the lines which trigger this issue? On Tue, Aug 19, 2014 at 12:02 PM, Aaron aaron.doss...@target.com wrote: These three lines of python code cause the error for me: sc =

Re: Segmented fold count

2014-08-18 Thread Davies Liu
import itertools l = [1,1,1,2,2,3,4,4,5,1] gs = itertools.groupby(l) map(lambda (n, it): (n, sum(1 for _ in it)), gs) [(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)] def groupCount(l): gs = itertools.groupby(l) return map(lambda (n, it): (n, sum(1 for _ in it)), gs) If you have an
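
Assuming the follow-up (cut off at "If you have an") was to apply the same helper per partition of an RDD, a sketch would be:

    from itertools import groupby

    def group_count(it):
        # count consecutive runs of equal values within one partition
        return [(value, sum(1 for _ in run)) for value, run in groupby(it)]

    rdd = sc.parallelize([1, 1, 1, 2, 2, 3, 4, 4, 5, 1], 2)
    runs = rdd.mapPartitions(group_count).collect()

A run that straddles a partition boundary shows up as two entries and would still need to be merged afterwards.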

Re: Segmented fold count

2014-08-18 Thread Davies Liu
with same value. On Mon, Aug 18, 2014 at 2:05 AM, Davies Liu dav...@databricks.com wrote: import itertools l = [1,1,1,2,2,3,4,4,5,1] gs = itertools.groupby(l) map(lambda (n, it): (n, sum(1 for _ in it)), gs) [(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)] def groupCount(l): gs

Re: application as a service

2014-08-18 Thread Davies Liu
Another option is using Tachyon to cache the RDD, then the cache can be shared by different applications. See how to use Spark with Tachyon: http://tachyon-project.org/Running-Spark-on-Tachyon.html Davies On Sun, Aug 17, 2014 at 4:48 PM, ryaminal tacmot...@gmail.com wrote: You can also look

Re: Merging complicated small matrices to one big matrix

2014-08-18 Thread Davies Liu
rdd.flatMap(lambda x: x) could maybe solve your problem; it will convert an RDD from [[[1,2,3],[4,5,6]],[[7,8,9,],[10,11,12]]] into: [[1,2,3], [4,5,6], [7,8,9,], [10,11,12]] On Mon, Aug 18, 2014 at 2:42 AM, Chengi Liu chengi.liu...@gmail.com wrote: I have an rdd in pyspark which looks like

Re: Segmented fold count

2014-08-18 Thread Davies Liu
On Mon, Aug 18, 2014 at 7:41 PM, fil f...@pobox.com wrote: fil wrote - Python functions like groupCount; these get reflected from their Python AST and converted into a Spark DAG? Presumably if I try and do something non-convertible this transformation process will throw an error? In other

Re: Question on mappartitionwithsplit

2014-08-17 Thread Davies Liu
offset_lists offset_lists = offset rdd. mapPartitionsWithSplit(indexing) Or: def another_func(offset_lists): def indexing(index, iterator): # access offset_lists pass rdd.mapPartitionsWithIndex(indexing) On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu dav
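
A sketch of the closure pattern described above, with offset_lists assumed to hold one starting offset per partition:

    def make_indexer(offset_lists):
        def indexing(index, iterator):
            # offset_lists is captured in the closure and shipped with each task
            offset = offset_lists[index]
            for i, item in enumerate(iterator):
                yield (offset + i, item)
        return indexing

    offset_lists = [0, 1000, 2500]            # assumed: one offset per partition
    rdd = sc.parallelize(range(30), 3)
    indexed = rdd.mapPartitionsWithIndex(make_indexer(offset_lists))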

Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset

2014-08-13 Thread Davies Liu
Arpan, Which version of Spark are you using? Could you try master or the 1.1 branch, which can spill the data to disk during groupByKey()? PS: it's better to use reduceByKey() or combineByKey() to reduce the data size during the shuffle. Maybe there is a huge key in the data set; you can find it in
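
A sketch of the reduceByKey() suggestion for a simple per-key count, which combines on the map side before the shuffle instead of collecting every value for a key in memory (the key extraction and partition count are illustrative):

    pairs = rdd.map(lambda rec: (rec[0], 1))
    counts = pairs.reduceByKey(lambda a, b: a + b, numPartitions=200)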

Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset

2014-08-13 Thread Davies Liu
of the flatMap() tasks complete and I start seeing the same Connection Reset errors. On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu dav...@databricks.com wrote: Arpan, Which version of Spark are you using? Could you try the master or 1.1 branch? which can spill the data into disk during groupByKey

Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset

2014-08-13 Thread Davies Liu
a groupByKey()? On Wed, Aug 13, 2014 at 2:05 PM, Davies Liu dav...@databricks.com wrote: The 1.1 release will come out this or next month, we will really appreciate that if you could test it with you real case. Davies On Wed, Aug 13, 2014 at 1:57 PM, Arpan Ghosh ar...@automatic.com wrote

Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset

2014-08-13 Thread Davies Liu
spark.storage.memoryFraction ? On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu dav...@databricks.com wrote: Arpan, Which version of Spark are you using? Could you try the master or 1.1 branch? which can spill the data into disk during groupByKey(). PS: it's better to use reduceByKey

Re: error with pyspark

2014-08-10 Thread Davies Liu
On Fri, Aug 8, 2014 at 9:12 AM, Baoqiang Cao bqcaom...@gmail.com wrote: Hi There I ran into a problem and can’t find a solution. I was running bin/pyspark ../python/wordcount.py you could use bin/spark-submit ../python/wordcount.py The wordcount.py is here:

Re: PySpark + executor lost

2014-08-07 Thread Davies Liu
What is the environment ? YARN or Mesos or Standalone? It will be more helpful if you could show more loggings. On Wed, Aug 6, 2014 at 7:25 PM, Avishek Saha avishek.s...@gmail.com wrote: Hi, I get a lot of executor lost error for saveAsTextFile with PySpark and Hadoop 2.4. For small

Re: PySpark, numpy arrays and binary data

2014-08-07 Thread Davies Liu
On Thu, Aug 7, 2014 at 12:06 AM, Rok Roskar rokros...@gmail.com wrote: sure, but if you knew that a numpy array went in on one end, you could safely use it on the other end, no? Perhaps it would require an extension of the RDD class and overriding the collect() method. Could you give a short

Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-06 Thread Davies Liu
There is a PR to fix this: https://github.com/apache/spark/pull/1802 On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I concur that printSchema works; it just seems to be operations that use the data where trouble happens. Thanks for posting the bug. -Brad

Re: PySpark, numpy arrays and binary data

2014-08-06 Thread Davies Liu
numpy arrays can only support basic types, so we cannot use them during collect() by default. Could you give a short example about how numpy arrays are used in your project? On Wed, Aug 6, 2014 at 8:41 AM, Rok Roskar rokros...@gmail.com wrote: Hello, I'm interested in getting started with Spark

Re: pyspark inferSchema

2014-08-05 Thread Davies Liu
On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile() and sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers the whole data set. For example:

Re: java.lang.StackOverflowError

2014-08-05 Thread Davies Liu
Could you create a reproducible script (and data) to allow us to investigate this? Davies On Tue, Aug 5, 2014 at 1:10 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, I am doing some basic preprocessing in pyspark (local mode as follows): files = [ input files] def read(filename,sc):

Re: pyspark inferSchema

2014-08-05 Thread Davies Liu
going to go ahead and do that. best, -Brad On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote: On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I was just about to ask about this. Currently, there are two methods, sqlContext.jsonFile

Re: Last step of processing is using too much memory.

2014-07-30 Thread Davies Liu
When you do groupBy(), it wants to load all the data into memory for best performance, so you should specify the number of partitions carefully. In Spark master or the upcoming 1.1 release, PySpark can do an external groupBy(), which means that it will dump the data to disk if there is not enough

Re: How do you debug a PythonException?

2014-07-30 Thread Davies Liu
The exception in Python means that the worker tried to read a command from the JVM, but it reached the end of the socket (the socket had been closed). So it's possible that another exception happened in the JVM. Could you change the log level of log4j, then check whether there is any problem inside the JVM? Davies On Wed,

Re: zip two RDD in pyspark

2014-07-29 Thread Davies Liu
On Mon, Jul 28, 2014 at 12:58 PM, l lishu...@gmail.com wrote: I have a file in s3 that I want to map each line with an index. Here is my code: input_data = sc.textFile('s3n:/myinput',minPartitions=6).cache() N input_data.count() index = sc.parallelize(range(N), 6)
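
One way to attach a line index without building a second RDD by hand (RDD.zipWithIndex(), assumed available in the Spark version in use; the input path is illustrative):

    input_data = sc.textFile("s3n://mybucket/myinput", minPartitions=6)
    indexed = input_data.zipWithIndex()                       # (line, index) pairs
    by_index = indexed.map(lambda pair: (pair[1], pair[0]))   # (index, line), if preferred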
