driver memory management

2014-09-28 Thread Brad Miller
Hi All,

I want to collect() a large RDD so that I can run a learning algorithm on
it.  I've noticed that when I don't increase SPARK_DRIVER_MEMORY I can run
out of memory.  I've also noticed that the same fraction of memory appears
to be reserved for storage on the driver as on the worker nodes, and that
the web UI doesn't show any storage usage on the driver.  Since that memory
is reserved for storage, it seems possible that it is not being used for
the collection of my RDD.

Is there a way to configure the memory management (
spark.storage.memoryFraction, spark.shuffle.memoryFraction) for the driver
separately from the workers?

Is there any reason to leave space for shuffle or storage on the driver?
It seems like I never see either of these used on the web UI, although I
may not be interpreting the UI correctly or my jobs may not trigger the use
case.

For context, I am using PySpark (so much of my processing happens outside
of the memory allocated to the JVM) and running the Spark 1.1.0 release
binaries.
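
For reference, a minimal sketch (PySpark, Spark 1.1) of the settings in
question; as far as I can tell the memoryFraction settings are
application-wide, so values like the ones below apply to the driver and the
executors alike (the numbers are only examples):

from pyspark import SparkConf, SparkContext

# shrink the regions reserved for caching and shuffle so that more of each
# heap is left as general-purpose memory (these apply to every JVM in the app)
conf = (SparkConf()
        .set("spark.storage.memoryFraction", "0.2")
        .set("spark.shuffle.memoryFraction", "0.2"))
sc = SparkContext(conf=conf)

# the driver heap itself has to be raised before the JVM starts, e.g. via
# SPARK_DRIVER_MEMORY in spark-env.sh or spark-submit --driver-memory
collected = sc.parallelize(range(1000)).collect()  # collect() lands on the driver heap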

best,
-Brad


Re: java.io.IOException Error in task deserialization

2014-09-26 Thread Brad Miller
I've had multiple jobs crash due to "java.io.IOException: unexpected
exception type"; I've been running the 1.1 branch for some time and am now
running the 1.1 release binaries.  Note that I only use PySpark.  I haven't
kept detailed notes or the tracebacks around since there are other problems
that have caused me greater grief (namely "key not found" errors).

For me the exception seems to occur non-deterministically, which is a bit
interesting since the error message shows that the same stage has failed
multiple times.  Are you able to consistently reproduce the bug across
multiple invocations at the same place?

On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:

 Has anyone else seen this error in task deserialization?  The task is
 processing a small amount of data and doesn't seem to have much data
 hanging off the closure.  I've only seen this with Spark 1.1.

 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most 
 recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): 
 java.io.IOException: unexpected exception type
 
 java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:744)




Re: java.io.IOException Error in task deserialization

2014-09-26 Thread Brad Miller
FWIW I suspect that each count operation is an opportunity for you to
trigger the bug, and each filter operation increases the likelihood of
setting up the bug.  I normally don't come across this error until my job
has been running for an hour or two and had a chance to build up longer
lineages for some RDDs.  It sounds like your data is a bit smaller and it's
more feasible for you to build up longer lineages more quickly.

If you can reduce your number of filter operations (for example by
combining some into a single function), that may help.  It may also help to
introduce persistence or checkpointing at intermediate stages so that the
lineages that have to be replayed aren't as long.
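
As a rough illustration of that suggestion (the RDD name, the field, and the
checkpoint path below are placeholders rather than anything from the actual
job):

# write checkpoints somewhere durable so lineages can be truncated
sc.setCheckpointDir('hdfs:///tmp/spark-checkpoints')

filtered = records.filter(lambda r: r['score'] > 0)  # stand-in for one of the many filters
filtered.persist()      # keep the intermediate result around
filtered.checkpoint()   # truncate the lineage at this point
filtered.count()        # force materialization so the checkpoint actually gets written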

On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja aahuj...@gmail.com wrote:

 No, for me as well it is non-deterministic.  It happens in a piece of code
 that does many filters and counts on a small set of records (~1k-10k).  The
 original set is persisted in memory and we have a Kryo serializer set for
 it.  The task itself takes in just a few filtering parameters.  With the
 same settings, this has sometimes completed successfully and sometimes
 failed during this step.

 Arun

 On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 I've had multiple jobs crash due to java.io.IOException: unexpected
 exception type; I've been running the 1.1 branch for some time and am now
 running the 1.1 release binaries. Note that I only use PySpark. I haven't
 kept detailed notes or the tracebacks around since there are other problems
 that have caused me greater grief (namely key not found errors).

 For me the exception seems to occur non-deterministically, which is a bit
 interesting since the error message shows that the same stage has failed
  multiple times.  Are you able to consistently reproduce the bug across
 multiple invocations at the same place?

 On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:

  Has anyone else seen this error in task deserialization?  The task is
 processing a small amount of data and doesn't seem to have much data
 hanging to the closure?  I've only seen this with Spark 1.1

 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, 
 most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): 
 java.io.IOException: unexpected exception type
 
 java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
 
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:744)






Re: java.lang.NegativeArraySizeException in pyspark

2014-09-26 Thread Brad Miller
 What is the error? Could you file a JIRA for it?

It turns out there are actually three separate errors (indicated below), one
of which *silently returns the wrong value to the user*.  Should I file a
separate JIRA for each one?  What level should I mark these as (critical,
major, etc.)?

I'm not sure that all of these are bugs as much as feature requests, since
it looks like the design of FramedSerializer includes some size constraints
(https://github.com/apache/spark/blob/master/python/pyspark/serializers.py:
"Serializer that writes objects as a stream of (length, data) pairs, where
C{length} is a 32-bit integer and data is C{length} bytes.").

Attempting to reproduce the bug in isolation in an IPython notebook, I've
observed the following.  Note that I'm running Python 2.7.3 on all machines
and using the Spark 1.1.0 binaries.

**BLOCK 1**  [no problem]
import cPickle
from pyspark import SparkContext

def check_pre_serialized(size):
    msg = cPickle.dumps(range(2 ** size))
    print 'serialized length:', len(msg)
    bvar = sc.broadcast(msg)
    print 'length recovered from broadcast variable:', len(bvar.value)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

def check_unserialized(size):
    msg = range(2 ** size)
    bvar = sc.broadcast(msg)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.cores.max', '5')
sc = SparkContext('spark://crosby.research.intel-research.net:7077',
                  'broadcast_bug')

**BLOCK 2**  [no problem]
check_pre_serialized(20)
 serialized length: 9374656
 length recovered from broadcast variable: 9374656
 correct value recovered: True

**BLOCK 3**  [no problem]
check_unserialized(20)
 correct value recovered: True

**BLOCK 4**  [no problem]
check_pre_serialized(27)
 serialized length: 1499501632
 length recovered from broadcast variable: 1499501632
 correct value recovered: True

**BLOCK 5**  [no problem]
check_unserialized(27)
 correct value recovered: True

**BLOCK 6**  [ERROR 1: unhandled error from cPickle.dumps inside
sc.broadcast]
check_pre_serialized(28)
.
 /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
 354
 355 def dumps(self, obj):
 -- 356 return cPickle.dumps(obj, 2)
 357
 358 loads = cPickle.loads

 SystemError: error return without exception set

**BLOCK 7**  [no problem]
check_unserialized(28)
 correct value recovered: True

**BLOCK 8**  [ERROR 2: no error occurs and an *incorrect result* is returned]
check_pre_serialized(29)
 serialized length: 6331339840
 length recovered from broadcast variable: 2036372544
 correct value recovered: False

**BLOCK 9**  [ERROR 3: unhandled error from zlib.compress inside
sc.broadcast]
check_unserialized(29)
..
 /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
 418
 419 def dumps(self, obj):
 -- 420 return zlib.compress(self.serializer.dumps(obj), 1)
 421
 422 def loads(self, obj):

 OverflowError: size does not fit in an int

**BLOCK 10**  [ERROR 1]
check_pre_serialized(30)
...same as above...

**BLOCK 11**  [ERROR 3]
check_unserialized(30)
...same as above...
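
For anyone hitting the same limits, a rough guard based on the 32-bit
framing described above (just a sketch, not an official API): check the size
of a pre-serialized payload before handing it to sc.broadcast and split it
up if it is anywhere near the limit.

import cPickle

MAX_FRAME = 2 ** 31 - 1  # the (length, data) frame uses a 32-bit length

payload = cPickle.dumps(range(2 ** 20), 2)  # pre-serialize, as in the blocks above
if len(payload) >= MAX_FRAME:
    # split into several smaller broadcasts instead of one oversized one
    raise ValueError('payload is %d bytes, too large for a single broadcast'
                     % len(payload))
bvar = sc.broadcast(payload)

The check is only approximate since the broadcast machinery adds its own
serialization overhead, but it at least fails loudly instead of silently
returning a truncated value (ERROR 2 above).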

On Thu, Sep 25, 2014 at 2:55 PM, Davies Liu dav...@databricks.com wrote:

 On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi Davies,
 
  Thanks for your help.
 
  I ultimately re-wrote the code to use broadcast variables, and then
  received an error when trying to broadcast self.all_models that the size
  did not fit in an int (recall that broadcasts use 32 bit ints to store size),

 What is the error? Could you file a JIRA for it?

  suggesting that it was in fact over 2G.  I don't know why the previous
  tests (described above) where duplicated portions of self.all_models
  worked (it could have been an error in either my debugging or notes), but
  splitting the self.all_models into a separate broadcast variable for each
  element worked.  I avoided broadcast variables for a while since there was
  no way to unpersist them in pyspark, but now that there is you're
  completely right that using broadcast is the correct way to code this.

 In 1.1, you could use broadcast.unpersist() to release it; also, the
 performance of Python Broadcast was much improved in 1.1.


  best,
  -Brad
 
  On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com
wrote:
 
  Or maybe there is a bug related to the base64 in py4j; could you
  dump the serialized bytes of the closure to verify this?
 
  You could add a line in spark/python/pyspark/rdd.py:
 
  ser = CloudPickleSerializer()
  pickled_command = ser.dumps(command)
  +  print len(pickled_command), repr(pickled_command)
 
 
  On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
  bmill...@eecs.berkeley.edu wrote:
   Hi Davies,
  
   That's interesting to know.  Here's more details about my code.  The
   object
   (self) contains pointers

Re: java.lang.NegativeArraySizeException in pyspark

2014-09-25 Thread Brad Miller
Hi Davies,

Thanks for your help.

I ultimately re-wrote the code to use broadcast variables, and then
received an error when trying to broadcast self.all_models that the size
did not fit in an int (recall that broadcasts use 32 bit ints to store
size), suggesting that it was in fact over 2G.  I don't know why the
previous tests (described above) where duplicated portions of
self.all_models worked (it could have been an error in either my debugging
or notes), but splitting the self.all_models into a separate broadcast
variable for each element worked.  I avoided broadcast variables for a
while since there was no way to unpersist them in pyspark, but now that
there is you're completely right that using broadcast is the correct way to
code this.
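
A minimal sketch of the per-element broadcast approach described above (the
names and the scoring call are placeholders, not the actual pipeline):

# broadcast each model separately so no single broadcast approaches the 2G limit
model_bvars = [sc.broadcast(model) for model in all_models]

def score_sample(sample):
    # each worker pulls the models out of its local broadcast blocks
    return [float(bv.value.decision_function(sample)) for bv in model_bvars]

scores = feature_rdd.map(score_sample).collect()

for bv in model_bvars:
    bv.unpersist()  # release the broadcast blocks (available as of Spark 1.1)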

best,
-Brad

On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote:

 Or maybe there is a bug related to the base64 in py4j; could you
 dump the serialized bytes of the closure to verify this?

 You could add a line in spark/python/pyspark/rdd.py:

 ser = CloudPickleSerializer()
 pickled_command = ser.dumps(command)
 +  print len(pickled_command), repr(pickled_command)


 On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi Davies,
 
   That's interesting to know.  Here are more details about my code.  The
   object (self) contains pointers to the spark_context (which seems to
   generate errors during serialization) so I strip off the extra state using
   the outer lambda function and just pass the value self.all_models into the
   map.  all_models is a list of length 9 where each element contains 3
   numbers (ints or floats, can't remember) and then one LinearSVC object.
   The classifier was trained over ~2.5M features, so the object isn't small,
   but probably shouldn't be 150M either.  Additionally, the call ran OK when
   I used either 2x the first 5 objects or 2x the last 5 objects (another
   reason why it seems unlikely the bug was size related).
 
   def _predict_all_models(all_models, sample):
       scores = []
       for _, (_, _, classifier) in all_models:
           score = classifier.decision_function(sample[VALUE][RECORD])
           scores.append(float(score))
       return (sample[VALUE][LABEL], scores)

   # fails
   #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
   # works
   #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
   #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
 
  I've since written a work-around into my code, but if I get a chance I'll
  switch to broadcast variables and see whether that works.
 
  later,
  -brad
 
  On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com
 wrote:
 
  The traceback said that the serialized closure cannot be parsed (base64)
  correctly by py4j.
 
  The string in Java cannot be longer than 2G, so the serialized closure
  cannot longer than 1.5G (there are overhead in base64), is it possible
  that your data used in the map function is so big? If it's, you should
  use broadcast for it.
 
  In master of Spark, we will use broadcast automatically if the closure
  is too big. (but use broadcast explicitly is always better).
 
  On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
  bmill...@eecs.berkeley.edu wrote:
   Hi All,
  
   I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
   script
   I have.  I've pasted the full traceback at the end of this email.
  
   I have isolated the line of code in my script which causes the
   exception
   to occur. Although the exception seems to occur deterministically, it
 is
   very unclear why the different variants of the line would cause the
   exception to occur. Unfortunately, I am only able to reproduce the bug
   in
   the context of a large data processing job, and the line of code which
   must
   change to reproduce the bug has little meaning out of context.  The
 bug
   occurs when I call map on an RDD with a function that references
 some
   state outside of the RDD (which is presumably bundled up and
 distributed
   with the function).  The output of the function is a tuple where the
   first
   element is an int and the second element is a list of floats (same
   positive
   length every time, as verified by an 'assert' statement).
  
   Given that:
   -It's unclear why changes in the line would cause an exception
   -The exception comes from within pyspark code
   -The exception has to do with negative array sizes (and I couldn't
 have
   created a negative sized array anywhere in my python code)
   I suspect this is a bug in pyspark.
  
   Has anybody else observed or reported this bug?
  
   best,
   -Brad
  
   Traceback (most recent call last):
 File /home/bmiller1/pipeline/driver.py, line 214, in module
   main()
 File /home

java.util.NoSuchElementException: key not found

2014-09-16 Thread Brad Miller
Hi All,

I suspect I am experiencing a bug. I've noticed that while running
larger jobs, they occasionally die with the exception
"java.util.NoSuchElementException: key not found: xyz", where xyz
denotes the ID of some particular task.  I've excerpted the log from
one job that died in this way below and attached the full log for
reference.

I suspect that my bug is the same as SPARK-2002 (linked below).  Is
there any reason to suspect otherwise?  Is there any known workaround
other than not coalescing?
https://issues.apache.org/jira/browse/SPARK-2002
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3CCAMwrk0=d1dww5fdbtpkefwokyozltosbbjqamsqqjowlzng...@mail.gmail.com%3E

Note that I have been coalescing SchemaRDDs using srdd =
SchemaRDD(srdd._jschema_rdd.coalesce(partitions, False, None),
sqlCtx), the workaround described in this thread.
http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/%3ccanr-kkciei17m43-yz5z-pj00zwpw3ka_u7zhve2y7ejw1v...@mail.gmail.com%3E

...
14/09/15 21:43:14 INFO scheduler.TaskSetManager: Starting task 78.0 in
stage 551.0 (TID 78738, bennett.research.intel-research.net,
PROCESS_LOCAL, 1056 bytes)
...
14/09/15 21:43:15 INFO storage.BlockManagerInfo: Added
taskresult_78738 in memory on
bennett.research.intel-research.net:38074 (size: 13.0 MB, free: 1560.8
MB)
...
14/09/15 21:43:15 ERROR scheduler.TaskResultGetter: Exception while
getting task result
java.util.NoSuchElementException: key not found: 78738
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at 
org.apache.spark.scheduler.TaskSetManager.handleTaskGettingResult(TaskSetManager.scala:500)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.handleTaskGettingResult(TaskSchedulerImpl.scala:348)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:52)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:701)


I am running the pre-compiled 1.1.0 binaries.

best,
-Brad

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?

2014-09-14 Thread Brad Miller
Hi Andrew,

I agree with Nicholas.  That was a nice, concise summary of the
meaning of the locality customization options, indicators and default
Spark behaviors.  I haven't combed through the documentation
end-to-end in a while, but I'm also not sure that information is presently
captured anywhere, and it would be great to persist it somewhere besides
the mailing list.

best,
-Brad

On Fri, Sep 12, 2014 at 12:12 PM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 Andrew,

 This email was pretty helpful. I feel like this stuff should be summarized
 in the docs somewhere, or perhaps in a blog post.

 Do you know if it is?

 Nick


 On Thu, Jun 5, 2014 at 6:36 PM, Andrew Ash and...@andrewash.com wrote:

 The locality is how close the data is to the code that's processing it.
 PROCESS_LOCAL means data is in the same JVM as the code that's running, so
 it's really fast.  NODE_LOCAL might mean that the data is in HDFS on the
 same node, or in another executor on the same node, so is a little slower
 because the data has to travel across an IPC connection.  RACK_LOCAL is even
 slower -- data is on a different server so needs to be sent over the
 network.

 Spark switches to lower locality levels when there's no unprocessed data
 on a node that has idle CPUs.  In that situation you have two options: wait
 until the busy CPUs free up so you can start another task that uses data on
 that server, or start a new task on a farther away server that needs to
 bring data from that remote place.  What Spark typically does is wait a bit
 in the hopes that a busy CPU frees up.  Once that timeout expires, it starts
 moving the data from far away to the free CPU.

  The main tunable option is how long the scheduler waits before
  starting to move data rather than code.  Those are the spark.locality.*
 settings here: http://spark.apache.org/docs/latest/configuration.html

 If you want to prevent this from happening entirely, you can set the
 values to ridiculously high numbers.  The documentation also mentions that
 0 has special meaning, so you can try that as well.

 Good luck!
 Andrew


 On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung coded...@cs.stanford.edu
 wrote:

 I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd
 assume that this means fully cached) to NODE_LOCAL or even RACK_LOCAL.

 When these happen things get extremely slow.

 Does this mean that the executor got terminated and restarted?

 Is there a way to prevent this from happening (barring the machine
 actually going down, I'd rather stick with the same process)?
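
For reference, a rough sketch of the spark.locality.* settings Andrew
describes above, set programmatically from PySpark (the values are only
illustrative; in Spark 1.x they are interpreted as milliseconds):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.locality.wait", "10000")          # base wait before dropping a locality level
        .set("spark.locality.wait.process", "10000")  # PROCESS_LOCAL -> NODE_LOCAL
        .set("spark.locality.wait.node", "10000")     # NODE_LOCAL -> RACK_LOCAL
        .set("spark.locality.wait.rack", "10000"))    # RACK_LOCAL -> ANY
sc = SparkContext(conf=conf)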




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: coalesce on SchemaRDD in pyspark

2014-09-12 Thread Brad Miller
Hi Davies,

Thanks for the quick fix. I'm sorry to send out a bug report on release day
- 1.1.0 really is a great release.  I've been running the 1.1 branch for a
while and there's definitely lots of good stuff.

For the workaround, I think you may have meant:

srdd2 = SchemaRDD(srdd._jschema_rdd.coalesce(N, False, None), sqlCtx)

Note:
_schema_rdd -> _jschema_rdd
false -> False

That workaround seems to work fine (in that I've observed the correct
number of partitions in the web UI, although I haven't tested it beyond
that).

Thanks!
-Brad

On Thu, Sep 11, 2014 at 11:30 PM, Davies Liu dav...@databricks.com wrote:

 This is a bug; I have created an issue to track it:
 https://issues.apache.org/jira/browse/SPARK-3500

 Also, there is PR to fix this: https://github.com/apache/spark/pull/2369

 Before next bugfix release, you can workaround this by:

 srdd = sqlCtx.jsonRDD(rdd)
 srdd2 = SchemaRDD(srdd._schema_rdd.coalesce(N, false, None), sqlCtx)


 On Thu, Sep 11, 2014 at 6:12 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:
  Hi All,
 
  I'm having some trouble with the coalesce and repartition functions for
  SchemaRDD objects in pyspark.  When I run:
 
  sqlCtx.jsonRDD(sc.parallelize(['{"foo": "bar"}',
  '{"foo": "baz"}'])).coalesce(1)
 
  I get this error:
 
  Py4JError: An error occurred while calling o94.coalesce. Trace:
  py4j.Py4JException: Method coalesce([class java.lang.Integer, class
  java.lang.Boolean]) does not exist
 
  For context, I have a dataset stored in a parquet file, and I'm using
  SQLContext to make several queries against the data.  I then register the
  results of these queries as new tables in the SQLContext.  Unfortunately
  each new table has the same number of partitions as the original (despite
  being much smaller).  Hence my interest in coalesce and repartition.
 
  Has anybody else encountered this bug?  Is there an alternate workflow I
  should consider?
 
  I am running the 1.1.0 binaries released today.
 
  best,
  -Brad



coalesce on SchemaRDD in pyspark

2014-09-11 Thread Brad Miller
Hi All,

I'm having some trouble with the coalesce and repartition functions for
SchemaRDD objects in pyspark.  When I run:

sqlCtx.jsonRDD(sc.parallelize(['{"foo": "bar"}',
'{"foo": "baz"}'])).coalesce(1)

I get this error:

Py4JError: An error occurred while calling o94.coalesce. Trace:
py4j.Py4JException: Method coalesce([class java.lang.Integer, class
java.lang.Boolean]) does not exist

For context, I have a dataset stored in a parquet file, and I'm using
SQLContext to make several queries against the data.  I then register the
results of these queries as new tables in the SQLContext.  Unfortunately
each new table has the same number of partitions as the original (despite
being much smaller).  Hence my interest in coalesce and repartition.

Has anybody else encountered this bug?  Is there an alternate workflow I
should consider?

I am running the 1.1.0 binaries released today.

best,
-Brad


Re: TimeStamp selection with SparkSQL

2014-09-05 Thread Brad Miller
My approach may be partly influenced by my limited experience with SQL and
Hive, but I just converted all my dates to seconds-since-epoch and then
selected samples from specific time ranges using integer comparisons.


On Thu, Sep 4, 2014 at 6:38 PM, Cheng, Hao hao.ch...@intel.com wrote:

   There are 2 SQL dialects: one is very basic SQL support and the other is
  Hive QL. In most cases I think people prefer using HQL, which also
  means you have to use HiveContext instead of SQLContext.



  In this particular query you showed, it seems datetime is of type Date;
  unfortunately, neither of those SQL dialects supports Date, only Timestamp.



 Cheng Hao



 *From:* Benjamin Zaitlen [mailto:quasi...@gmail.com]
 *Sent:* Friday, September 05, 2014 5:37 AM
 *To:* user@spark.apache.org
 *Subject:* TimeStamp selection with SparkSQL



 I may have missed this but is it possible to select on datetime in a
 SparkSQL query



 jan1 = sqlContext.sql("SELECT * FROM Stocks WHERE datetime = '2014-01-01'")



 Additionally, is there a guide as to what SQL is valid? The guide says,
 "Note that Spark SQL currently uses a very basic SQL parser."  It would be
 great to post what is currently supported.



 --Ben







Re: TimeStamp selection with SparkSQL

2014-09-05 Thread Brad Miller
Preprocessing (after loading the data into HDFS).

I started with data in JSON format in text files (stored in HDFS), then
loaded the data into parquet files with a bit of preprocessing.  Now I
always retrieve the data by creating a SchemaRDD from the parquet file and
using the SchemaRDD to back a table in a SQLContext.
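
Concretely, the pipeline looks roughly like this (paths and table names are
placeholders):

raw = sc.textFile('hdfs:///data/events.json')    # JSON text, one record per line
cleaned = raw.filter(lambda line: line.strip())  # whatever preprocessing is needed
srdd = sqlCtx.jsonRDD(cleaned)
srdd.saveAsParquetFile('hdfs:///data/events.parquet')

# later sessions read the parquet file back and expose it as a table
events = sqlCtx.parquetFile('hdfs:///data/events.parquet')
events.registerTempTable('events')  # registerAsTable on older releases
counts = sqlCtx.sql('SELECT COUNT(*) FROM events').collect()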


On Fri, Sep 5, 2014 at 9:53 AM, Benjamin Zaitlen quasi...@gmail.com wrote:

 Hi Brad,

 When you do the conversion is this a Hive/Spark job or is it a
 pre-processing step before loading into HDFS?

 ---Ben


 On Fri, Sep 5, 2014 at 10:29 AM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 My approach may be partly influenced by my limited experience with SQL
 and Hive, but I just converted all my dates to seconds-since-epoch and then
 selected samples from specific time ranges using integer comparisons.


 On Thu, Sep 4, 2014 at 6:38 PM, Cheng, Hao hao.ch...@intel.com wrote:

  There are 2 SQL dialects, one is a very basic SQL support and another
 is Hive QL. In most of cases I think people prefer using the HQL, which
 also means you have to use HiveContext instead of the SQLContext.



 In this particular query you showed, seems datatime is the type Date,
 unfortunately, neither of those SQL dialect supports Date, but Timestamp.



 Cheng Hao



 *From:* Benjamin Zaitlen [mailto:quasi...@gmail.com]
 *Sent:* Friday, September 05, 2014 5:37 AM
 *To:* user@spark.apache.org
 *Subject:* TimeStamp selection with SparkSQL



 I may have missed this but is it possible to select on datetime in a
 SparkSQL query



 jan1 = sqlContext.sql(SELECT * FROM Stocks WHERE datetime =
 '2014-01-01')



 Additionally, is there a guide as to what SQL is valid? The guide says,
 Note that Spark SQL currently uses a very basic SQL parser  It would be
 great to post what is currently supported.



 --Ben










Re: Spark webUI - application details page

2014-08-29 Thread Brad Miller
How did you specify the HDFS path?  When I put

spark.eventLog.dir   hdfs://crosby.research.intel-research.net:54310/tmp/spark-events

in my spark-defaults.conf file, I receive the following error:

An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.io.IOException: Call to
crosby.research.intel-research.net/10.212.84.53:54310 failed on local
exception: java.io.EOFException

-Brad


On Thu, Aug 28, 2014 at 12:26 PM, SK skrishna...@gmail.com wrote:

 I was able to recently solve this problem for standalone mode. For this
 mode,
 I did not use a history server. Instead, I set spark.eventLog.dir (in
 conf/spark-defaults.conf) to a directory in hdfs (basically this directory
 should be in a place that is writable by the master and accessible globally
 to all the nodes).



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p13055.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




/tmp/spark-events permissions problem

2014-08-29 Thread Brad Miller
Hi All,

Yesterday I restarted my cluster, which had the effect of clearing /tmp.
 When I brought Spark back up and ran my first job, /tmp/spark-events was
re-created and the job ran fine.  I later learned that other users were
receiving errors when trying to create a spark context.  It turned out the
reason was that only my user was able to create subdirectories within
/tmp/spark-events.

I believe /tmp/spark-events originally had ownership bmiller1:bmiller1
(where bmiller1 is my username) with permissions 770.  Once I modified
the permissions to allow other users to create subdirectories, other users
were again able to launch jobs.

Note that I think this may be related to some problems I am having viewing
application history (see link).
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-td3490.html#a13130

Has anybody else experienced a problem with permissions on the
spark.eventLog.dir directory?

best,
-Brad


Re: Spark webUI - application details page

2014-08-28 Thread Brad Miller
Hi All,

@Andrew
Thanks for the tips.  I just built the master branch of Spark last
night, but am still having problems viewing history through the
standalone UI.  I dug into the Spark job events directories as you
suggested, and I see at a minimum 'SPARK_VERSION_1.0.0' and
'EVENT_LOG_1'; for applications that call 'sc.stop()' I also see
'APPLICATION_COMPLETE'.  The version and application complete files
are empty; the event log file contains the information one would need
to repopulate the web UI.

The following may be helpful in debugging this:
-Each job directory (e.g.
'/tmp/spark-events/testhistoryjob-1409246088110') and the files within
are owned by the user who ran the job with permissions 770.  This
prevents the 'spark' user from accessing the contents.

-When I make a directory and contents accessible to the spark user,
the history server (invoked as 'sbin/start-history-server.sh
/tmp/spark-events') is able to display the history, but the standalone
web UI still produces the following error: 'No event logs found for
application HappyFunTimes in
file:///tmp/spark-events/testhistoryjob-1409246088110. Did you specify
the correct logging directory?'

-Incase it matters, I'm running pyspark.

Do you know what may be causing this?  When you attempt to reproduce
locally, who do you observe owns the files in /tmp/spark-events?

best,
-Brad

On Tue, Aug 26, 2014 at 8:51 AM, SK skrishna...@gmail.com wrote:
 I have already tried setting the history server and accessing it on
 master-url:18080 as per the link. But the page does not list any completed
 applications. As I mentioned in my previous mail, I am running Spark in
 standalone mode on the cluster  (as well as on my local machine). According
 to the link, it appears that the history server is required only in mesos or
 yarn mode, not in standalone mode.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12834.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark webUI - application details page

2014-08-15 Thread Brad Miller
Hi Andrew,

I'm running something close to the present master (I compiled several days
ago) but am having some trouble viewing history.

I set spark.eventLog.enabled to true, but continually receive the error
message (via the web UI) "Application history not found... No event logs
found for application ml-pipeline in
file:/tmp/spark-events/ml-pipeline-1408117588599".  I tried 2 fixes:

-I manually set spark.eventLog.dir to a path beginning with file:///,
believing that perhaps the problem was an invalid protocol specification.

-I inspected /tmp/spark-events manually and noticed that each job directory
(and the files therein) was owned by the user who launched the job and
was not world-readable.  Since I run Spark from a dedicated Spark user, I
set the files world-readable but I still receive the same "Application
history not found" error.

Is there a configuration step I may be missing?
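
For reference, my event logging configuration looks roughly like this
(spark.eventLog.dir is left at what I believe is the default), and the
application calls sc.stop() at the end:

*In spark-defaults.conf:*
spark.eventLog.enabled   true
spark.eventLog.dir       file:///tmp/spark-events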

-Brad


On Thu, Aug 14, 2014 at 7:33 PM, Andrew Or and...@databricks.com wrote:

 Hi SK,

 Not sure if I understand you correctly, but here is how the user normally
 uses the event logging functionality:

 After setting spark.eventLog.enabled and optionally
 spark.eventLog.dir, the user runs his/her Spark application and calls
  sc.stop() at the end of it. Then he/she goes to the standalone Master UI
  (under http://master-url:8080 by default) and clicks on the application
  under the Completed Applications table. This will link to the Spark UI of
  the finished application in its completed state, under a path that looks
  like http://master-url:8080/history/app-Id. It won't be on
  http://localhost:4040 anymore because the port is now freed for new
 applications to bind their SparkUIs to. To access the file that stores the
 raw statistics, go to the file specified in spark.eventLog.dir. This is
 by default /tmp/spark-events, though in Spark 1.0.1 it may be in HDFS
 under the same path.

 I could be misunderstanding what you mean by the stats being buried in the
 console output, because the events are not logged to the console but to a
 file in spark.eventLog.dir. For all of this to work, of course, you have
 to run Spark in standalone mode (i.e. with master set to
 spark://master-url:7077). In other modes, you will need to use the
 history server instead.

 Does this make sense?
 Andrew


 2014-08-14 18:08 GMT-07:00 SK skrishna...@gmail.com:

 More specifically, as indicated by Patrick above, in 1.0+, apps will have
 persistent state so that the UI can be reloaded. Is there a way to enable
 this feature in 1.0.1?

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12157.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





SPARK_DRIVER_MEMORY

2014-08-14 Thread Brad Miller
Hi All,

I have a Spark job for which I need to increase the amount of memory
allocated to the driver to collect a large-ish (200M) data structure.
Formerly, I accomplished this by setting SPARK_MEM before invoking my
job (which effectively set memory on the driver) and then setting
spark.executor.memory before creating my spark context.  This was a
bit awkward since it wasn't clear exactly what SPARK_MEM was meant to
do (although in practice it affected only the driver).

Since the release of 1.0.0, I've started receiving messages saying to
set spark.executor.memory or SPARK_DRIVER_MEMORY.  This definitely
helps clear things up, but still feels a bit awkward since it seems
that most configuration can now be done from within the program
(indeed there are very few environment variables now listed on the
Spark configuration page).  Furthermore, SPARK_DRIVER_MEMORY doesn't
seem to appear anywhere in the web documentation.

Is there a better way to set SPARK_DRIVER_MEMORY, or some
documentation that I'm missing?

Is there a guiding principle that would help in figuring out which
configuration parameters are set through environment variables and
which are set programmatically, or somewhere to look in the source for
an exhaustive list of environment variable configuration options?
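
(For reference, the two mechanisms I've found so far, with example values;
my_job.py is just a placeholder.)

*In spark-env.sh:*
export SPARK_DRIVER_MEMORY=8g

*Or at submission time:*
spark-submit --driver-memory 8g my_job.py

Presumably this lives in environment variables and launcher flags rather
than in SparkConf because the driver JVM's heap size has to be fixed before
that JVM starts.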

best,
-Brad

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SPARK_LOCAL_DIRS

2014-08-14 Thread Brad Miller
Hi All,

I'm having some trouble setting the disk spill directory for spark.  The
following approaches set spark.local.dir (according to the Environment
tab of the web UI) but produce the indicated warnings:

*In spark-env.sh:*
export SPARK_JAVA_OPTS=-Dspark.local.dir=/spark/spill
*Associated warning:*
14/08/14 10:10:39 WARN SparkConf: In Spark 1.0 and later spark.local.dir
will be overridden by the value set by the cluster manager (via
SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
14/08/14 10:10:39 WARN SparkConf:
SPARK_JAVA_OPTS was detected (set to '-Dspark.local.dir=/spark/spill').
This is deprecated in Spark 1.0+.
Please instead use...

*In spark-defaults.conf:*
spark.local.dir  /spark/spill
*Associated warning:*
14/08/14 10:09:04 WARN SparkConf: In Spark 1.0 and later spark.local.dir
will be overridden by the value set by the cluster manager (via
SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).

The following does not produce any warnings, but also produces no sign of
actually setting spark.local.dir:

*In spark-env.sh:*
export SPARK_LOCAL_DIRS=/spark/spill

Does anybody know whether SPARK_LOCAL_DIRS actually works as advertised, or
if I am perhaps using it incorrectly?

best,
-Brad


trouble with saveAsParquetFile

2014-08-07 Thread Brad Miller
Hi All,

I'm having a bit of trouble with nested data structures in pyspark with
saveAsParquetFile.  I'm running master (as of yesterday) with this pull
request added: https://github.com/apache/spark/pull/1802.

*# these all work*
>>> sqlCtx.jsonRDD(sc.parallelize(['{"record": null}'])).saveAsParquetFile('/tmp/test0')
>>> sqlCtx.jsonRDD(sc.parallelize(['{"record": []}'])).saveAsParquetFile('/tmp/test1')
>>> sqlCtx.jsonRDD(sc.parallelize(['{"record": {"children": null}}'])).saveAsParquetFile('/tmp/test2')
>>> sqlCtx.jsonRDD(sc.parallelize(['{"record": {"children": []}}'])).saveAsParquetFile('/tmp/test3')
>>> sqlCtx.jsonRDD(sc.parallelize(['{"record": [{"children": "foobar"}]}'])).saveAsParquetFile('/tmp/test4')

*# this FAILS*
>>> sqlCtx.jsonRDD(sc.parallelize(['{"record": [{"children": null}]}'])).saveAsParquetFile('/tmp/test5')
Py4JJavaError: An error occurred while calling o706.saveAsParquetFile.
: java.lang.RuntimeException: *Unsupported datatype NullType*

*# this FAILS*
>>> sqlCtx.jsonRDD(sc.parallelize(['{"record": [{"children": []}]}'])).saveAsParquetFile('/tmp/test6')
Py4JJavaError: An error occurred while calling o719.saveAsParquetFile.
: java.lang.RuntimeException: *Unsupported datatype NullType*

Based on the documentation and the examples that work, it seems like the
failing examples are probably meant to be supported features.  I was unable
to find an open issue for this.  Does anybody know if there is an open
issue, or whether an issue should be created?

best,
-Brad


Re: trouble with saveAsParquetFile

2014-08-07 Thread Brad Miller
Thanks Yin!

best,
-Brad


On Thu, Aug 7, 2014 at 1:39 PM, Yin Huai yh...@databricks.com wrote:

 Hi Brad,

 It is a bug. I have filed https://issues.apache.org/jira/browse/SPARK-2908
 to track it. It will be fixed soon.

 Thanks,

 Yin


 On Thu, Aug 7, 2014 at 10:55 AM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I'm having a bit of trouble with nested data structures in pyspark with
 saveAsParquetFile.  I'm running master (as of yesterday) with this pull
 request added: https://github.com/apache/spark/pull/1802.

 *# these all work*
  sqlCtx.jsonRDD(sc.parallelize(['{record:
 null}'])).saveAsParquetFile('/tmp/test0')
  sqlCtx.jsonRDD(sc.parallelize(['{record:
 []}'])).saveAsParquetFile('/tmp/test1')
  sqlCtx.jsonRDD(sc.parallelize(['{record: {children:
 null}}'])).saveAsParquetFile('/tmp/test2')
  sqlCtx.jsonRDD(sc.parallelize(['{record: {children:
 []}}'])).saveAsParquetFile('/tmp/test3')
  sqlCtx.jsonRDD(sc.parallelize(['{record: *[{children: foobar}]*
 }'])).saveAsParquetFile('/tmp/test4')

 *# this FAILS*
  sqlCtx.jsonRDD(sc.parallelize(['{record: *[{children: null}]*
 }'])).saveAsParquetFile('/tmp/test5')
 Py4JJavaError: An error occurred while calling o706.saveAsParquetFile.
 : java.lang.RuntimeException: *Unsupported datatype NullType*

 *# this FAILS*
  sqlCtx.jsonRDD(sc.parallelize(['{record: *[{children: []}]*
 }'])).saveAsParquetFile('/tmp/test6')
 Py4JJavaError: An error occurred while calling o719.saveAsParquetFile.
 : java.lang.RuntimeException: *Unsupported datatype NullType*

 Based on the documentation and the examples that work, it seems like the
 failing examples are probably meant to be supported features.  I was unable
 to find an open issue for this.  Does anybody know if there is an open
 issue, or whether an issue should be created?

 best,
 -Brad





pyspark inferSchema

2014-08-05 Thread Brad Miller
Hi All,

I have a data set where each record is serialized using JSON, and I'm
interested in using SchemaRDDs to work with the data.  Unfortunately I've hit
a snag since some fields in the data are maps and lists, and are not
guaranteed to be populated for each record.  This seems to cause
inferSchema to throw an error:

Produces error:
srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
{'foo':'boom', 'baz':[1,2,3]}]))

Works fine:
srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
{'foo':'boom', 'baz':[]}]))

To be fair inferSchema says it peeks at the first row, so a possible
work-around would be to make sure the type of any collection can be
determined using the first instance.  However, I don't believe that items
in an RDD are guaranteed to remain in order, so this approach seems
somewhat brittle.

Does anybody know a robust solution to this problem in PySpark?  I am
running the 1.0.1 release.

-Brad


Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
Hi Nick,

Thanks for the great response.

I actually already investigated jsonRDD and jsonFile, although I did not
realize they provide more complete schema inference.  I did, however, have
other problems with jsonRDD and jsonFile, which I will now describe in a
separate thread with an appropriate subject.

I did notice that when I run your example code, I do not receive the exact
same output.  For example, I see:

>>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo": "bar", "baz": []}',
'{"foo": "boom", "baz": [1,2,3]}']))
>>> srdd.printSchema()

root
 |-- baz: ArrayType[IntegerType]
 |-- foo: StringType


Notice the difference in the schema.  Are you running the 1.0.1 release, or
a more bleeding-edge version from the repository?

best,
-Brad


On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 I was just about to ask about this.

 Currently, there are two methods, sqlContext.jsonFile() and
 sqlContext.jsonRDD(), that work on JSON text and infer a schema that
 covers the whole data set.

 For example:

 from pyspark.sql import SQLContext
 sqlContext = SQLContext(sc)
  >>> a = sqlContext.jsonRDD(sc.parallelize(['{"foo": "bar", "baz": []}',
  ...     '{"foo": "boom", "baz": [1,2,3]}']))
  >>> a.printSchema()
  root
   |-- baz: array (nullable = true)
   |    |-- element: integer (containsNull = false)
   |-- foo: string (nullable = true)

 It works really well! It handles fields with inconsistent value types by
 inferring a value type that covers all the possible values.

 But say you’ve already deserialized the JSON to do some pre-processing or
 filtering. You’d commonly want to do this, say, to remove bad data. So now
 you have an RDD of Python dictionaries, as opposed to an RDD of JSON
 strings. It would be perfect if you could get the completeness of the
 json...() methods, but against dictionaries.

 Unfortunately, as you noted, inferSchema() only looks at the first
 element in the set. Furthermore, inferring schemata from RDDs of
 dictionaries is being deprecated
 https://issues.apache.org/jira/browse/SPARK-2010 in favor of doing so
 from RDDs of Rows.

 I’m not sure what the intention behind this move is, but as a user I’d
 like to be able to convert RDDs of dictionaries directly to SchemaRDDs with
 the completeness of the jsonRDD()/jsonFile() methods. Right now if I
 really want that, I have to serialize the dictionaries to JSON text and
 then call jsonRDD(), which is expensive.

 Nick
 ​


 On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I have a data set where each record is serialized using JSON, and I'm
 interested to use SchemaRDDs to work with the data.  Unfortunately I've hit
 a snag since some fields in the data are maps and list, and are not
 guaranteed to be populated for each record.  This seems to cause
 inferSchema to throw an error:

 Produces error:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
 {'foo':'boom', 'baz':[1,2,3]}]))

 Works fine:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
 {'foo':'boom', 'baz':[]}]))

 To be fair inferSchema says it peeks at the first row, so a possible
 work-around would be to make sure the type of any collection can be
 determined using the first instance.  However, I don't believe that items
 in an RDD are guaranteed to remain in an ordered, so this approach seems
 somewhat brittle.

 Does anybody know a robust solution to this problem in PySpark?  I'm am
 running the 1.0.1 release.

 -Brad
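
A rough sketch of the workaround Nick mentions (serializing the
dictionaries back to JSON text so that jsonRDD infers a schema over the
whole data set); the records are taken from the examples above:

import json

dict_rdd = sc.parallelize([{'foo': 'bar', 'baz': []},
                           {'foo': 'boom', 'baz': [1, 2, 3]}])
# jsonRDD looks at every record, so the empty list no longer trips up inference
srdd = sqlCtx.jsonRDD(dict_rdd.map(json.dumps))
srdd.printSchema()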





trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
Hi All,

I am interested in using jsonRDD and jsonFile to create a SchemaRDD out of
some JSON data I have, but I've run into some instability involving the
following Java exception:

An error occurred while calling o1326.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
on host neal.research.intel-research.net:
net.razorvine.pickle.PickleException: couldn't introspect javabean:
java.lang.IllegalArgumentException: wrong number of arguments

I've pasted code which produces the error as well as the full traceback
below.  Note that I don't have any problem when I parse the JSON myself and
use inferSchema.

Is anybody able to reproduce this bug?

-Brad

>>> srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo": "bar", "baz": [1,2,3]}',
'{"foo": "boom", "baz": [1,2,3]}']))
>>> srdd.printSchema()

root
 |-- baz: ArrayType[IntegerType]
 |-- foo: StringType

>>> srdd.collect()

---
Py4JJavaError Traceback (most recent call last)
ipython-input-89-ec7e8e8c68c4 in module()
 1 srdd.collect()

/home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
581 
582 with _JavaStackTrace(self.context) as st:
-- 583   bytesInJava = self._jrdd.collect().iterator()
584 return
list(self._collect_iterator_through_file(bytesInJava))
585

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
__call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
-- 537 self.target_id, self.name)
538
539 for temp_arg in temp_args:

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
-- 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o1326.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
on host neal.research.intel-research.net:
net.razorvine.pickle.PickleException: couldn't introspect javabean:
java.lang.IllegalArgumentException: wrong number of arguments
net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.dump(Pickler.java:95)
net.razorvine.pickle.Pickler.dumps(Pickler.java:80)

org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)

org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
scala.collection.Iterator$anon$11.next(Iterator.scala:328)

org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)

org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)

org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
at
org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at

Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
Got it.  Thanks!


On Tue, Aug 5, 2014 at 11:53 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Notice the difference in the schema.  Are you running the 1.0.1 release,
 or a more bleeding-edge version from the repository?

 Yep, my bad. I’m running off master at commit
 184048f80b6fa160c89d5bb47b937a0a89534a95.

 Nick
 ​



Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
Nick: Thanks for both the original JIRA bug report and the link.

Michael: This is on the 1.0.1 release.  I'll update to master and follow-up
if I have any problems.

best,
-Brad


On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com
wrote:

 Is this on 1.0.1?  I'd suggest running this on master or the 1.1-RC which
 should be coming out this week.  Pyspark did not have good support for
 nested data previously.  If you still encounter issues using a more recent
 version, please file a JIRA.  Thanks!


 On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of
 some JSON data I have, but I've run into some instability involving the
 following java exception:

 An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments

 I've pasted code which produces the error as well as the full traceback
 below.  Note that I don't have any problem when I parse the JSON myself and
 use inferSchema.

 Is anybody able to reproduce this bug?

 -Brad

  srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}',
 '{foo:boom, baz:[1,2,3]}']))
  srdd.printSchema()

 root
  |-- baz: ArrayType[IntegerType]
  |-- foo: StringType

  srdd.collect()


 ---
 Py4JJavaError Traceback (most recent call
 last)
 ipython-input-89-ec7e8e8c68c4 in module()
  1 srdd.collect()

 /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
 581 
 582 with _JavaStackTrace(self.context) as st:
 -- 583   bytesInJava = self._jrdd.collect().iterator()
 584 return
 list(self._collect_iterator_through_file(bytesInJava))
 585

 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
 __call__(self, *args)
 535 answer = self.gateway_client.send_command(command)
 536 return_value = get_return_value(answer,
 self.gateway_client,
 -- 537 self.target_id, self.name)
 538
 539 for temp_arg in temp_args:

 /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling
 {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664
 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments
 net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.dump(Pickler.java:95)
 net.razorvine.pickle.Pickler.dumps(Pickler.java:80)

 org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)

 org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
 scala.collection.Iterator$anon$11.next(Iterator.scala:328)

 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

 org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)

 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
 at
 org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028

Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
Hi Davies,

Thanks for the response and tips.  Is the sample argument to inferSchema
available in the 1.0.1 release of pyspark?  I'm not sure (based on the
documentation linked below) that it is.
http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema

It sounds like updating to master may help address my issue (and may also
make the sample argument available), so I'm going to go ahead and do that.

best,
-Brad
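
For reference, a minimal sketch of the dict-to-Row conversion Davies describes
below, assuming a build recent enough to ship pyspark.sql.Row (master / the 1.1
branch at the time); the field names and data are made up for illustration, and
since inferSchema still peeks at the first element, the first dict carries the
non-empty list:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext('local', 'InferSchemaRowSketch')
sqlCtx = SQLContext(sc)

# An RDD of plain Python dictionaries, e.g. produced by pre-processing.
dicts = sc.parallelize([{'foo': 'bar', 'baz': [1, 2, 3]},
                        {'foo': 'boom', 'baz': []}])

# Convert each dict to a Row so inferSchema sees named fields.
srdd = sqlCtx.inferSchema(dicts.map(lambda d: Row(**d)))
srdd.printSchema()
print srdd.collect()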


On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote:

 On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  I was just about to ask about this.
 
  Currently, there are two methods, sqlContext.jsonFile() and
  sqlContext.jsonRDD(), that work on JSON text and infer a schema that
 covers
  the whole data set.
 
  For example:
 
  from pyspark.sql import SQLContext
  sqlContext = SQLContext(sc)
 
  a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}',
  '{foo:boom, baz:[1,2,3]}']))
  a.printSchema()
  root
   |-- baz: array (nullable = true)
   ||-- element: integer (containsNull = false)
   |-- foo: string (nullable = true)
 
  It works really well! It handles fields with inconsistent value types by
  inferring a value type that covers all the possible values.
 
  But say you’ve already deserialized the JSON to do some pre-processing or
  filtering. You’d commonly want to do this, say, to remove bad data. So
 now
  you have an RDD of Python dictionaries, as opposed to an RDD of JSON
  strings. It would be perfect if you could get the completeness of the
  json...() methods, but against dictionaries.
 
  Unfortunately, as you noted, inferSchema() only looks at the first
 element
  in the set. Furthermore, inferring schemata from RDDs of dictionaries is
  being deprecated in favor of doing so from RDDs of Rows.
 
  I’m not sure what the intention behind this move is, but as a user I’d
 like
  to be able to convert RDDs of dictionaries directly to SchemaRDDs with
 the
  completeness of the jsonRDD()/jsonFile() methods. Right now if I really
 want
  that, I have to serialize the dictionaries to JSON text and then call
  jsonRDD(), which is expensive.

 Before the upcoming 1.1 release, we did not support nested structures via
 inferSchema: a nested dictionary becomes MapType. This introduces an
 inconsistency for dictionaries, where the top level becomes a struct type
 (accessible by field name) but nested dictionaries become MapType
 (accessible as a map).

 So deprecating top-level dictionaries is an attempt to resolve this
 inconsistency.

 The Row class in pyspark.sql has a similar interface to dict, so you
 can easily convert your dict into a Row:

 ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))

 In order to get the correct schema, we may need another argument to specify
 the number of rows to be inferred, such as:

 inferSchema(rdd, sample=None)

 With sample=None it will take only the first row; otherwise it will sample
 the data to figure out the complete schema.

 Does this work for you?

  Nick
 
 
 
  On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:
 
  Hi All,
 
  I have a data set where each record is serialized using JSON, and I'm
  interested to use SchemaRDDs to work with the data.  Unfortunately I've
 hit
  a snag since some fields in the data are maps and list, and are not
  guaranteed to be populated for each record.  This seems to cause
 inferSchema
  to throw an error:
 
  Produces error:
  srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
  {'foo':'boom', 'baz':[1,2,3]}]))
 
  Works fine:
  srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
  {'foo':'boom', 'baz':[]}]))
 
  To be fair inferSchema says it peeks at the first row, so a possible
  work-around would be to make sure the type of any collection can be
  determined using the first instance.  However, I don't believe that
 items in
  an RDD are guaranteed to remain in an ordered, so this approach seems
  somewhat brittle.
 
  Does anybody know a robust solution to this problem in PySpark?  I'm am
  running the 1.0.1 release.
 
  -Brad
 
 



Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
Assuming updating to master fixes the bug I was experiencing with jsonRDD
and jsonFile, then pushing sample to master will probably not be
necessary.

We believe that the link below was the bug I experienced, and I've been
told it is fixed in master.

https://issues.apache.org/jira/browse/SPARK-2376

best,
-brad


On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com wrote:

 This sample argument of inferSchema is still not in master; I will
 try to add it if it makes
 sense.

 On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:
  Hi Davies,
 
  Thanks for the response and tips.  Is the sample argument to
 inferSchema
  available in the 1.0.1 release of pyspark?  I'm not sure (based on the
  documentation linked below) that it is.
 
 http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema
 
  It sounds like updating to master may help address my issue (and may also
  make the sample argument available), so I'm going to go ahead and do
 that.
 
  best,
  -Brad
 
 
  On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com
 wrote:
 
  On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
   I was just about to ask about this.
  
   Currently, there are two methods, sqlContext.jsonFile() and
   sqlContext.jsonRDD(), that work on JSON text and infer a schema that
   covers
   the whole data set.
  
   For example:
  
   from pyspark.sql import SQLContext
   sqlContext = SQLContext(sc)
  
   a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}',
   '{foo:boom, baz:[1,2,3]}']))
   a.printSchema()
   root
|-- baz: array (nullable = true)
||-- element: integer (containsNull = false)
|-- foo: string (nullable = true)
  
   It works really well! It handles fields with inconsistent value types
 by
   inferring a value type that covers all the possible values.
  
   But say you’ve already deserialized the JSON to do some pre-processing
   or
   filtering. You’d commonly want to do this, say, to remove bad data. So
   now
   you have an RDD of Python dictionaries, as opposed to an RDD of JSON
   strings. It would be perfect if you could get the completeness of the
   json...() methods, but against dictionaries.
  
   Unfortunately, as you noted, inferSchema() only looks at the first
   element
   in the set. Furthermore, inferring schemata from RDDs of dictionaries
 is
   being deprecated in favor of doing so from RDDs of Rows.
  
   I’m not sure what the intention behind this move is, but as a user I’d
   like
   to be able to convert RDDs of dictionaries directly to SchemaRDDs with
   the
   completeness of the jsonRDD()/jsonFile() methods. Right now if I
 really
   want
   that, I have to serialize the dictionaries to JSON text and then call
   jsonRDD(), which is expensive.
 
  Before upcoming 1.1 release, we did not support nested structures via
  inferSchema,
  the nested dictionary will be MapType. This introduces inconsistance
  for dictionary that
  the top level will be structure type (can be accessed by name of
  field) but others will be
  MapType (can be accesses as map).
 
  So deprecated top level dictionary is try to solve this kind of
  inconsistance.
 
  The Row class in pyspark.sql has a similar interface to dict, so you
  can easily convert
  you dic into a Row:
 
  ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
 
  In order to get the correct schema, so we need another argument to
 specify
  the number of rows to be infered? Such as:
 
  inferSchema(rdd, sample=None)
 
  with sample=None, it will take the first row, or it will do the
  sampling to figure out the
  complete schema.
 
  Does this work for you?
 
   Nick
  
  
  
   On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller 
 bmill...@eecs.berkeley.edu
   wrote:
  
   Hi All,
  
   I have a data set where each record is serialized using JSON, and I'm
   interested to use SchemaRDDs to work with the data.  Unfortunately
 I've
   hit
   a snag since some fields in the data are maps and list, and are not
   guaranteed to be populated for each record.  This seems to cause
   inferSchema
   to throw an error:
  
   Produces error:
   srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
   {'foo':'boom', 'baz':[1,2,3]}]))
  
   Works fine:
   srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar',
 'baz':[1,2,3]},
   {'foo':'boom', 'baz':[]}]))
  
   To be fair inferSchema says it peeks at the first row, so a
 possible
   work-around would be to make sure the type of any collection can be
   determined using the first instance.  However, I don't believe that
   items in
   an RDD are guaranteed to remain in an ordered, so this approach seems
   somewhat brittle.
  
   Does anybody know a robust solution to this problem in PySpark?  I'm
 am
   running the 1.0.1 release.
  
   -Brad
  
  
 
 



Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
Hi All,

I've built and deployed the current head of branch-1.0, but it seems to
have only partly fixed the bug.

This code now runs as expected with the indicated output:
 srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[1,2,3]}',
'{"foo":[4,5,6]}']))
 srdd.printSchema()
root
 |-- foo: ArrayType[IntegerType]
 srdd.collect()
[{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}]

This code still crashes:
 srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}',
'{"foo":[[1,2,3], [4,5,6]]}']))
 srdd.printSchema()
root
 |-- foo: ArrayType[ArrayType(IntegerType)]
 srdd.collect()
Py4JJavaError: An error occurred while calling o63.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
3.0:29 failed 4 times, most recent failure: Exception failure in TID 67 on
host kunitz.research.intel-research.net:
net.razorvine.pickle.PickleException: couldn't introspect javabean:
java.lang.IllegalArgumentException: wrong number of arguments

I may be able to see if this is fixed in master, but since it's not fixed
in 1.0.3 it seems unlikely to be fixed in master either. I previously tried
master as well, but ran into a build problem that did not occur with the
1.0 branch.

Can anybody else verify that the second example still crashes (and is meant
to work)? If so, would it be best to modify JIRA-2376 or start a new bug?
https://issues.apache.org/jira/browse/SPARK-2376

best,
-Brad
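
For completeness, a minimal sketch of the workaround mentioned earlier in this
thread (parsing the JSON on the Python side and using inferSchema rather than
jsonRDD); the data below is just the flat example from the original message,
and inferSchema only peeks at the first element:

import json

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local', 'ParseJsonSketch')
sqlCtx = SQLContext(sc)

raw = sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}',
                      '{"foo":"boom", "baz":[1,2,3]}'])

# Deserialize in Python and let inferSchema build the schema from the first record.
parsed = raw.map(json.loads)
srdd = sqlCtx.inferSchema(parsed)
srdd.printSchema()
print srdd.collect()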





On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Nick: Thanks for both the original JIRA bug report and the link.

 Michael: This is on the 1.0.1 release.  I'll update to master and
 follow-up if I have any problems.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust mich...@databricks.com
 wrote:

 Is this on 1.0.1?  I'd suggest running this on master or the 1.1-RC which
 should be coming out this week.  Pyspark did not have good support for
 nested data previously.  If you still encounter issues using a more recent
 version, please file a JIRA.  Thanks!


 On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I am interested to use jsonRDD and jsonFile to create a SchemaRDD out of
 some JSON data I have, but I've run into some instability involving the
 following java exception:

 An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID
 1664 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments

 I've pasted code which produces the error as well as the full traceback
 below.  Note that I don't have any problem when I parse the JSON myself and
 use inferSchema.

 Is anybody able to reproduce this bug?

 -Brad

  srdd = sqlCtx.jsonRDD(sc.parallelize(['{foo:bar, baz:[1,2,3]}',
 '{foo:boom, baz:[1,2,3]}']))
  srdd.printSchema()

 root
  |-- baz: ArrayType[IntegerType]
  |-- foo: StringType

  srdd.collect()


 ---
 Py4JJavaError Traceback (most recent call
 last)
 ipython-input-89-ec7e8e8c68c4 in module()
  1 srdd.collect()

 /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in
 collect(self)
 581 
 582 with _JavaStackTrace(self.context) as st:
 -- 583   bytesInJava = self._jrdd.collect().iterator()
 584 return
 list(self._collect_iterator_through_file(bytesInJava))
 585

 /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
 __call__(self, *args)
 535 answer = self.gateway_client.send_command(command)
 536 return_value = get_return_value(answer,
 self.gateway_client,
 -- 537 self.target_id, self.name)
 538
 539 for temp_arg in temp_args:

 /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
 get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling
 {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o1326.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID
 1664 on host neal.research.intel-research.net:
 net.razorvine.pickle.PickleException: couldn't introspect javabean:
 java.lang.IllegalArgumentException: wrong number of arguments
 net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
 net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
 net.razorvine.pickle.Pickler.save(Pickler.java:125)
 net.razorvine.pickle.Pickler.put_map(Pickler.java:322

Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
Hi All,

I checked out and built master.  Note that Maven had a problem building
Kafka (in my case, at least); I was unable to fix this easily so I moved on
since it seemed unlikely to have any influence on the problem at hand.

Master improves functionality (including the example Nicholas just
demonstrated) but unfortunately there still seems to be a bug related to
using dictionaries as values.  I've put some code below to illustrate the
bug.

*# dictionary as value works fine*
 print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1":
"value"}}'])).collect()
[Row(key0=Row(key1=u'value'))]

*# dictionary as value works fine, even when inner keys are varied*
 print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}',
'{"key0": {"key2": "value2"}}'])).collect()
[Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None,
key2=u'value2'))]

*# dictionary as value works fine when inner keys are missing and outer key
is present*
 print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1":
"value1"}}'])).collect()
[Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

*# dictionary as value FAILS when outer key is missing*
* print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1":
"value1"}}'])).collect()*
Py4JJavaError: An error occurred while calling o84.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in
stage 7.0 (TID 242, engelland.research.intel-research.net):
java.lang.NullPointerException...

*# dictionary as value FAILS when outer key is present with null value*
* print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0":
{"key1": "value1"}}'])).collect()*
Py4JJavaError: An error occurred while calling o98.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in
stage 9.0 (TID 305, kunitz.research.intel-research.net):
java.lang.NullPointerException...

*# nested lists work even when outer key is missing*
 print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"],
["item2", "item3"]]}'])).collect()
[Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

Is anyone able to replicate this behavior?

-Brad
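
One possible workaround sketch, consistent with the observations above (the
case where the outer key is present with an empty inner dict works): normalize
the records on the Python side so key0 is always an object before handing them
to jsonRDD. The key name and data are just the ones from the examples above,
and whether an empty dict is an acceptable stand-in for a missing or null value
depends on the application:

import json

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local', 'NormalizeJsonSketch')
sqlCtx = SQLContext(sc)

raw = sc.parallelize(['{}', '{"key0": null}', '{"key0": {"key1": "value1"}}'])

def ensure_key0(line):
    # Guarantee key0 is always a (possibly empty) object, never missing or null.
    record = json.loads(line)
    if record.get('key0') is None:
        record['key0'] = {}
    return json.dumps(record)

print sqlCtx.jsonRDD(raw.map(ensure_key0)).collect()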




On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com
wrote:

 We try to keep master very stable, but this is where active development
 happens. YMMV, but a lot of people do run very close to master without
 incident (myself included).

 branch-1.0 has been cut for a while and we only merge bug fixes into it
 (this is more strict for non-alpha components like spark core.).  For Spark
 SQL, this branch is pretty far behind as the project is very young and we
 are fixing bugs / adding features very rapidly compared with Spark core.

 branch-1.1 was just cut and is being QAed for a release, at this point its
 likely the same as master, but that will change as features start getting
 added to master in the coming weeks.



 On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 collect() works, too.

  sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
  '{foo:[[1,2,3], [4,5,6]]}'])).collect()
 [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

 Can’t answer your question about branch stability, though. Spark is a
 very active project, so stuff is happening all the time.

 Nick
 ​


 On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi Nick,

 Can you check that the call to collect() works as well as
 printSchema()?  I actually experience that printSchema() works fine,
 but then it crashes on collect().

 In general, should I expect the master (which seems to be on branch-1.1)
 to be any more/less stable than branch-1.0?  While it would be great to
 have this fixed, it would be good to know if I should expect lots of other
 instability.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 This looks to be fixed in master:

  from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
  sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], 
  [4,5,6]]}'





 ])
 ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315 
 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
 '{foo:[[1,2,3], [4,5,6]]}']))
 MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408 
 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
 '{foo:[[1,2,3], [4,5,6]]}'])).printSchema()
 root
  |-- foo: array (nullable = true)
  ||-- element: array (containsNull = false)
  |||-- element: integer (containsNull = false)

 

 Nick
 ​


 On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:

 Hi All,

 I've built and deployed the current head of branch-1.0, but it seems
 to have only partly fixed the bug.

 This code now runs as expected with the indicated output:
  srdd

Re: pyspark inferSchema

2014-08-05 Thread Brad Miller
I've followed up in a thread more directly related to jsonRDD and jsonFile,
but it seems like after building from the current master I'm still having
some problems with nested dictionaries.

http://apache-spark-user-list.1001560.n3.nabble.com/trouble-with-jsonRDD-and-jsonFile-in-pyspark-tp11461p11517.html


On Tue, Aug 5, 2014 at 12:56 PM, Yin Huai yh...@databricks.com wrote:

 Yes, 2376 has been fixed in master. Can you give it a try?

 Also, for inferSchema, because Python is dynamically typed, I agree with
 Davies to provide a way to scan a subset (or the entirety) of the dataset to
 figure out the proper schema. We will take a look at it.

 Thanks,

 Yin


 On Tue, Aug 5, 2014 at 12:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Assuming updating to master fixes the bug I was experiencing with jsonRDD
 and jsonFile, then pushing sample to master will probably not be
 necessary.

 We believe that the link below was the bug I experienced, and I've been
 told it is fixed in master.

 https://issues.apache.org/jira/browse/SPARK-2376

 best,
 -brad


 On Tue, Aug 5, 2014 at 12:18 PM, Davies Liu dav...@databricks.com
 wrote:

 This sample argument of inferSchema is still no in master, if will
 try to add it if it make
 sense.

 On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:
  Hi Davies,
 
  Thanks for the response and tips.  Is the sample argument to
 inferSchema
  available in the 1.0.1 release of pyspark?  I'm not sure (based on the
  documentation linked below) that it is.
 
 http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema
 
  It sounds like updating to master may help address my issue (and may
 also
  make the sample argument available), so I'm going to go ahead and do
 that.
 
  best,
  -Brad
 
 
  On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com
 wrote:
 
  On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
   I was just about to ask about this.
  
   Currently, there are two methods, sqlContext.jsonFile() and
   sqlContext.jsonRDD(), that work on JSON text and infer a schema that
   covers
   the whole data set.
  
   For example:
  
   from pyspark.sql import SQLContext
   sqlContext = SQLContext(sc)
  
   a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}',
   '{foo:boom, baz:[1,2,3]}']))
   a.printSchema()
   root
|-- baz: array (nullable = true)
||-- element: integer (containsNull = false)
|-- foo: string (nullable = true)
  
   It works really well! It handles fields with inconsistent value
 types by
   inferring a value type that covers all the possible values.
  
   But say you’ve already deserialized the JSON to do some
 pre-processing
   or
   filtering. You’d commonly want to do this, say, to remove bad data.
 So
   now
   you have an RDD of Python dictionaries, as opposed to an RDD of JSON
   strings. It would be perfect if you could get the completeness of
 the
   json...() methods, but against dictionaries.
  
   Unfortunately, as you noted, inferSchema() only looks at the first
   element
   in the set. Furthermore, inferring schemata from RDDs of
 dictionaries is
   being deprecated in favor of doing so from RDDs of Rows.
  
   I’m not sure what the intention behind this move is, but as a user
 I’d
   like
   to be able to convert RDDs of dictionaries directly to SchemaRDDs
 with
   the
   completeness of the jsonRDD()/jsonFile() methods. Right now if I
 really
   want
   that, I have to serialize the dictionaries to JSON text and then
 call
   jsonRDD(), which is expensive.
 
  Before upcoming 1.1 release, we did not support nested structures via
  inferSchema,
  the nested dictionary will be MapType. This introduces inconsistance
  for dictionary that
  the top level will be structure type (can be accessed by name of
  field) but others will be
  MapType (can be accesses as map).
 
  So deprecated top level dictionary is try to solve this kind of
  inconsistance.
 
  The Row class in pyspark.sql has a similar interface to dict, so you
  can easily convert
  you dic into a Row:
 
  ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
 
  In order to get the correct schema, so we need another argument to
 specify
  the number of rows to be infered? Such as:
 
  inferSchema(rdd, sample=None)
 
  with sample=None, it will take the first row, or it will do the
  sampling to figure out the
  complete schema.
 
  Does this work for you?
 
   Nick
  
  
  
   On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller 
 bmill...@eecs.berkeley.edu
   wrote:
  
   Hi All,
  
   I have a data set where each record is serialized using JSON, and
 I'm
   interested to use SchemaRDDs to work with the data.  Unfortunately
 I've
   hit
   a snag since some fields in the data are maps and list, and are not
   guaranteed to be populated for each record.  This seems to cause
   inferSchema
   to throw an error:
  
   Produces error:
   srdd = sqlCtx.inferSchema

Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-05 Thread Brad Miller
I concur that printSchema works; the trouble just seems to happen in
operations that actually use the data.

Thanks for posting the bug.

-Brad


On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote:

 I tried jsonRDD(...).printSchema() and it worked. Seems the problem is
 when we take the data back to the Python side, SchemaRDD#javaToPython
 failed on your cases. I have created
 https://issues.apache.org/jira/browse/SPARK-2875 to track it.

 Thanks,

 Yin


 On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I checked out and built master.  Note that Maven had a problem building
 Kafka (in my case, at least); I was unable to fix this easily so I moved on
 since it seemed unlikely to have any influence on the problem at hand.

 Master improves functionality (including the example Nicholas just
 demonstrated) but unfortunately there still seems to be a bug related to
 using dictionaries as values.  I've put some code below to illustrate the
 bug.

 *# dictionary as value works fine*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1:
 value}}'])).collect()
 [Row(key0=Row(key1=u'value'))]

 *# dictionary as value works fine, even when inner keys are varied*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {key1: value1}}',
 '{key0: {key2: value2}}'])).collect()
 [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None,
 key2=u'value2'))]

 *# dictionary as value works fine when inner keys are missing and outer
 key is present*
  print sqlCtx.jsonRDD(sc.parallelize(['{key0: {}}', '{key0: {key1:
 value1}}'])).collect()
 [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]

 *# dictionary as value FAILS when outer key is missing*
 * print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: {key1:
 value1}}'])).collect()*
 Py4JJavaError: An error occurred while calling o84.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in
 stage 7.0 (TID 242, engelland.research.intel-research.net):
 java.lang.NullPointerException...

 *# dictionary as value FAILS when outer key is present with null value*
 * print sqlCtx.jsonRDD(sc.parallelize(['{key0: null}', '{key0:
 {key1: value1}}'])).collect()*
 Py4JJavaError: An error occurred while calling o98.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in
 stage 9.0 (TID 305, kunitz.research.intel-research.net):
 java.lang.NullPointerException...

 *# nested lists work even when outer key is missing*
  print sqlCtx.jsonRDD(sc.parallelize(['{}', '{key0: [[item0,
 item1], [item2, item3]]}'])).collect()
 [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

 Is anyone able to replicate this behavior?

  -Brad




 On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com
 wrote:

 We try to keep master very stable, but this is where active development
 happens. YMMV, but a lot of people do run very close to master without
 incident (myself included).

 branch-1.0 has been cut for a while and we only merge bug fixes into it
 (this is more strict for non-alpha components like spark core.).  For Spark
 SQL, this branch is pretty far behind as the project is very young and we
 are fixing bugs / adding features very rapidly compared with Spark core.

 branch-1.1 was just cut and is being QAed for a release, at this point
 its likely the same as master, but that will change as features start
 getting added to master in the coming weeks.



 On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 collect() works, too.

  sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
  '{foo:[[1,2,3], [4,5,6]]}'])).collect()
 [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

 Can’t answer your question about branch stability, though. Spark is a
 very active project, so stuff is happening all the time.

 Nick
 ​


 On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:

 Hi Nick,

 Can you check that the call to collect() works as well as
 printSchema()?  I actually experience that printSchema() works fine,
 but then it crashes on collect().

 In general, should I expect the master (which seems to be on
 branch-1.1) to be any more/less stable than branch-1.0?  While it would be
 great to have this fixed, it would be good to know if I should expect lots
 of other instability.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 This looks to be fixed in master:

  from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
  sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', '{foo:[[1,2,3], 
  [4,5,6]]}'







 ])
 ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315 
 sqlContext.jsonRDD(sc.parallelize(['{foo:[[1,2,3], [4,5,6]]}', 
 '{foo:[[1,2,3], [4,5,6

Re: Announcing Spark 1.0.1

2014-07-12 Thread Brad Miller
Hi All,

Congrats to the entire Spark team on the 1.0.1 release.

In checking out the new features, I noticed that it looks like the
python API docs have been updated, but the title and the header at
the top of the page still say Spark 1.0.0.  Clearly not a big
deal... I just wouldn't want anyone to get confused and miss out.

http://spark.apache.org/docs/1.0.1/api/python/index.html

best,
-Brad

On Fri, Jul 11, 2014 at 8:44 PM, Henry Saputra henry.sapu...@gmail.com wrote:
 Congrats to the Spark community !

 On Friday, July 11, 2014, Patrick Wendell pwend...@gmail.com wrote:

 I am happy to announce the availability of Spark 1.0.1! This release
 includes contributions from 70 developers. Spark 1.0.0 includes fixes
 across several areas of Spark, including the core API, PySpark, and
 MLlib. It also includes new features in Spark's (alpha) SQL library,
 including support for JSON data and performance and stability fixes.

 Visit the release notes[1] to read about this release or download[2]
 the release today.

 [1] http://spark.apache.org/releases/spark-release-1-0-1.html
 [2] http://spark.apache.org/downloads.html


odd caching behavior or accounting

2014-06-30 Thread Brad Miller
Hi All,

I am resending this message because I suspect the original may have been
blocked from the mailing list due to attachments.  Note that the mail does
appear on the apache archives
http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3CCANR-kKeO3mxL1QuX0fnz0DEPkU4FFbXO2W_5CdmtrzYKUfhaBg%40mail.gmail.com%3E
but
not on nabble, the online archive linked from the Spark website
http://apache-spark-user-list.1001560.n3.nabble.com/.

The text of the original message appears below; the PDF
http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/raw/%3ccanr-kkeo3mxl1qux0fnz0depku4ffbxo2w_5cdmtrzykufh...@mail.gmail.com%3e/2
 and PNG
http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/raw/%3ccanr-kkeo3mxl1qux0fnz0depku4ffbxo2w_5cdmtrzykufh...@mail.gmail.com%3e/3
files
originally attached are now available as linked from the apache archive.

best,
-Brad


-- Forwarded message --
From: Brad Miller bmill...@eecs.berkeley.edu
Date: Mon, Jun 30, 2014 at 10:20 AM
Subject: odd caching behavior or accounting
To: user@spark.apache.org


Hi All,

I've recently noticed some caching behavior which I did not understand
and may or may not have indicated a bug.  In short, the web UI seemed
to indicate that some blocks were being added to the cache despite
already being in cache.

As documentation, I have attached two UI screenshots.  The PNG
captures enough of the screen to demonstrate the problem; the PDF is
the printout of the full page.  Notice that:

-block rdd_21_1001 is in the cache twice, both times on
letang.research.intel-research.net; many other blocks also occur twice
on a variety of hosts.  I've not confirmed that the duplicate block is
*always* on the same host, but it seems to appear that way.

-the stated storage level is Memory Deserialized 1x Replicated

-the top left states that the cached partitions and total
partitions are 4000, but in the table where partitions are enumerated
there are 4534.

Although not reflected in this screenshot, I believe I have seen this
behavior occur even when double caching of blocks causes eviction of
blocks from other RDDs.  I am running the Spark 1.0.0 release and
using pyspark.

best,
-Brad


pyspark bug with unittest and scikit-learn

2014-06-19 Thread Brad Miller
Hi All,

I am attempting to develop some unit tests for a program using pyspark and
scikit-learn and I've come across some weird behavior.  I receive the
following warning during some tests python/pyspark/serializers.py:327:
DeprecationWarning: integer argument expected, got float.

Although it's only a warning, and my test still passes (i.e. Spark still
seems to work), it would be nice to know why it's happening and if it
actually indicates a problem since this can probably happen outside unit
testing as well.

Note that the warning occurs when I invoke the test as
SPARK_HOME=/home/spark/spark-1.0.0-bin-hadoop1
PYTHONPATH=/home/spark/spark-1.0.0-bin-hadoop1/python python -m unittest -v
-b crash_test.  Doing any one of the following three things causes the
warning to go away:

-invoking as python crash_test.py rather than python -m unittest -v -b
crash_test
-commenting out import sklearn.metrics
-changing lambda x: foo(x) to lambda x: x

Note that I am running the following software:
Spark 1.0.0
Python 2.7.3
scikit-learn 0.14.1
Ubuntu 12.04

*Exact Warning (actually occurs 3 times):*
/home/spark/spark-1.0.0-bin-hadoop1/python/pyspark/serializers.py:327:
DeprecationWarning: integer argument expected, got float
  stream.write(struct.pack("!q", value))
/home/spark/spark-1.0.0-bin-hadoop1/python/pyspark/serializers.py:327:
DeprecationWarning: integer argument expected, got float
  stream.write(struct.pack("!q", value))
/home/spark/spark-1.0.0-bin-hadoop1/python/pyspark/serializers.py:327:
DeprecationWarning: integer argument expected, got float
  stream.write(struct.pack("!q", value))

*crash_test.py:*
import unittest
from pyspark import SparkContext
import sklearn.metrics

def foo(x):
return x

def setUpModule():
global sc
sc = SparkContext('local')
print sc.parallelize(range(4)).map(lambda x: foo(x)).collect()

class CrashTest(unittest.TestCase):
def test(self):
pass

if __name__ == '__main__':
unittest.main()

I'm glad to know if anybody else has experienced a similar problem, or has
insight into what may be happening or if it is significant.

best,
-Brad
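
Not a fix, but one way to quiet the warning in the driver process while
investigating is a standard warnings filter applied before the SparkContext is
created. This is only a sketch; it does not address whatever is passing a float
to struct.pack, and it will not reach the worker python processes:

import warnings

# In crash_test.py, before the SparkContext is created (e.g. at the top of
# setUpModule); this only affects warnings raised in the driver process.
warnings.filterwarnings(
    'ignore',
    message='integer argument expected, got float',
    category=DeprecationWarning,
    module='pyspark.serializers')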


pyspark join crash

2014-06-04 Thread Brad Miller
Hi All,

I have experienced some crashing behavior with join in pyspark.  When I
attempt a join with 2000 partitions in the result, the join succeeds, but
when I use only 200 partitions in the result, the join fails with the
message Job aborted due to stage failure: Master removed our application:
FAILED.

The crash always occurs at the beginning of the shuffle phase.  Based on my
observations, it seems like the workers in the read phase may be fetching
entire blocks from the write phase of the shuffle rather than just the
records necessary to compose the partition the reader is responsible for.
 Hence, when there are fewer partitions in the read phase, the worker is
likely to need a record from each of the write partitions and consequently
attempts to load the entire data set into the memory of a single machine
(which then causes the out of memory crash I observe in /var/log/syslog).

Can anybody confirm if this is the behavior of pyspark?  I am glad to
supply additional details about my observed behavior upon request.

best,
-Brad
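
For reference, a minimal sketch of where the partition count in question is
specified; the toy data below is far too small to reproduce the failure, it
only shows the numPartitions argument to join():

from pyspark import SparkContext

sc = SparkContext('local', 'JoinPartitionsSketch')
rdd = sc.parallelize([(i % 1000, 'x' * 100) for i in xrange(10000)])

# Succeeded in the scenario described above: many small output partitions.
ok = rdd.join(rdd, numPartitions=2000).count()

# Failed in the scenario described above: fewer, larger output partitions.
crashed = rdd.join(rdd, numPartitions=200).count()

print ok, crashed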


Re: pyspark join crash

2014-06-04 Thread Brad Miller
Hi Matei,

Thanks for the reply and creating the JIRA. I hear what you're saying,
although to be clear I still want to point out that it seems like each reduce
task is loading significantly more data than just the records needed for
that task.  The workers seem to load all data from each block containing a
record needed by the reduce task.

I base this hypothesis on the following:
-My dataset is about 100G uncompressed, 22G serialized in memory with
compression enabled
-There are 130K records
-The initial RDD contains 1677 partitions, averaging 60M (uncompressed)
-There are 3 cores per node (each running one reduce task at a time)
-Each node has 32G of memory

Note that I am attempting to join the dataset to itself and I ran this
experiment after caching the dataset in memory with serialization and
compression enabled.

Given these figures, even with only 200 partitions the average output
partition size (uncompressed) would be 1G (as the dataset is being joined
to itself, resulting in 200G over 200 partitions), requiring 3G from each
machine on average.  The behavior I observe is that the kernel kills jobs
in many of the nodes at nearly the exact same time right after the read
phase starts; it seems likely this would occur in each node except the
master begins detecting failures and stops the job (and I observe memory
spiking on all machines).  Indeed, I observe a large memory spike at each
node.

When I attempt the join with 2000 output partitions, it succeeds.  Note
that there are about 65 records per output partition on average, which
means the reader only needs to load input from about 130 blocks (as the
dataset is joined to itself).  Given that the average uncompressed block
size is 60M, even if the entire block were loaded (not just the relevant
record) we would expect about 23G of memory to be used per node on average.

I began suspecting the behavior of loading entire blocks based on the
logging from the workers (i.e.
BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty
blocks out of 3354 blocks).  If it is definitely not the case that entire
blocks are loaded from the writers, then it would seem like there is some
significant overhead which is chewing through lots of memory (perhaps similar
to the problem with python broadcast variables chewing through memory
https://spark-project.atlassian.net/browse/SPARK-1065).

-Brad
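
The back-of-the-envelope arithmetic above, spelled out (all sizes uncompressed,
using only the figures quoted in this message):

dataset_gb     = 100.0   # ~100G uncompressed
records        = 130000  # ~130K records
block_size_gb  = 0.06    # 1677 input partitions averaging ~60M each
cores_per_node = 3

# 200 output partitions: the self-join handles ~200G spread over 200 partitions.
per_partition_gb = (2 * dataset_gb) / 200              # ~1G per output partition
per_node_gb      = per_partition_gb * cores_per_node   # ~3G in flight per node

# 2000 output partitions: ~65 records each, so ~130 input blocks per task.
records_per_partition    = records / 2000                  # ~65
blocks_per_task          = 2 * records_per_partition       # ~130 (both sides of the self-join)
per_node_if_whole_blocks = blocks_per_task * block_size_gb * cores_per_node  # ~23G

print per_node_gb, per_node_if_whole_blocks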



On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 In PySpark, the data processed by each reduce task needs to fit in memory
 within the Python process, so you should use more tasks to process this
 dataset. Data is spilled to disk across tasks.

 I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track
 this — it’s something we’ve been meaning to look at soon.

 Matei

 On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

  Hi All,
 
  I have experienced some crashing behavior with join in pyspark.  When I
 attempt a join with 2000 partitions in the result, the join succeeds, but
 when I use only 200 partitions in the result, the join fails with the
 message Job aborted due to stage failure: Master removed our application:
 FAILED.
 
  The crash always occurs at the beginning of the shuffle phase.  Based on
 my observations, it seems like the workers in the read phase may be
 fetching entire blocks from the write phase of the shuffle rather than just
 the records necessary to compose the partition the reader is responsible
 for.  Hence, when there are fewer partitions in the read phase, the worker
 is likely to need a record from each of the write partitions and
 consequently attempts to load the entire data set into the memory of a
 single machine (which then causes the out of memory crash I observe in
 /var/log/syslog).
 
  Can anybody confirm if this is the behavior of pyspark?  I am glad to
 supply additional details about my observed behavior upon request.
 
  best,
  -Brad




Re: Spark - ready for prime time?

2014-04-10 Thread Brad Miller
I would echo much of what Andrew has said.

I manage a small/medium sized cluster (48 cores, 512G ram, 512G disk
space dedicated to spark, data storage in separate HDFS shares).  I've
been using spark since 0.7, and as with Andrew I've observed
significant and consistent improvements in stability (and in the
PySpark API) since then.  I have run into some trouble with mesos, and
I have run into some trouble when working with data which is large
relative to the size of my cluster (e.g. 100G), but overall it's
worked well and our group is continuing to build on top of spark.

On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote:
 The biggest issue I've come across is that the cluster is somewhat unstable
 when under memory pressure.  Meaning that if you attempt to persist an RDD
 that's too big for memory, even with MEMORY_AND_DISK, you'll often still get
 OOMs.  I had to carefully modify some of the space tuning parameters and GC
 settings to get some jobs to even finish.

 The other issue I've observed is if you group on a key that is highly
 skewed, with a few massively-common keys and a long tail of rare keys, the
 one massive key can be too big for a single machine and again cause OOMs.

 I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.

 Just my personal experience, but I've observed significant improvements in
 stability since even the 0.7.x days, so I'm confident that things will
 continue to get better as long as people report what they're seeing so it
 can get fixed.

 Andrew


 On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com
 wrote:

 I'll provide answers from our own experience at Bizo.  We've been using
 Spark for 1+ year now and have found it generally better than previous
 approaches (Hadoop + Hive mostly).



 On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth
 andras.nem...@lynxanalytics.com wrote:

 I. Is it too much magic? Lots of things just work right in Spark and
 it's extremely convenient and efficient when it indeed works. But should we
 be worried that customization is hard if the built in behavior is not quite
 right for us? Are we to expect hard to track down issues originating from
 the black box behind the magic?


 I think is goes back to understanding Spark's architecture, its design
 constraints and the problems it explicitly set out to address.   If the
 solution to your problems can be easily formulated in terms of the
 map/reduce model, then it's a good choice.  You'll want your
 customizations to go with (not against) the grain of the architecture.


 II. Is it mature enough? E.g. we've created a pull request which fixes a
 problem that we were very surprised no one ever stumbled upon before. So
 that's why I'm asking: is Spark being already used in professional settings?
 Can one already trust it being reasonably bug free and reliable?


 There are lots of ways to use Spark; and not all of the features are
 necessarily at the same level of maturity.   For instance, we put all the
 jars on the main classpath so we've never run into the issue your pull
 request addresses.

 We definitely use and rely on Spark on a professional basis.  We have 5+
 spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
 Once we got them working with the proper configuration settings, they have
 been running reliability since.

 I would characterize our use of Spark as a better Hadoop, in the sense
 that we use it for batch processing only, no streaming yet.   We're happy it
 performs better than Hadoop but we don't require/rely on its memory caching
 features.  In fact, for most of our jobs it would simplify our lives if
 Spark wouldn't cache so many things in memory since it would make
 configuration/tuning a lot simpler and jobs would run successfully on the
 first try instead of having to tweak things (# of partitions and such).

 So, to the concrete issues. Sorry for the long mail, and let me know if I
 should break this out into more threads or if there is some other way to
 have this discussion...

 1. Memory management
 The general direction of these questions is whether it's possible to take
 RDD caching related memory management more into our own hands as LRU
 eviction is nice most of the time but can be very suboptimal in some of our
 use cases.
 A. Somehow prioritize cached RDDs, E.g. mark some essential that one
 really wants to keep. I'm fine with going down in flames if I mark too much
 data essential.
 B. Memory reflection: can you pragmatically get the memory size of a
 cached rdd and memory sizes available in total/per executor? If we could do
 this we could indirectly avoid automatic evictions of things we might really
 want to keep in memory.
 C. Evictions caused by RDD partitions on the driver. I had a setup with
 huge worker memory and smallish memory on the driver JVM. To my surprise,
 the system started to cache RDD partitions on the driver as well. As the
 driver ran out of memory I 

Re: Spark - ready for prime time?

2014-04-10 Thread Brad Miller
 4. Shuffle on disk
 Is it true - I couldn't find it in official docs, but did see this mentioned
 in various threads - that shuffle _always_ hits disk? (Disregarding OS
 caches.) Why is this the case? Are you planning to add a function to do
 shuffle in memory or are there some intrinsic reasons for this to be
 impossible?

 I don't think it's true... as far as I'm concerned Spark doesn't peek into
 the OS and force it to disregard buffer caches. In general, for large
 shuffles, all shuffle files do not fit into memory, so we kind of have to
 write them out to disk. There is an undocumented option to sync writing
 shuffle files to disk every time we write a block, but that is by default
 false and not many people use it (for obvious reasons).

I believe I recently had the experience that for the map portion of
the shuffle all shuffle files seemed to be written into the file
system (albeit potentially on buffer caches).  The size of the shuffle
files on hosts matched the size of the shuffle write metric shown in
the UI (pyspark branch-0.9 as of Monday), so there didn't seem to be
any effort to keep the shuffle files in memory.

On Thu, Apr 10, 2014 at 12:43 PM, Andrew Or and...@databricks.com wrote:
 Here are answers to a subset of your questions:

 1. Memory management
 The general direction of these questions is whether it's possible to take
 RDD caching related memory management more into our own hands as LRU
 eviction is nice most of the time but can be very suboptimal in some of our
 use cases.
 A. Somehow prioritize cached RDDs, E.g. mark some essential that one
 really wants to keep. I'm fine with going down in flames if I mark too much
 data

 As far as I am aware, there is currently no other eviction policies for RDD
 blocks other than LRU. Your suggestion of prioritizing RDDs is an
 interesting one and I'm sure other users would like that as well.

 B. Memory reflection: can you pragmatically get the memory size of a
 cached rdd and memory sizes available in total/per executor? If we could do
 this we could indirectly avoid automatic evictions of things we might really
 want to keep in memory.

 All this information should be displayed on the UI under the Storage tab.

 C. Evictions caused by RDD partitions on the driver. I had a setup with
 huge worker memory and smallish memory on the driver JVM. To my surprise,
 the system started to cache RDD partitions on the driver as well. As the
 driver ran out of memory I started to see evictions while there were still 
 plenty of space on workers. This resulted in lengthy recomputations. Can
 this be avoided somehow?

 The amount of space used for RDD storage is only a fraction of the total
 amount of memory available to the JVM. More specifically, it is governed by
 `spark.storage.memoryFraction`, which is by default 60%. This may explain
 why evictions seem to occur pre-maturely sometimes. In the future, we should
 probably add a table that contains information about evicted RDDs on the UI,
 so it's easier to track them. Right now evicted RDD's disappear from the
 face of the planet completely, sometimes leaving the user somewhat
 confounded. Though with off-heap storage (Tachyon) this may become less
 relevant.

 D. Broadcasts. Is it possible to get rid of a broadcast manually, without
 waiting for the LRU eviction taking care of it? Can you tell the size of a
 broadcast programmatically?

 In Spark 1.0, the mechanism to unpersist blocks used by a broadcast is
 explicitly added! Under the storage tab of the UI, we could probably also
 have a Broadcast table in the future, seeing that there are users interested
 in this feature.


 3. Recalculation of cached rdds
 I see the following scenario happening. I load two RDDs A,B from disk,
 cache them and then do some jobs on them, at the very least a count on each.
 After these jobs are done I see on the storage panel that 100% of these RDDs
 are cached in memory.
 Then I create a third RDD C which is created by multiple joins and maps
 from A and B, also cache it and start a job on C. When I do this I still see
 A and B completely cached and also see C slowly getting more and more
 cached. This is all fine and good, but in the meanwhile I see stages running
 on the UI that point to code which is used to load A and B. How is this
 possible? Am I misunderstanding how cached RDDs should behave?
 And again the general question - how can one debug such issues?

 From the fractions of RDDs cached in memory, it seems to me that your
 application is running as expected. If you also cache C, then it will slowly
 add more blocks to storage, possibly evicting A and B if there is memory
 pressure. It's entirely possible that there is a bug on finding the call
 site on the stages page (there were a few PRs that made changes to this
 recently).

 4. Shuffle on disk
 Is it true - I couldn't find it in official docs, but did see this mentioned
 in various threads - that shuffle _always_ hits disk? (Disregarding OS
 

Re: trouble with join on large RDDs

2014-04-09 Thread Brad Miller
I set SPARK_MEM in the driver process by setting
spark.executor.memory to 10G.  Each machine had 32G of RAM and a
dedicated 32G spill volume.  I believe all of the units are in pages,
and the page size is the standard 4K.  There are 15 slave nodes in the
cluster and the sizes of the datasets I'm trying to join are about
2.5G and 25G when serialized and compressed in the RDD cache.

I appreciate that Python lacks the type of heap size controls
available in Java, but I don't have a clear picture of how the different
computational tasks are partitioned between Java and Python in pyspark
(so it's unclear to me how much freedom python should have to chew
through tons of memory).

A couple questions which this raises for me are:
-Are there any parameters I could tune differently to try and prevent
this crashing behavior?
-Do we know why this doesn't spill to disk (as Patrick Wendell
mentions that shuffle spill is for aggregations which occur during the
reduce phase)?
-Do we have any hunch about what computation is occurring when the crash occurs?

I'd definitely appreciate the insight of others, and am willing to run
experiments and send results/errors/logs out.  Also, I'm physically
located in Soda Hall (Berkeley) so if anyone near by is interested to
examine this first hand I am glad to meet up.

best,
-Brad
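
For reference, a minimal sketch of how the executor memory setting mentioned
above is applied from a pyspark driver; the master URL and application name are
placeholders, and setSystemProperty must run before the SparkContext is created:

from pyspark import SparkContext

# Other spark.* properties can be set the same way before the context exists.
SparkContext.setSystemProperty('spark.executor.memory', '10g')

sc = SparkContext('spark://master-host:7077', 'JoinExperiment')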


On Wed, Apr 9, 2014 at 4:21 AM, Andrew Ash and...@andrewash.com wrote:
 A JVM can easily be limited in how much memory it uses with the -Xmx
 parameter, but Python doesn't have memory limits built in in such a
 first-class way.  Maybe the memory limits aren't making it to the python
 executors.

 What was your SPARK_MEM setting?  The JVM below seems to be using 603201
 (pages?) and the 3 large python processes each are using ~180 (pages?).
 I'm unsure the units that the OOM killer's RSS column is in.  Could be
 either pages (4kb each) or bytes.


 Apr  8 11:19:19 bennett kernel: [86368.978326] [ 2348]  1002  234812573
 2102  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978329] [ 2349]  1002  234912573
 2101  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978332] [ 2350]  1002  235012573
 2101  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978336] [ 5115]  1002  511512571
 2101  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978339] [ 5116]  1002  511612571
 2101  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978341] [ 5117]  1002  511712571
 2101  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978344] [ 7725]  1002  772512570
 2098  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978347] [ 7726]  1002  772612570
 2098  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978350] [ 7727]  1002  772712570
 2098  220 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978353] [10324]  1002 1032412570
 2098  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978356] [10325]  1002 1032512570
 2098  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978359] [10326]  1002 1032612570
 2098  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978362] [12668]  1002 12668   603201
 47932 1900 0 java
 Apr  8 11:19:19 bennett kernel: [86368.978366] [13295]  1002 1329512570
 2100  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978368] [13296]  1002 1329612570
 2100  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978371] [13297]  1002 1329712570
 2100  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978375] [15192]  1002 1519212570
 2098  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978377] [15193]  1002 1519312570
 2098  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978379] [15195]  1002 1519512570
 2098  230 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978381] [15198]  1002 15198  1845471
 181846335730 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978383] [15200]  1002 15200  1710479
 168649233160 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978384] [15201]  1002 15201  1788470
 176234434630 0 python
 Apr  8 11:19:19 bennett kernel: [86368.978386] Out of memory: Kill process
 15198 (python) score 221 or sacrifice child
 Apr  8 11:19:19 bennett kernel: [86368.978389] Killed process 15198 (python)
 total-vm:7381884kB, anon-rss:7273852kB, file-rss:0kB


 On Tue, Apr 8, 2014 at 2:56 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I poked around a bit more to (1) confirm my suspicions that the crash
 was related to memory consumption

pyspark broadcast error

2014-03-11 Thread Brad Miller
Hi All,

When I run the program shown below, I receive the error shown below.
I am running the current version of branch-0.9 from github.  Note that
I do not receive the error when I replace 2 ** 29 with 2 ** X,
where X < 29.  More interestingly, I do not receive the error when X =
30, and when X > 30 the code either crashes with MemoryError or
Py4JNetworkError: An error occurred while trying to connect to the
Java server.

I am aware that there are some bugs
(https://spark-project.atlassian.net/browse/SPARK-1065) related to
memory consumption with pyspark and broadcasting, but the behavior
with X = 29 seemed different and I was wondering if anybody had any
insight.

-Brad

*Program*
from pyspark import SparkContext
SparkContext.setSystemProperty('spark.executor.memory', '25g')
sc = SparkContext('spark://crosby.research.intel-research.net:7077',
'FeatureExtraction')
meg_512 = range((2 ** 29) / 8)
tmp_broad = sc.broadcast(meg_512)

*Error*
---
Py4JError Traceback (most recent call last)
ipython-input-1-db8033dee301 in module()
  3 sc = SparkContext('spark://crosby.research.intel-research.net:7077',
'FeatureExtraction')
  4 meg_1024 = range((2 ** 29) / 8)
 5 tmp_broad = sc.broadcast(meg_1024)

/home/spark/spark-branch-0.9/python/pyspark/context.py in broadcast(self, value)
277 pickleSer = PickleSerializer()
278 pickled = pickleSer.dumps(value)
-- 279 jbroadcast = self._jsc.broadcast(bytearray(pickled))
280 return Broadcast(jbroadcast.id(), value, jbroadcast,
281  self._pickled_broadcast_vars)

/home/spark/spark-branch-0.9/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
-- 537 self.target_id, self.name)
538
539 for temp_arg in temp_args:

/home/spark/spark-branch-0.9/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
302 raise Py4JError(
303 'An error occurred while calling
{0}{1}{2}. Trace:\n{3}\n'.
-- 304 format(target_id, '.', name, value))
305 else:
306 raise Py4JError(

Py4JError: An error occurred while calling o7.broadcast. Trace:
java.lang.NegativeArraySizeException
at py4j.Base64.decode(Base64.java:292)
at py4j.Protocol.getBytes(Protocol.java:167)
at py4j.Protocol.getObject(Protocol.java:276)
at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
at py4j.commands.CallCommand.execute(CallCommand.java:77)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:701)
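
A rough workaround sketch, not an explanation of the NegativeArraySizeException:
splitting the list into several smaller broadcasts keeps each pickled payload in
the size range reported to work above. The chunk count is arbitrary, and whether
several smaller broadcasts are usable depends on how the data is consumed:

from pyspark import SparkContext

SparkContext.setSystemProperty('spark.executor.memory', '25g')
sc = SparkContext('spark://crosby.research.intel-research.net:7077',
                  'FeatureExtraction')

meg_512 = range((2 ** 29) / 8)

def chunks(seq, n):
    # Split seq into n roughly equal slices.
    size = (len(seq) + n - 1) // n
    return [seq[i * size:(i + 1) * size] for i in range(n)]

# Each piece pickles to roughly 1/8 of the original, a size reported to work.
broadcasts = [sc.broadcast(piece) for piece in chunks(meg_512, 8)]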