Repository: spark
Updated Branches:
  refs/heads/master 1347ebd4b -> b50ddfde0


http://git-wip-us.apache.org/repos/asf/spark/blob/b50ddfde/docs/scala-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 9941273..7737389 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -23,7 +23,7 @@ To write a Spark application, you need to add a dependency on 
Spark. If you use
 
     groupId = org.apache.spark
     artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
-    version = {{site.SPARK_VERSION}} 
+    version = {{site.SPARK_VERSION}}
 
 In addition, if you wish to access an HDFS cluster, you need to add a 
dependency on `hadoop-client` for your version of HDFS:
 
@@ -73,14 +73,14 @@ The master URL passed to Spark can be in one of the 
following formats:
 <table class="table">
 <tr><th>Master URL</th><th>Meaning</th></tr>
 <tr><td> local </td><td> Run Spark locally with one worker thread (i.e. no 
parallelism at all). </td></tr>
-<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, 
set this to the number of cores on your machine). 
+<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, 
set this to the number of cores on your machine).
 </td></tr>
-<tr><td> spark://HOST:PORT </td><td> Connect to the given <a 
href="spark-standalone.html">Spark standalone 
-        cluster</a> master. The port must be whichever one your master is 
configured to use, which is 7077 by default. 
+<tr><td> spark://HOST:PORT </td><td> Connect to the given <a 
href="spark-standalone.html">Spark standalone
+        cluster</a> master. The port must be whichever one your master is 
configured to use, which is 7077 by default.
 </td></tr>
-<tr><td> mesos://HOST:PORT </td><td> Connect to the given <a 
href="running-on-mesos.html">Mesos</a> cluster. 
-        The host parameter is the hostname of the Mesos master. The port must 
be whichever one the master is configured to use, 
-        which is 5050 by default. 
+<tr><td> mesos://HOST:PORT </td><td> Connect to the given <a 
href="running-on-mesos.html">Mesos</a> cluster.
+        The host parameter is the hostname of the Mesos master. The port must 
be whichever one the master is configured to use,
+        which is 5050 by default.
 </td></tr>
 </table>
 
@@ -265,11 +265,25 @@ A complete list of actions is available in the [RDD API 
doc](api/core/index.html
 
 ## RDD Persistence
 
-One of the most important capabilities in Spark is *persisting* (or *caching*) 
a dataset in memory across operations. When you persist an RDD, each node 
stores any slices of it that it computes in memory and reuses them in other 
actions on that dataset (or datasets derived from it). This allows future 
actions to be much faster (often by more than 10x). Caching is a key tool for 
building iterative algorithms with Spark and for interactive use from the 
interpreter.
-
-You can mark an RDD to be persisted using the `persist()` or `cache()` methods 
on it. The first time it is computed in an action, it will be kept in memory on 
the nodes. The cache is fault-tolerant -- if any partition of an RDD is lost, 
it will automatically be recomputed using the transformations that originally 
created it.
-
-In addition, each RDD can be stored using a different *storage level*, 
allowing you, for example, to persist the dataset on disk, or persist it in 
memory but as serialized Java objects (to save space), or even replicate it 
across nodes. These levels are chosen by passing a 
[`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel)
 object to `persist()`. The `cache()` method is a shorthand for using the 
default storage level, which is `StorageLevel.MEMORY_ONLY` (store deserialized 
objects in memory). The complete set of available storage levels is:
+One of the most important capabilities in Spark is *persisting* (or *caching*) 
a dataset in memory
+across operations. When you persist an RDD, each node stores any slices of it 
that it computes in
+memory and reuses them in other actions on that dataset (or datasets derived 
from it). This allows
+future actions to be much faster (often by more than 10x). Caching is a key 
tool for building
+iterative algorithms with Spark and for interactive use from the interpreter.
+
+You can mark an RDD to be persisted using the `persist()` or `cache()` methods 
on it. The first time
+it is computed in an action, it will be kept in memory on the nodes. The cache 
is fault-tolerant --
+if any partition of an RDD is lost, it will automatically be recomputed using 
the transformations
+that originally created it.
+
+In addition, each RDD can be stored using a different *storage level*, 
allowing you, for example, to
+persist the dataset on disk, or persist it in memory but as serialized Java 
objects (to save space),
+or replicate it across nodes, or store the data in off-heap memory in 
[Tachyon](http://tachyon-project.org/).
+These levels are chosen by passing a
+[`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel)
+object to `persist()`. The `cache()` method is a shorthand for using the 
default storage level,
+which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). 
The complete set of
+available storage levels is:
 
 <table class="table">
 <tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr>
@@ -292,8 +306,16 @@ In addition, each RDD can be stored using a different 
*storage level*, allowing
 </tr>
 <tr>
   <td> MEMORY_AND_DISK_SER </td>
-  <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in 
memory to disk instead of recomputing them
-    on the fly each time they're needed. </td>
+  <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in 
memory to disk instead of
+    recomputing them on the fly each time they're needed. </td>
+</tr>
+<tr>
+  <td> OFF_HEAP  </td>
+  <td> Store RDD in a <i>serialized</i> format in Tachyon.
+    This is generally more space-efficient than deserialized objects, 
especially when using a
+    <a href="tuning.html">fast serializer</a>, but more CPU-intensive to read.
+    This also significantly reduces the overheads of GC.
+  </td>
 </tr>
 <tr>
   <td> DISK_ONLY </td>
@@ -307,30 +329,59 @@ In addition, each RDD can be stored using a different 
*storage level*, allowing
 
 ### Which Storage Level to Choose?
 
-Spark's storage levels are meant to provide different tradeoffs between memory 
usage and CPU efficiency.
-We recommend going through the following process to select one:
-
-* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), 
leave them that way. This is the most
-  CPU-efficient option, allowing operations on the RDDs to run as fast as 
possible.
-* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization 
library](tuning.html) to make the objects
-  much more space-efficient, but still reasonably fast to access.
-* Don't spill to disk unless the functions that computed your datasets are 
expensive, or they filter a large
-  amount of the data. Otherwise, recomputing a partition is about as fast as 
reading it from disk.
-* Use the replicated storage levels if you want fast fault recovery (e.g. if 
using Spark to serve requests from a web
-  application). *All* the storage levels provide full fault tolerance by 
recomputing lost data, but the replicated ones
-  let you continue running tasks on the RDD without waiting to recompute a 
lost partition.
- 
-If you want to define your own storage level (say, with replication factor of 
3 instead of 2), then use the function factor method `apply()` of the 
[`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) 
singleton object.  
+Spark's storage levels are meant to provide different trade-offs between 
memory usage and CPU
+efficiency. They allow users to choose memory, disk, or Tachyon for storing 
data. We recommend going
+through the following process to select one:
+
+* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), 
leave them that way.
+  This is the most CPU-efficient option, allowing operations on the RDDs to 
run as fast as possible.
+
+* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization 
library](tuning.html) to
+make the objects much more space-efficient, but still reasonably fast to 
access. You can also use
+`OFF_HEAP` mode to store the data off the heap in 
[Tachyon](http://tachyon-project.org/). This will
+significantly reduce JVM GC overhead.
+
+* Don't spill to disk unless the functions that computed your datasets are 
expensive, or they filter
+a large amount of the data. Otherwise, recomputing a partition is about as 
fast as reading it from
+disk.
+
+* Use the replicated storage levels if you want fast fault recovery (e.g. if 
using Spark to serve
+requests from a web application). *All* the storage levels provide full fault 
tolerance by
+recomputing lost data, but the replicated ones let you continue running tasks 
on the RDD without
+waiting to recompute a lost partition.
+
+If you want to define your own storage level (say, with replication factor of 
3 instead of 2), then
+use the factory method `apply()` of the
+[`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) 
singleton object.
+
+Spark has a block manager inside the Executors that lets you choose memory, 
disk, or off-heap. The
+latter is for storing RDDs off-heap outside the Executor JVM on top of the 
memory management system
+[Tachyon](http://tachyon-project.org/). This mode has the following advantages:
+
+* Cached data will not be lost if individual executors crash.
+* Executors can have a smaller memory footprint, allowing you to run more 
executors on the same
+machine as the bulk of the memory will be inside Tachyon.
+* Reduced GC overhead since data is stored in Tachyon.
 
 # Shared Variables
 
-Normally, when a function passed to a Spark operation (such as `map` or 
`reduce`) is executed on a remote cluster node, it works on separate copies of 
all the variables used in the function. These variables are copied to each 
machine, and no updates to the variables on the remote machine are propagated 
back to the driver program. Supporting general, read-write shared variables 
across tasks would be inefficient. However, Spark does provide two limited 
types of *shared variables* for two common usage patterns: broadcast variables 
and accumulators.
+Normally, when a function passed to a Spark operation (such as `map` or 
`reduce`) is executed on a
+remote cluster node, it works on separate copies of all the variables used in 
the function. These
+variables are copied to each machine, and no updates to the variables on the 
remote machine are
+propagated back to the driver program. Supporting general, read-write shared 
variables across tasks
+would be inefficient. However, Spark does provide two limited types of *shared 
variables* for two
+common usage patterns: broadcast variables and accumulators.
 
 ## Broadcast Variables
 
-Broadcast variables allow the programmer to keep a read-only variable cached 
on each machine rather than shipping a copy of it with tasks. They can be used, 
for example, to give every node a copy of a large input dataset in an efficient 
manner. Spark also attempts to distribute broadcast variables using efficient 
broadcast algorithms to reduce communication cost.
+Broadcast variables allow the programmer to keep a read-only variable cached 
on each machine rather
+than shipping a copy of it with tasks. They can be used, for example, to give 
every node a copy of a
+large input dataset in an efficient manner. Spark also attempts to distribute 
broadcast variables
+using efficient broadcast algorithms to reduce communication cost.
 
-Broadcast variables are created from a variable `v` by calling 
`SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, 
and its value can be accessed by calling the `value` method. The interpreter 
session below shows this:
+Broadcast variables are created from a variable `v` by calling 
`SparkContext.broadcast(v)`. The
+broadcast variable is a wrapper around `v`, and its value can be accessed by 
calling the `value`
+method. The interpreter session below shows this:
 
 {% highlight scala %}
 scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
@@ -340,13 +391,21 @@ scala> broadcastVar.value
 res0: Array[Int] = Array(1, 2, 3)
 {% endhighlight %}
 
-After the broadcast variable is created, it should be used instead of the 
value `v` in any functions run on the cluster so that `v` is not shipped to the 
nodes more than once. In addition, the object `v` should not be modified after 
it is broadcast in order to ensure that all nodes get the same value of the 
broadcast variable (e.g. if the variable is shipped to a new node later).
+After the broadcast variable is created, it should be used instead of the 
value `v` in any functions
+run on the cluster so that `v` is not shipped to the nodes more than once. In 
addition, the object
+`v` should not be modified after it is broadcast in order to ensure that all 
nodes get the same
+value of the broadcast variable (e.g. if the variable is shipped to a new node 
later).
 
 ## Accumulators
 
-Accumulators are variables that are only "added" to through an associative 
operation and can therefore be efficiently supported in parallel. They can be 
used to implement counters (as in MapReduce) or sums. Spark natively supports 
accumulators of numeric value types and standard mutable collections, and 
programmers can add support for new types.
+Accumulators are variables that are only "added" to through an associative 
operation and can
+therefore be efficiently supported in parallel. They can be used to implement 
counters (as in
+MapReduce) or sums. Spark natively supports accumulators of numeric value 
types and standard mutable
+collections, and programmers can add support for new types.
 
-An accumulator is created from an initial value `v` by calling 
`SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it 
using the `+=` operator. However, they cannot read its value. Only the driver 
program can read the accumulator's value, using its `value` method.
+An accumulator is created from an initial value `v` by calling 
`SparkContext.accumulator(v)`. Tasks
+running on the cluster can then add to it using the `+=` operator. However, 
they cannot read its
+value. Only the driver program can read the accumulator's value, using its 
`value` method.
 
 The interpreter session below shows an accumulator being used to add up the 
elements of an array:
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b50ddfde/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala 
b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index e5a09ec..d3babc3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.examples
 
 import scala.math.random
+
 import org.apache.spark._
-import SparkContext._
 
 /** Computes an approximation to pi */
 object SparkPi {

http://git-wip-us.apache.org/repos/asf/spark/blob/b50ddfde/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala 
b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
new file mode 100644
index 0000000..53b303d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import java.util.Random
+import scala.math.exp
+import org.apache.spark.util.Vector
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.InputFormatInfo
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Logistic regression based classification.
+ * This example uses Tachyon to persist rdds during computation.
+ */
+object SparkTachyonHdfsLR {
+  val D = 10   // Number of dimensions
+  val rand = new Random(42)
+
+  case class DataPoint(x: Vector, y: Double)
+
+  def parsePoint(line: String): DataPoint = {
+    val tok = new java.util.StringTokenizer(line, " ")
+    var y = tok.nextToken.toDouble
+    var x = new Array[Double](D)
+    var i = 0
+    while (i < D) {
+      x(i) = tok.nextToken.toDouble; i += 1
+    }
+    DataPoint(new Vector(x), y)
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: SparkTachyonHdfsLR <master> <file> <iters>")
+      System.exit(1)
+    }
+    val inputPath = args(1)
+    val conf = SparkHadoopUtil.get.newConfiguration()
+    val sc = new SparkContext(args(0), "SparkTachyonHdfsLR",
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), 
Map(),
+      InputFormatInfo.computePreferredLocations(
+        Seq(new InputFormatInfo(conf, 
classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
+      ))
+    val lines = sc.textFile(inputPath)
+    val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP)
+    val ITERATIONS = args(2).toInt
+
+    // Initialize w to a random value
+    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    println("Initial w: " + w)
+
+    for (i <- 1 to ITERATIONS) {
+      println("On iteration " + i)
+      val gradient = points.map { p =>
+        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
+      }.reduce(_ + _)
+      w -= gradient
+    }
+
+    println("Final w: " + w)
+    System.exit(0)
+  }
+}
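
The update rule in the loop above can be checked without a cluster. The following is a minimal plain-Python sketch, not part of the commit: it assumes points are `(x, y)` pairs with `x` a list of floats, and the helper names `dot`, `gradient`, and `train` are made up for illustration. It applies the same per-point term, `(1 / (1 + exp(-y * (w . x))) - 1) * y * x`, sums it over all points, and subtracts the sum from `w` once per iteration.

```python
import math
import random

D = 10  # number of dimensions, matching D in the Scala example


def dot(a, b):
    """Inner product of two equal-length lists (hypothetical helper)."""
    return sum(ai * bi for ai, bi in zip(a, b))


def gradient(w, points):
    """Sum over all points of (1 / (1 + exp(-y * (w . x))) - 1) * y * x."""
    total = [0.0] * len(w)
    for x, y in points:
        scale = (1.0 / (1.0 + math.exp(-y * dot(w, x))) - 1.0) * y
        for j, xj in enumerate(x):
            total[j] += scale * xj
    return total


def train(points, iterations, seed=42):
    """Start from a random w in [-1, 1)^D and apply w -= gradient each round."""
    rng = random.Random(seed)
    w = [2 * rng.random() - 1 for _ in range(D)]
    for _ in range(iterations):
        g = gradient(w, points)
        w = [wj - gj for wj, gj in zip(w, g)]
    return w
```

In the Spark version the per-point terms are produced by `map` and summed by `reduce`, so only the summed gradient travels back to the driver; the sketch does the same sum in a local loop.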

http://git-wip-us.apache.org/repos/asf/spark/blob/b50ddfde/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala 
b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
new file mode 100644
index 0000000..ce78f08
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.math.random
+
+import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *  Computes an approximation to pi
+ *  This example uses Tachyon to persist rdds during computation.
+ */
+object SparkTachyonPi {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SparkTachyonPi <master> [<slices>]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "SparkTachyonPi",
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+    
+    val slices = if (args.length > 1) args(1).toInt else 2
+    val n = 100000 * slices
+    
+    val rdd = spark.parallelize(1 to n, slices)
+    rdd.persist(StorageLevel.OFF_HEAP)
+    val count = rdd.map { i =>
+      val x = random * 2 - 1
+      val y = random * 2 - 1
+      if (x * x + y * y < 1) 1 else 0
+    }.reduce(_ + _)
+    println("Pi is roughly " + 4.0 * count / n)
+    
+    spark.stop()
+  }
+}
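
For reference, the estimate computed by the example above can be reproduced locally. This is a plain-Python sketch under stated assumptions, not code from the commit: `estimate_pi` and its `seed` parameter are hypothetical names, and the loop stands in for the `map` and `reduce` over the persisted RDD. Each sample is a point drawn uniformly from the square [-1, 1) x [-1, 1), and the fraction that lands inside the unit circle approaches pi / 4.

```python
import random


def estimate_pi(n, seed=0):
    """Monte Carlo estimate of pi from n uniform samples in the square."""
    rng = random.Random(seed)
    count = 0
    for _ in range(n):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        if x * x + y * y < 1:
            count += 1
    return 4.0 * count / n
```

Calling `estimate_pi(200000)` corresponds to running the Scala example with the default of 2 slices, since there `n = 100000 * slices`.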

http://git-wip-us.apache.org/repos/asf/spark/blob/b50ddfde/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c5c697e..843a874 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -30,7 +30,7 @@ import scala.collection.JavaConversions._
 // import com.jsuereth.pgp.sbtplugin.PgpKeys._
 
 object SparkBuild extends Build {
-  val SPARK_VERSION = "1.0.0-SNAPSHOT" 
+  val SPARK_VERSION = "1.0.0-SNAPSHOT"
 
   // Hadoop version to build against. For example, "1.0.4" for Apache 
releases, or
   // "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can 
be set
@@ -185,15 +185,14 @@ object SparkBuild extends Build {
     concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
 
     resolvers ++= Seq(
-      // HTTPS is unavailable for Maven Central
       "Maven Repository"     at "http://repo.maven.apache.org/maven2",
       "Apache Repository"    at 
"https://repository.apache.org/content/repositories/releases",
       "JBoss Repository"     at 
"https://repository.jboss.org/nexus/content/repositories/releases/",
       "MQTT Repository"      at 
"https://repo.eclipse.org/content/repositories/paho-releases/",
-      "Cloudera Repository"  at 
"https://repository.cloudera.com/artifactory/cloudera-repos/",
+      "Cloudera Repository"  at 
"http://repository.cloudera.com/artifactory/cloudera-repos/",
       // For Sonatype publishing
-      //"sonatype-snapshots"   at 
"https://oss.sonatype.org/content/repositories/snapshots",
-      //"sonatype-staging"     at 
"https://oss.sonatype.org/service/local/staging/deploy/maven2/",
+      // "sonatype-snapshots"   at 
"https://oss.sonatype.org/content/repositories/snapshots",
+      // "sonatype-staging"     at 
"https://oss.sonatype.org/service/local/staging/deploy/maven2/",
       // also check the local Maven repository ~/.m2
       Resolver.mavenLocal
     ),
@@ -280,13 +279,18 @@ object SparkBuild extends Build {
   val slf4jVersion = "1.7.5"
 
   val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
+  val excludeEclipseJetty = ExclusionRule(organization = "org.eclipse.jetty")
   val excludeAsm = ExclusionRule(organization = "org.ow2.asm")
   val excludeOldAsm = ExclusionRule(organization = "asm")
   val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")
   val excludeSLF4J = ExclusionRule(organization = "org.slf4j")
   val excludeScalap = ExclusionRule(organization = "org.scala-lang", artifact 
= "scalap")
+  val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
+  val excludeCurator = ExclusionRule(organization = "org.apache.curator")
+  val excludePowermock = ExclusionRule(organization = "org.powermock")
 
-  def sparkPreviousArtifact(id: String, organization: String = 
"org.apache.spark", 
+
+  def sparkPreviousArtifact(id: String, organization: String = 
"org.apache.spark",
       version: String = "0.9.0-incubating", crossVersion: String = "2.10"): 
Option[sbt.ModuleID] = {
     val fullId = if (crossVersion.isEmpty) id else id + "_" + crossVersion
     Some(organization % fullId % version) // the artifact to compare binary 
compatibility with
@@ -323,6 +327,7 @@ object SparkBuild extends Build {
         "com.codahale.metrics"       % "metrics-graphite" % "3.0.0",
         "com.twitter"               %% "chill"            % "0.3.1" 
excludeAll(excludeAsm),
         "com.twitter"                % "chill-java"       % "0.3.1" 
excludeAll(excludeAsm),
+        "org.tachyonproject"         % "tachyon"          % "0.4.1-thrift" 
excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, 
excludePowermock),
         "com.clearspring.analytics"  % "stream"           % "2.5.1"
       ),
     libraryDependencies ++= maybeAvro

http://git-wip-us.apache.org/repos/asf/spark/blob/b50ddfde/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index ff1023b..d8667e8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -423,8 +423,11 @@ class SparkContext(object):
             raise Exception("storageLevel must be of type 
pyspark.StorageLevel")
 
         newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
-        return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
-            storageLevel.deserialized, storageLevel.replication)
+        return newStorageLevel(storageLevel.useDisk,
+                               storageLevel.useMemory,
+                               storageLevel.useOffHeap,
+                               storageLevel.deserialized,
+                               storageLevel.replication)
 
     def setJobGroup(self, groupId, description):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/b50ddfde/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9943296..fb27863 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1302,11 +1302,12 @@ class RDD(object):
         Get the RDD's current storage level.
         >>> rdd1 = sc.parallelize([1,2])
         >>> rdd1.getStorageLevel()
-        StorageLevel(False, False, False, 1)
+        StorageLevel(False, False, False, False, 1)
         """
         java_storage_level = self._jrdd.getStorageLevel()
         storage_level = StorageLevel(java_storage_level.useDisk(),
                                      java_storage_level.useMemory(),
+                                     java_storage_level.useOffHeap(),
                                      java_storage_level.deserialized(),
                                      java_storage_level.replication())
         return storage_level

http://git-wip-us.apache.org/repos/asf/spark/blob/b50ddfde/python/pyspark/storagelevel.py
----------------------------------------------------------------------
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index c3e3a44..7b6660e 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -25,23 +25,25 @@ class StorageLevel:
     Also contains static constants for some commonly used storage levels, such 
as MEMORY_ONLY.
     """
 
-    def __init__(self, useDisk, useMemory, deserialized, replication = 1):
+    def __init__(self, useDisk, useMemory, useOffHeap, deserialized, 
replication = 1):
         self.useDisk = useDisk
         self.useMemory = useMemory
+        self.useOffHeap = useOffHeap
         self.deserialized = deserialized
         self.replication = replication
 
     def __repr__(self):
-        return "StorageLevel(%s, %s, %s, %s)" % (
-            self.useDisk, self.useMemory, self.deserialized, self.replication)
+        return "StorageLevel(%s, %s, %s, %s, %s)" % (
+            self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, 
self.replication)
 
-StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
-StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
-StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
-StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2)
-StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False)
-StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2)
-StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True)
-StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2)
-StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False)
-StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2)
+StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
+StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
+StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True)
+StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2)
+StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False)
+StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
+StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True)
+StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
+StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False)
+StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
+StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1)
\ No newline at end of file
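
One consequence of the constructor change above is that `useOffHeap` is inserted as the third positional argument, so any caller still passing four positional values now binds them to different fields. The sketch below is a standalone copy of the class as it appears in this diff, renamed `StorageLevelSketch` so it is not mistaken for the real `pyspark.StorageLevel`; the `old_style` call is a hypothetical example of a pre-change call site.

```python
class StorageLevelSketch(object):
    """Standalone copy of the five-flag StorageLevel from the diff above."""

    def __init__(self, useDisk, useMemory, useOffHeap, deserialized,
                 replication=1):
        self.useDisk = useDisk
        self.useMemory = useMemory
        self.useOffHeap = useOffHeap
        self.deserialized = deserialized
        self.replication = replication

    def __repr__(self):
        return "StorageLevel(%s, %s, %s, %s, %s)" % (
            self.useDisk, self.useMemory, self.useOffHeap,
            self.deserialized, self.replication)


# Constants written in the new five-flag form, as in the diff.
OFF_HEAP = StorageLevelSketch(False, False, True, False, 1)
MEMORY_ONLY = StorageLevelSketch(False, True, False, True)

# A call written for the old four-argument signature
# (useDisk, useMemory, deserialized, replication) still runs, but the
# last two values now land in useOffHeap and deserialized instead.
old_style = StorageLevelSketch(True, True, False, 2)
```

Here `old_style.deserialized` is `2` and `old_style.replication` falls back to the default `1`, which is why every predefined constant in the diff was rewritten with an explicit `useOffHeap` flag.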
