Re: Loading RDDs in a streaming fashion
Actually, I'm working with a binary format. The API allows reading out a single record at a time, but I'm not sure how to get those records into Spark (without reading everything into memory from a single file at once).

On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

> file => transform file into a bunch of records

What does this function do exactly? Does it load the file locally? Spark supports RDDs exceeding global RAM (cf the terasort example), but if your example just loads each file locally, then this may cause problems. Instead, you should load each file into an RDD with context.textFile(), flatMap that, and union these RDDs. Also see http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files

On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

This is a long shot, but... I'm trying to load a bunch of files spread out over HDFS into an RDD, and in most cases it works well, but for a few very large files, I exceed available memory. My current workflow basically works like this:

context.parallelize(fileNames).flatMap { file =>
  transform file into a bunch of records
}

I'm wondering if there are any APIs to somehow flush the records of a big dataset so I don't have to load them all into memory at once. I know this doesn't exist, but conceptually:

context.parallelize(fileNames).streamMap { (file, stream) =>
  for every 10K records
    write records to stream and flush
}

Keith
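[Editor's note: for reference, the load-each-file-and-union approach Andy describes could look like the following sketch. It assumes line-oriented text input (the binary-format case is what the rest of the thread addresses); `sc` is an existing SparkContext, and fileNames and parseLine are hypothetical stand-ins.]

// Sketch of "load each file into an RDD, flatMap, and union", per the advice above.
// Assumes text input; fileNames and parseLine are hypothetical stand-ins.
val fileNames: Seq[String] = Seq("hdfs:///data/file-0001", "hdfs:///data/file-0002")

def parseLine(line: String): Seq[String] = line.split(",")  // stand-in record transform

val perFileRdds = fileNames.map(path => sc.textFile(path))  // one lazily read RDD per file
val records = perFileRdds.reduce(_ union _)                 // a single RDD over all files
  .flatMap(parseLine)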
Re: Loading RDDs in a streaming fashion
Yep, that's definitely possible. It's one of the workarounds I was considering. I was just curious if there was a simpler (and perhaps more efficient) approach.

Keith

On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg andy.tw...@gmail.com wrote:

Could you modify your function so that it streams through the files record by record and outputs them to HDFS, then read them all in as RDDs and take the union? That would only use bounded memory.

On 1 December 2014 at 17:19, Keith Simmons ke...@pulse.io wrote:

Actually, I'm working with a binary format. The API allows reading out a single record at a time, but I'm not sure how to get those records into Spark (without reading everything into memory from a single file at once).

On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

> file => transform file into a bunch of records

What does this function do exactly? Does it load the file locally? Spark supports RDDs exceeding global RAM (cf the terasort example), but if your example just loads each file locally, then this may cause problems. Instead, you should load each file into an RDD with context.textFile(), flatMap that, and union these RDDs. Also see http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files

On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

This is a long shot, but... I'm trying to load a bunch of files spread out over HDFS into an RDD, and in most cases it works well, but for a few very large files, I exceed available memory. My current workflow basically works like this:

context.parallelize(fileNames).flatMap { file =>
  transform file into a bunch of records
}

I'm wondering if there are any APIs to somehow flush the records of a big dataset so I don't have to load them all into memory at once. I know this doesn't exist, but conceptually:

context.parallelize(fileNames).streamMap { (file, stream) =>
  for every 10K records
    write records to stream and flush
}

Keith
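[Editor's note: to make Andy's workaround concrete, here is a hedged sketch: stream each binary file record by record (bounded memory), write the records out to HDFS as text, then read everything back and take the union. BinaryFormat.open and encode are hypothetical stand-ins for the actual one-record-at-a-time reader API and a record-to-line serializer.]

// Hypothetical sketch of "stream to HDFS, then union", assuming a reader API
// that yields one record at a time (BinaryFormat and encode are stand-ins).
import java.io.PrintWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def convertToHdfs(inputPath: String, outputPath: String): Unit = {
  val fs = FileSystem.get(new Configuration())
  val out = new PrintWriter(fs.create(new Path(outputPath)))
  try {
    val reader = BinaryFormat.open(inputPath)  // stand-in: reads one record at a time
    while (reader.hasNext) {
      out.println(encode(reader.next()))       // stand-in: record -> line encoding
    }
  } finally {
    out.close()
  }
}

// Convert each file with bounded memory, then read the results back as one RDD:
fileNames.zipWithIndex.foreach { case (f, i) =>
  convertToHdfs(f, s"hdfs:///tmp/converted/part-$i")
}
val all = fileNames.indices
  .map(i => sc.textFile(s"hdfs:///tmp/converted/part-$i"))
  .reduce(_ union _)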
Re: Setting only master heap
As for SPARK_WORKER_MEMORY, it is the amount of memory that a worker advertises as available for drivers to launch executors. The sum of the memory used by executors spawned from a worker cannot exceed SPARK_WORKER_MEMORY.

Unfortunately, I'm not aware of a way to set the memory for Master and Worker individually, other than launching them manually. You can also try setting the config differently on each machine's spark-env.sh file.

*++ If in YARN mode ++*

In YARN, there is no setting for SPARK_DAEMON_MEMORY, so it appears only in the Standalone documentation. Remember that in YARN mode there is no Spark Worker; instead, the YARN NodeManagers launch the Executors. And in YARN there is no need to run a Spark Master JVM (since the YARN ResourceManager takes care of the scheduling). So, with YARN, use SPARK_EXECUTOR_MEMORY to set the Executor's memory and SPARK_DRIVER_MEMORY to set the Driver's memory.

Just an FYI - for compatibility's sake, even in YARN mode there is a setting for SPARK_WORKER_MEMORY, but it has been deprecated. If you do set it, it just does the same thing as setting SPARK_EXECUTOR_MEMORY would have done.

- Sameer

On Wed, Oct 22, 2014 at 1:46 PM, Keith Simmons ke...@pulse.io wrote:

We've been getting some OOMs from the Spark master since upgrading to Spark 1.1.0. I've found SPARK_DAEMON_MEMORY, but that also seems to increase the worker heap, which as far as I know is fine as-is. Is there any setting which *only* increases the master heap size?

Keith
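[Editor's note: standalone mode has no documented per-daemon heap setting, but following Sameer's per-machine suggestion, one hedged workaround is to give spark-env.sh a different value on the Master machine than on the Worker machines. This only applies when they are separate hosts, and the sizes below are illustrative assumptions, not a tested recipe.]

# conf/spark-env.sh on the machine running only the Master
SPARK_DAEMON_MEMORY=2g    # assumed size: larger heap for the Master daemon

# conf/spark-env.sh on machines running only Workers
SPARK_DAEMON_MEMORY=512m  # assumed size: keep the Worker daemon heap small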
Setting only master heap
We've been getting some OOMs from the Spark master since upgrading to Spark 1.1.0. I've found SPARK_DAEMON_MEMORY, but that also seems to increase the worker heap, which as far as I know is fine as-is. Is there any setting which *only* increases the master heap size?

Keith
Re: Hung spark executors don't count toward worker memory limit
Maybe I should put this another way. If Spark has two jobs, A and B, both of which consume the entire allocated memory pool, is it expected that Spark can launch B before the executor processes tied to A are completely terminated?

On Thu, Oct 9, 2014 at 6:57 PM, Keith Simmons ke...@pulse.io wrote:

Actually, it looks like even when the job shuts down cleanly, there can be a few minutes of overlap between the time the next job launches and the time the first job actually terminates its process. Here are some relevant lines from my log:

14/10/09 20:49:20 INFO Worker: Asked to kill executor app-20141009204127-0029/1
14/10/09 20:49:20 INFO ExecutorRunner: Runner thread for executor app-20141009204127-0029/1 interrupted
14/10/09 20:49:20 INFO ExecutorRunner: Killing process!
14/10/09 20:49:20 INFO Worker: Asked to launch executor app-20141009204508-0030/1 for Job
... More lines about launching new job ...
14/10/09 20:51:17 INFO Worker: Executor app-20141009204127-0029/1 finished with state KILLED

As you can see, the first app didn't actually shut down until two minutes after the new job launched. During that time, I was at double the worker memory limit.

Keith

On Thu, Oct 9, 2014 at 5:06 PM, Keith Simmons ke...@pulse.io wrote:

Hi Folks,

We have a Spark job that is occasionally running out of memory and hanging (I believe in GC). This is its own issue we're debugging, but in the meantime, there's another unfortunate side effect. When the job is killed (most often because of GC errors), each worker attempts to kill its respective executor. However, it appears that several of the executors fail to shut themselves down (I actually have to kill -9 them). Even though the worker fails to successfully clean up the executor, it starts the next job as though all the resources have been freed up. This is causing the Spark worker to exceed its configured memory limit, which is in turn running our boxes out of memory.

Is there a setting I can configure to prevent this issue? Perhaps by having the worker force-kill the executor, or not start the next job until it's confirmed the executor has exited? Let me know if there's any additional information I can provide.

Keith

P.S. We're running Spark 1.0.2
Hung spark executors don't count toward worker memory limit
Hi Folks,

We have a Spark job that is occasionally running out of memory and hanging (I believe in GC). This is its own issue we're debugging, but in the meantime, there's another unfortunate side effect. When the job is killed (most often because of GC errors), each worker attempts to kill its respective executor. However, it appears that several of the executors fail to shut themselves down (I actually have to kill -9 them). Even though the worker fails to successfully clean up the executor, it starts the next job as though all the resources have been freed up. This is causing the Spark worker to exceed its configured memory limit, which is in turn running our boxes out of memory.

Is there a setting I can configure to prevent this issue? Perhaps by having the worker force-kill the executor, or not start the next job until it's confirmed the executor has exited? Let me know if there's any additional information I can provide.

Keith

P.S. We're running Spark 1.0.2
Re: Hung spark executors don't count toward worker memory limit
Actually, it looks like even when the job shuts down cleanly, there can be a few minutes of overlap between the time the next job launches and the time the first job actually terminates its process. Here are some relevant lines from my log:

14/10/09 20:49:20 INFO Worker: Asked to kill executor app-20141009204127-0029/1
14/10/09 20:49:20 INFO ExecutorRunner: Runner thread for executor app-20141009204127-0029/1 interrupted
14/10/09 20:49:20 INFO ExecutorRunner: Killing process!
14/10/09 20:49:20 INFO Worker: Asked to launch executor app-20141009204508-0030/1 for Job
... More lines about launching new job ...
14/10/09 20:51:17 INFO Worker: Executor app-20141009204127-0029/1 finished with state KILLED

As you can see, the first app didn't actually shut down until two minutes after the new job launched. During that time, I was at double the worker memory limit.

Keith

On Thu, Oct 9, 2014 at 5:06 PM, Keith Simmons ke...@pulse.io wrote:

Hi Folks,

We have a Spark job that is occasionally running out of memory and hanging (I believe in GC). This is its own issue we're debugging, but in the meantime, there's another unfortunate side effect. When the job is killed (most often because of GC errors), each worker attempts to kill its respective executor. However, it appears that several of the executors fail to shut themselves down (I actually have to kill -9 them). Even though the worker fails to successfully clean up the executor, it starts the next job as though all the resources have been freed up. This is causing the Spark worker to exceed its configured memory limit, which is in turn running our boxes out of memory.

Is there a setting I can configure to prevent this issue? Perhaps by having the worker force-kill the executor, or not start the next job until it's confirmed the executor has exited? Let me know if there's any additional information I can provide.

Keith

P.S. We're running Spark 1.0.2
Error while running Spark SQL join when using Spark 1.0.1
Hi folks,

I'm running into the following error when trying to perform a join in my code:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$

I see similar errors for StringType$ and also:

scala.reflect.runtime.ReflectError: value apache is not a package.

Strangely, if I just work with a single table, everything is fine. I can iterate through the records in both tables and print them out without a problem. Furthermore, this code worked without an exception in Spark 1.0.0 (though the join caused some field corruption, possibly related to https://issues.apache.org/jira/browse/SPARK-1994). The data is coming from a custom protocol-buffer-based format on HDFS that is being mapped into the individual record types without a problem.

The immediate cause seems to be a task trying to deserialize one or more SQL case classes before loading the Spark uber jar, but I have no idea why this is happening, or why it only happens when I do a join. Ideas?

Keith

P.S. If it's relevant, we're using the Kryo serializer.
Re: Error while running Spark SQL join when using Spark 1.0.1
To give a few more details of my environment in case that helps you reproduce:

* I'm running Spark 1.0.1 downloaded as a tarball, not built myself
* I'm running in standalone mode, with 1 master and 1 worker, both on the same machine (though the same error occurs with two workers on two machines)
* I'm using spark-core and spark-sql 1.0.1 pulled via maven

Here's my build.sbt:

name := "spark-test"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided"

On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang zonghen...@gmail.com wrote:

FWIW, I am unable to reproduce this using the example program locally.

On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com wrote:

Nope. All of them are registered from the driver program.

However, I think we've found the culprit. If the join column between two tables is not in the same column position in both tables, it triggers what appears to be a bug. For example, this program fails:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.types._

case class Record(value: String, key: Int)
case class Record2(key: Int, value: String)

object TestJob {

  def main(args: Array[String]) {
    run()
  }

  private def run() {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("TestJob")
    sparkConf.set("spark.cores.max", "8")
    sparkConf.set("spark.storage.memoryFraction", "0.1")
    sparkConf.set("spark.shuffle.memoryFraction", "0.2")
    sparkConf.set("spark.executor.memory", "2g")
    sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
    sparkConf.setMaster("spark://dev1.dev.pulse.io:7077")
    sparkConf.setSparkHome("/home/pulseio/spark/current")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._

    val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
    val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))

    rdd1.registerAsTable("rdd1")
    rdd2.registerAsTable("rdd2")

    sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }
    sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) }

    sc.stop()
  }
}

If you change the definition of Record and Record2 to the following, it succeeds:

case class Record(key: Int, value: String)
case class Record2(key: Int, value: String)

as does:

case class Record(value: String, key: Int)
case class Record2(value: String, key: Int)

Let me know if you need any more details.

On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust mich...@databricks.com wrote:

Are you registering multiple RDDs of case classes as tables concurrently? You are possibly hitting SPARK-2178, which is caused by SI-6240.

On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons keith.simm...@gmail.com wrote:

Hi folks,

I'm running into the following error when trying to perform a join in my code:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$

I see similar errors for StringType$ and also:

scala.reflect.runtime.ReflectError: value apache is not a package.

Strangely, if I just work with a single table, everything is fine.
I can iterate through the records in both tables and print them out without a problem. Furthermore, this code worked without an exception in Spark 1.0.0 (though the join caused some field corruption, possibly related to https://issues.apache.org/jira/browse/SPARK-1994). The data is coming from a custom protocol-buffer-based format on HDFS that is being mapped into the individual record types without a problem.

The immediate cause seems to be a task trying to deserialize one or more SQL case classes before loading the Spark uber jar, but I have no idea why this is happening, or why it only happens when I do a join. Ideas?

Keith

P.S. If it's relevant, we're using the Kryo serializer.
Re: Error while running Spark SQL join when using Spark 1.0.1
...
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:135)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1836)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:701)

On Tue, Jul 15, 2014 at 1:05 PM, Michael Armbrust mich...@databricks.com wrote:

Can you print out the queryExecution? (i.e. println(sql(...).queryExecution))

On Tue, Jul 15, 2014 at 12:44 PM, Keith Simmons keith.simm...@gmail.com wrote:

To give a few more details of my environment in case that helps you reproduce:

* I'm running Spark 1.0.1 downloaded as a tarball, not built myself
* I'm running in standalone mode, with 1 master and 1 worker, both on the same machine (though the same error occurs with two workers on two machines)
* I'm using spark-core and spark-sql 1.0.1 pulled via maven

Here's my build.sbt:

name := "spark-test"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided"

On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang zonghen...@gmail.com wrote:

FWIW, I am unable to reproduce this using the example program locally.

On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com wrote:

Nope. All of them are registered from the driver program.

However, I think we've found the culprit. If the join column between two tables is not in the same column position in both tables, it triggers what appears to be a bug.
For example, this program fails:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.types._

case class Record(value: String, key: Int)
case class Record2(key: Int, value: String)

object TestJob {

  def main(args: Array[String]) {
    run()
  }

  private def run() {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("TestJob")
    sparkConf.set("spark.cores.max", "8")
    sparkConf.set("spark.storage.memoryFraction", "0.1")
    sparkConf.set("spark.shuffle.memoryFraction", "0.2")
    sparkConf.set("spark.executor.memory", "2g")
    sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
    sparkConf.setMaster("spark://dev1.dev.pulse.io:7077")
    sparkConf.setSparkHome("/home/pulseio/spark/current")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._

    val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
    val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))

    rdd1.registerAsTable("rdd1")
    rdd2.registerAsTable("rdd2")

    sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }
    sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) }

    sc.stop()
  }
}

If you change the definition of Record and Record2 to the following, it succeeds:

case class Record(key: Int, value: String)
case class Record2(key: Int, value: String)

as does:

case class Record(value: String, key: Int)
case class Record2(value: String, key: Int)

Let me know if you need any more details.

On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust mich...@databricks.com wrote:

Are you registering multiple RDDs of case classes as tables concurrently? You are possibly hitting SPARK-2178, which is caused by SI-6240.

On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons keith.simm...@gmail.com wrote:

Hi folks,

I'm running into the following error when trying to perform a join in my code:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$

I see similar errors for StringType$ and also:

scala.reflect.runtime.ReflectError ...
Re: Error while running Spark SQL join when using Spark 1.0.1
Cool. So Michael's hunch was correct; it is a thread issue. I'm currently using a tarball build, but I'll do a Spark build with the patch as soon as I have a chance and test it out.

Keith

On Tue, Jul 15, 2014 at 4:14 PM, Zongheng Yang zonghen...@gmail.com wrote:

Hi Keith & gorenuru,

This patch (https://github.com/apache/spark/pull/1423) solves the errors for me in my local tests. If possible, can you guys test this out to see if it solves your test programs?

Thanks,
Zongheng

On Tue, Jul 15, 2014 at 3:08 PM, Zongheng Yang zonghen...@gmail.com wrote:

- user@incubator

Hi Keith,

I did reproduce this using local-cluster[2,2,1024], and the errors look almost the same. Just wondering, despite the errors, did your program output any result for the join? On my machine, I could see the correct output.

Zongheng

On Tue, Jul 15, 2014 at 1:46 PM, Michael Armbrust mich...@databricks.com wrote:

Thanks for the extra info. At a quick glance the query plan looks fine to me. The class IntegerType does build a type tag. I wonder if you are seeing the Scala issue manifest in some new way. We will attempt to reproduce locally.

On Tue, Jul 15, 2014 at 1:41 PM, gorenuru goren...@gmail.com wrote:

Just my few cents on this. I'm having the same problems with v 1.0.1, but this bug is sporadic and looks like it is related to object initialization. Even more, I'm not using any SQL or anything. I just have a utility class like this:

object DataTypeDescriptor {
  type DataType = String

  val BOOLEAN = "BOOLEAN"
  val STRING = "STRING"
  val TIMESTAMP = "TIMESTAMP"
  val LONG = "LONG"
  val INT = "INT"
  val SHORT = "SHORT"
  val BYTE = "BYTE"
  val DECIMAL = "DECIMAL"
  val DOUBLE = "DOUBLE"
  val FLOAT = "FLOAT"

  def $$(name: String, format: Option[String] = None) = DataTypeDescriptor(name, format)

  private lazy val nativeTypes: Map[String, NativeType] = Map(
    BOOLEAN -> BooleanType, STRING -> StringType, TIMESTAMP -> TimestampType,
    LONG -> LongType, INT -> IntegerType, SHORT -> ShortType, BYTE -> ByteType,
    DECIMAL -> DecimalType, DOUBLE -> DoubleType, FLOAT -> FloatType
  )

  lazy val defaultValues: Map[String, Any] = Map(
    BOOLEAN -> false, STRING -> "", TIMESTAMP -> null, LONG -> 0L, INT -> 0,
    SHORT -> 0.toShort, BYTE -> 0.toByte, DECIMAL -> BigDecimal(0d),
    DOUBLE -> 0d, FLOAT -> 0f
  )

  def apply(dataType: String): DataTypeDescriptor = {
    DataTypeDescriptor(dataType.toUpperCase, None)
  }

  def apply(dataType: SparkDataType): DataTypeDescriptor = {
    nativeTypes
      .find { case (_, descriptor) => descriptor == dataType }
      .map { case (name, descriptor) => DataTypeDescriptor(name, None) }
      .get
  }

  ...
}

and some tests that check each of these methods. The problem is that these tests fail randomly with this error.

P.S.: I did not have this problem in Spark 1.0.0
Re: Comparative study
Good point. Shows how personal use cases color how we interpret products.

On Wed, Jul 9, 2014 at 1:08 AM, Sean Owen so...@cloudera.com wrote:

On Wed, Jul 9, 2014 at 1:52 AM, Keith Simmons ke...@pulse.io wrote:

> Impala is *not* built on map/reduce, though it was built to replace Hive, which is map/reduce based. It has its own distributed query engine, though it does load data from HDFS, and is part of the hadoop ecosystem. Impala really shines when your ...

(It was not built to replace Hive. It's purpose-built to make interactive use with a BI tool feasible -- single-digit-second queries on huge data sets. It's very memory hungry. Hive's architecture choices and legacy code have been throughput-oriented, and can't really get below minutes at scale, but it remains a right choice when you are in fact doing ETL!)
Re: Comparative study
Santosh,

To add a bit more to what Nabeel said, Spark and Impala are very different tools. Impala is *not* built on map/reduce, though it was built to replace Hive, which is map/reduce based. It has its own distributed query engine, though it does load data from HDFS, and is part of the hadoop ecosystem. Impala really shines when your entire dataset fits into memory and your processing can be expressed in terms of SQL. Paired with the column-oriented Parquet format, it can really scream with the right dataset. Spark also has a SQL layer (formerly Shark, now more tightly integrated with Spark), but at least for our dataset, Impala was faster. However, Spark has a fantastic and far more flexible programming model. As has been mentioned a few times in this thread, it has a better batch processing model than map/reduce, it can do stream processing, and in the newest release, it looks like it can even mix and match SQL queries. You do need to be more aware of memory issues than with map/reduce, since using more memory is one of the primary sources of Spark's speed, but with that caveat, it's a great technology. In our particular workflow, we're replacing map/reduce with Spark for our batch layer and using Impala for our query layer.

Daniel,

For what it's worth, we've had a bunch of hanging issues because the garbage collector seems to get out of control. The most effective technique has been to dramatically increase the numPartitions argument in our various groupBy and cogroup calls, which reduces the per-task memory requirements. We also reduced the memory used by the shuffler (spark.shuffle.memoryFraction) and turned off RDD memory (since we don't have any iterative algorithms). Finally, using Kryo delivered a huge performance and memory boost (even without registering any custom serializers). A sketch of these settings appears after this thread.

Keith

On Tue, Jul 8, 2014 at 2:58 PM, Robert James srobertja...@gmail.com wrote:

As a new user, I can definitely say that my experience with Spark has been rather raw. The appeal of interactive, batch, and in-between all using more or less straight Scala is unarguable. But the experience of deploying Spark has been quite painful, mainly because of gaps between compile time and run time on the JVM, due to dependency conflicts, having to use uber jars, Spark's own uber jar which includes some very old libs, etc. What's more, there are very few resources available to help. Sometimes I've been able to get help via public sources, but, more often than not, it's been trial and error. Enough that, despite Spark's unmistakable appeal, we are seriously considering dropping it entirely and just doing classical Hadoop.

On 7/8/14, Surendranauth Hiraman suren.hira...@velos.io wrote:

Aaron,

I don't think anyone was saying Spark can't handle this data size, given testimony from the Spark team, Bizo, etc., on large datasets. This has kept us trying different things to get our flow to work over the course of several weeks. Agreed that the first instinct should be "what did I do wrong". I believe that is what every person facing this issue has done, in reaching out to the user group repeatedly over the course of the few months that I've been active here. I also know other companies (all experienced with large production datasets on other platforms) facing the same types of issues - flows that run on subsets of data but not the whole production set. So I think, as you are saying, it points to the need for further diagnostics.
And maybe also some type of guidance on typical issues with different types of datasets (wide rows, narrow rows, etc.), flow topologies, etc.? Hard to tell where we are going wrong right now. We've tried many things over the course of 6 weeks or so. I tried to look for the professional services link on databricks.com but didn't find it. ;-) (jk)

-Suren

On Tue, Jul 8, 2014 at 4:16 PM, Aaron Davidson ilike...@gmail.com wrote:

> Not sure exactly what is happening, but perhaps there are ways to restructure your program for it to work better. Spark is definitely able to handle much, much larger workloads.

+1 @Reynold. Spark can handle big, big data. There are known issues with informing the user about what went wrong and how to fix it that we're actively working on, but the first impulse when a job fails should be "what did I do wrong" rather than "Spark can't handle this workload". Messaging is a huge part in making this clear -- getting things like a job hanging or an out-of-memory error can be very difficult to debug, and improving this is one of our highest priorities.

On Tue, Jul 8, 2014 at 12:47 PM, Reynold Xin r...@databricks.com wrote:

Not sure exactly what is happening, but perhaps there are ways to restructure your program for it to work better. Spark is definitely able to handle much, much larger workloads. I've personally run a workload that shuffled 300 TB of ...
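[Editor's note: for concreteness, here is a hedged sketch of the knobs Keith describes above (Kryo, a smaller shuffle fraction, no RDD cache memory, and more partitions in wide operations), using the Spark 1.x configuration keys named in the thread. The specific values are illustrative assumptions, not recommendations.]

// Illustrative tuning sketch for the techniques described above.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("TunedBatchJob")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo, even without custom registrations
  .set("spark.shuffle.memoryFraction", "0.2")   // reduce memory used by the shuffler
  .set("spark.storage.memoryFraction", "0.0")   // turn off RDD cache memory (no iterative algorithms)
val sc = new SparkContext(conf)

// Raising numPartitions on wide operations shrinks per-task memory, e.g.:
//   bigPairRdd.groupByKey(numPartitions = 2048)
//   rddA.cogroup(rddB, 2048)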
Re: Spark Memory Bounds
Thanks! Sounds like my rough understanding was roughly right :) Definitely understand that cached RDDs can add to the memory requirements. Luckily, like you mentioned, you can configure Spark to flush them to disk and bound their total size in memory via spark.storage.memoryFraction, so I have a pretty good handle on the overall RDD contribution. Thanks for all the help.

Keith

On Wed, May 28, 2014 at 6:43 AM, Christopher Nguyen c...@adatao.com wrote:

Keith, please see inline.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao http://adatao.com
linkedin.com/in/ctnguyen

On Tue, May 27, 2014 at 7:22 PM, Keith Simmons ke...@pulse.io wrote:

> A dash of both. I want to know enough that I can reason about, rather than strictly control, the amount of memory Spark will use. If I have a big data set, I want to understand how I can design it so that Spark's memory consumption falls below my available resources. Or alternatively, if it's even possible for Spark to process a data set over a certain size. And if I run into memory problems, I want to know which knobs to turn, and how turning those knobs will affect memory consumption.

In practice, to avoid OOME, a key dial we use is the size (or inversely, number) of the partitions of your dataset. Clearly there is some blow-up factor F such that, e.g., if you start out with 128MB on-disk data partitions, you would consume 128F MB of memory, both by Spark and by your closure code. Knowing this, you would want to size the partitions such that AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at F, you could do some back-of-the-envelope modeling, and/or run the job and observe empirically.

> It's my understanding that between certain key stages in a Spark DAG (i.e. group-by stages), Spark will serialize all data structures necessary to continue the computation at the next stage, including closures. So in theory, per machine, Spark only needs to hold the transient memory required to process the partitions assigned to the currently active tasks. Is my understanding correct? Specifically, once a key/value pair is serialized in the shuffle stage of a task, are the references to the raw java objects released before the next task is started?

Yes, that is correct in non-cached mode. At the same time, Spark also does something else optionally, which is to keep the data structures (RDDs) persistent in memory (*). As such, it is possible for partitions that are not being actively worked on to be consuming memory. Spark will spill all these to local disk if they take up more memory than it is allowed to take. So the key thing to worry about is less what Spark does (apart from overhead and, yes, the possibility of bugs that need to be fixed), and more what your closure code does with JVM memory as a whole. If in doubt, refer back to the blow-up factor model described above.

(*) This is a fundamentally differentiating feature of Spark over a range of other in-memory architectures that focus on raw-data or transient caches, which serve non-equivalent purposes when viewed from the application level. It allows for very fast access to ready-to-consume high-level data structures, as long as available RAM permits.

On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen c...@adatao.com wrote:

Keith, do you mean "bound" as in (a) strictly control to some quantifiable limit, or (b) try to minimize the amount used by each task?

If (a), then that is outside the scope of Spark's memory management, which you should think of as an application-level (that is, above-JVM) mechanism.
In this scope, Spark voluntarily tracks and limits the amount of memory it uses for explicitly known data structures, such as RDDs. What Spark cannot do is, e.g., control or manage the amount of JVM memory that a given piece of user code might take up. For example, I might write some closure code that allocates a large array of doubles unbeknownst to Spark.

If (b), then your thinking is in the right direction, although quite imperfect, because of things like the example above. We often experience OOME if we're not careful with job partitioning. What I think Spark needs to evolve toward is at least to include a mechanism for application-level hints about task memory requirements. We might work on this and submit a PR for it.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao http://adatao.com
linkedin.com/in/ctnguyen

On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote:

I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM. From reading the tuning guide, my impression is that Spark's memory usage is roughly the following:

(A) In-memory RDD use + (B) in-memory shuffle use + (C) transient memory used by all currently running tasks

I can bound A with spark.storage.memoryFraction, and I can bound B with spark.shuffle.memoryFraction. I'm ...
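[Editor's note: as a back-of-the-envelope illustration of Christopher's blow-up-factor model, a sketch with assumed numbers; nothing below is measured.]

// Partition sizing from the model above: the per-core memory budget must
// exceed (on-disk partition size) * F. All values are illustrative assumptions.
val availableMemoryPerWorkerMB = 16 * 1024  // 16 GB usable per worker
val coresPerWorker = 8
val blowUpFactorF = 4.0                     // assumed on-disk -> in-memory blow-up

val maxPartitionSizeMB =
  availableMemoryPerWorkerMB / coresPerWorker / blowUpFactorF  // 512 MB

// A hypothetical 1 TB dataset would then need at least:
val datasetSizeMB = 1024 * 1024
val minPartitions = math.ceil(datasetSizeMB / maxPartitionSizeMB).toInt  // 2048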
Spark Memory Bounds
I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM. From reading the tuning guide, my impression is that Spark's memory usage is roughly the following:

(A) In-memory RDD use + (B) in-memory shuffle use + (C) transient memory used by all currently running tasks

I can bound A with spark.storage.memoryFraction, and I can bound B with spark.shuffle.memoryFraction. I'm wondering how to bound C.

It's been hinted at a few times on this mailing list that you can reduce memory use by increasing the number of partitions. That leads me to believe that the amount of transient memory is roughly the following:

total_data_set_size / number_of_partitions * number_of_tasks_simultaneously_running_per_machine

Does this sound right? In other words, as I increase the number of partitions, the size of each partition will decrease, and since each task is processing a single partition and there is a bounded number of tasks in flight, my memory use has a rough upper limit.

Keith
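[Editor's note: to make the proposed bound concrete, a quick worked example with assumed numbers; purely illustrative.]

// Plugging assumed values into the formula above.
val totalDataSetSizeGB = 512.0
val numberOfPartitions = 1024
val tasksPerMachine = 8  // e.g., one task per core

val perPartitionGB = totalDataSetSizeGB / numberOfPartitions  // 0.5 GB per partition
val transientBoundGB = perPartitionGB * tasksPerMachine       // ~4 GB per machine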
Re: Spark Memory Bounds
A dash of both. I want to know enough that I can reason about, rather than strictly control, the amount of memory Spark will use. If I have a big data set, I want to understand how I can design it so that Spark's memory consumption falls below my available resources. Or alternatively, if it's even possible for Spark to process a data set over a certain size. And if I run into memory problems, I want to know which knobs to turn, and how turning those knobs will affect memory consumption.

It's my understanding that between certain key stages in a Spark DAG (i.e. group-by stages), Spark will serialize all data structures necessary to continue the computation at the next stage, including closures. So in theory, per machine, Spark only needs to hold the transient memory required to process the partitions assigned to the currently active tasks. Is my understanding correct? Specifically, once a key/value pair is serialized in the shuffle stage of a task, are the references to the raw java objects released before the next task is started?

On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen c...@adatao.com wrote:

Keith, do you mean "bound" as in (a) strictly control to some quantifiable limit, or (b) try to minimize the amount used by each task?

If (a), then that is outside the scope of Spark's memory management, which you should think of as an application-level (that is, above-JVM) mechanism. In this scope, Spark voluntarily tracks and limits the amount of memory it uses for explicitly known data structures, such as RDDs. What Spark cannot do is, e.g., control or manage the amount of JVM memory that a given piece of user code might take up. For example, I might write some closure code that allocates a large array of doubles unbeknownst to Spark.

If (b), then your thinking is in the right direction, although quite imperfect, because of things like the example above. We often experience OOME if we're not careful with job partitioning. What I think Spark needs to evolve toward is at least to include a mechanism for application-level hints about task memory requirements. We might work on this and submit a PR for it.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao http://adatao.com
linkedin.com/in/ctnguyen

On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote:

I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM. From reading the tuning guide, my impression is that Spark's memory usage is roughly the following:

(A) In-memory RDD use + (B) in-memory shuffle use + (C) transient memory used by all currently running tasks

I can bound A with spark.storage.memoryFraction, and I can bound B with spark.shuffle.memoryFraction. I'm wondering how to bound C.

It's been hinted at a few times on this mailing list that you can reduce memory use by increasing the number of partitions. That leads me to believe that the amount of transient memory is roughly the following:

total_data_set_size / number_of_partitions * number_of_tasks_simultaneously_running_per_machine

Does this sound right? In other words, as I increase the number of partitions, the size of each partition will decrease, and since each task is processing a single partition and there is a bounded number of tasks in flight, my memory use has a rough upper limit.

Keith