Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Actually, I'm working with a binary format.  The api allows reading out a
single record at a time, but I'm not sure how to get those records into
spark (without reading everything into memory from a single file at once).
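
One way to combine the two -- the single-record reader and the flatMap over file
names -- is to return a lazy Iterator from flatMap, so a task never holds more
than one record of a file in memory at a time.  A rough sketch only; the reader
API below (BinaryRecordReader/openReader) is invented for illustration:

  import org.apache.spark.{SparkConf, SparkContext}

  // Hypothetical single-record reader, standing in for the real binary format API.
  trait BinaryRecordReader {
    def readRecord(): Array[Byte]   // returns null once the file is exhausted
    def close(): Unit
  }
  def openReader(path: String): BinaryRecordReader = ???   // format-specific stub

  // flatMap accepts any TraversableOnce, so a lazy Iterator means records are
  // pulled one at a time instead of the whole file being materialized.
  def recordsIn(path: String): Iterator[Array[Byte]] = {
    val reader = openReader(path)
    Iterator.continually(reader.readRecord()).takeWhile(_ != null)
    // (a real job would also arrange to close the reader once the iterator is drained)
  }

  val sc = new SparkContext(new SparkConf().setAppName("streaming-load"))
  val fileNames = Seq("hdfs:///data/part-00000.bin", "hdfs:///data/part-00001.bin")
  val records = sc.parallelize(fileNames).flatMap(recordsIn)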



On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

 file => transform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but if
 your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
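
 A minimal sketch of that pattern (the paths and the parseRecords function are
 placeholders, not part of any real API):

   import org.apache.spark.{SparkConf, SparkContext}

   val sc = new SparkContext(new SparkConf().setAppName("union-example"))
   val fileNames = Seq("hdfs:///logs/a.txt", "hdfs:///logs/b.txt")       // placeholder paths
   def parseRecords(line: String): Seq[String] = line.split('\t').toSeq  // placeholder parser

   // One RDD per file, flatMap each into records, then union them all.
   val combined = sc.union(fileNames.map(path => sc.textFile(path).flatMap(parseRecords)))

   // textFile also accepts a comma-separated list of paths or globs directly.
   val combined2 = sc.textFile(fileNames.mkString(",")).flatMap(parseRecords)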


 On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD, and
 in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =>
   transform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =>
   for every 10K records write records to stream and flush
 }

 Keith





Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Yep, that's definitely possible.  It's one of the workarounds I was
considering.  I was just curious if there was a simpler (and perhaps more
efficient) approach.

Keith

On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg andy.tw...@gmail.com wrote:

 Could you modify your function so that it streams through the files record
 by record and outputs them to hdfs, then read them all in as RDDs and take
 the union? That would only use bounded memory.
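
 A rough outline of that workaround, writing the converted records to HDFS with
 the Hadoop FileSystem API (readRecords stands in for the binary format's
 record-by-record reader; paths are illustrative):

   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path}
   import org.apache.spark.{SparkConf, SparkContext}

   val sc = new SparkContext(new SparkConf().setAppName("convert-then-load"))
   val fileNames = Seq("hdfs:///raw/part-00000.bin")     // placeholder

   // Hypothetical: streams records out of one binary file, one at a time.
   def readRecords(path: String): Iterator[String] = ???

   // Convert each binary file to a plain-text file on HDFS, record by record,
   // so only one record is held in memory at any moment.
   sc.parallelize(fileNames).foreach { path =>
     val fs = FileSystem.get(new Configuration())
     val out = fs.create(new Path(s"/converted/${path.split('/').last}.txt"))
     try readRecords(path).foreach(rec => out.write((rec + "\n").getBytes("UTF-8")))
     finally out.close()
   }

   // Read the converted files back; memory stays bounded.
   val records = sc.textFile("/converted/*")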

 On 1 December 2014 at 17:19, Keith Simmons ke...@pulse.io wrote:

 Actually, I'm working with a binary format.  The api allows reading out a
 single record at a time, but I'm not sure how to get those records into
 spark (without reading everything into memory from a single file at once).



 On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

 file => transform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but
 if your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


 On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD,
 and in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =>
   transform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =>
   for every 10K records write records to stream and flush
 }

 Keith







Re: Setting only master heap

2014-10-26 Thread Keith Simmons
, it is the
 amount of memory that a worker advertises as available for drivers to
 launch executors. The sum of the memory used by executors spawned from a
 worker cannot exceed SPARK_WORKER_MEMORY.

 Unfortunately, I'm not aware of a way to set the memory for Master and
 Worker individually, other than launching them manually. You can also try
 setting the config differently on each machine's spark-env.sh file.
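
 For example (heap sizes invented for illustration), the master machine's
 conf/spark-env.sh could carry a larger daemon heap than the workers':

   # master machine, conf/spark-env.sh
   SPARK_DAEMON_MEMORY=2g

   # each worker machine, conf/spark-env.sh
   SPARK_DAEMON_MEMORY=512m    # heap of the Worker daemon itself
   SPARK_WORKER_MEMORY=48g     # memory advertised for executors; separate from the daemon heap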


 *++ If in YARN mode ++*
 In YARN, there is no setting for SPARK_DAEMON_MEMORY. Therefore this is
 only in the Standalone documentation.

 Remember that in YARN mode there is no Spark Worker; instead the YARN
 NodeManagers launch the Executors. And in YARN, there is no need to run a
 Spark Master JVM (since the YARN ResourceManager takes care of the
 scheduling).

 So, with YARN use SPARK_EXECUTOR_MEMORY to set the Executor's memory. And
 use SPARK_DRIVER_MEMORY to set the Driver's memory.

 Just an FYI - for compatibility's sake, even in YARN mode there is a
 setting for SPARK_WORKER_MEMORY, but this has been deprecated. If you do
 set it, it just does the same thing as setting SPARK_EXECUTOR_MEMORY would
 have done.


 - Sameer


 On Wed, Oct 22, 2014 at 1:46 PM, Keith Simmons ke...@pulse.io wrote:

 We've been getting some OOMs from the Spark master since upgrading to
 Spark 1.1.0.  I've found SPARK_DAEMON_MEMORY, but that also seems to
 increase the worker heap, which is already fine as far as I know.  Is there
 any setting which *only* increases the master heap size?

 Keith







Setting only master heap

2014-10-22 Thread Keith Simmons
We've been getting some OOMs from the Spark master since upgrading to Spark
1.1.0.  I've found SPARK_DAEMON_MEMORY, but that also seems to increase the
worker heap, which is already fine as far as I know.  Is there any setting
which *only* increases the master heap size?

Keith


Re: Hung spark executors don't count toward worker memory limit

2014-10-13 Thread Keith Simmons
Maybe I should put this another way.  If Spark has two jobs, A and B, both
of which consume the entire allocated memory pool, is it expected that
Spark can launch B before the executor processes tied to A are completely
terminated?

On Thu, Oct 9, 2014 at 6:57 PM, Keith Simmons ke...@pulse.io wrote:

 Actually, it looks like even when the job shuts down cleanly, there can be
 a few minutes of overlap between the time the next job launches and the
 first job actually terminates its process.  Here are some relevant lines
 from my log:

 14/10/09 20:49:20 INFO Worker: Asked to kill executor
 app-20141009204127-0029/1
 14/10/09 20:49:20 INFO ExecutorRunner: Runner thread for executor
 app-20141009204127-0029/1 interrupted
 14/10/09 20:49:20 INFO ExecutorRunner: Killing process!
 14/10/09 20:49:20 INFO Worker: Asked to launch executor
 app-20141009204508-0030/1 for Job
 ... More lines about launching new job...
 14/10/09 20:51:17 INFO Worker: Executor app-20141009204127-0029/1 finished
 with state KILLED

 As you can see, the first app didn't actually shut down until two minutes
 after the new job launched.  During that time, I was at double the worker
 memory limit.

 Keith


 On Thu, Oct 9, 2014 at 5:06 PM, Keith Simmons ke...@pulse.io wrote:

 Hi Folks,

 We have a spark job that is occasionally running out of memory and
 hanging (I believe in GC).  This is its own issue we're debugging, but in
 the meantime, there's another unfortunate side effect.  When the job is
 killed (most often because of GC errors), each worker attempts to kill its
 respective executor.  However, it appears that several of the executors
 fail to shut themselves down (I actually have to kill -9 them).  However,
 even though the worker fails to successfully cleanup the executor, it
 starts the next job as though all the resources have been freed up.  This
 is causing the Spark worker to exceed its configured memory limit, which
 is in turn running our boxes out of memory.  Is there a setting I can
 configure to prevent this issue?  Perhaps by having the worker force kill
 the executor or not start the next job until it's confirmed the executor
 has exited?  Let me know if there's any additional information I can
 provide.

 Keith

 P.S. We're running spark 1.0.2





Hung spark executors don't count toward worker memory limit

2014-10-09 Thread Keith Simmons
Hi Folks,

We have a spark job that is occasionally running out of memory and hanging
(I believe in GC).  This is its own issue we're debugging, but in the
meantime, there's another unfortunate side effect.  When the job is killed
(most often because of GC errors), each worker attempts to kill its
respective executor.  However, it appears that several of the executors
fail to shut themselves down (I actually have to kill -9 them).  However,
even though the worker fails to successfully cleanup the executor, it
starts the next job as though all the resources have been freed up.  This
is causing the Spark worker to exceed its configured memory limit, which
is in turn running our boxes out of memory.  Is there a setting I can
configure to prevent this issue?  Perhaps by having the worker force kill
the executor or not start the next job until it's confirmed the executor
has exited?  Let me know if there's any additional information I can
provide.

Keith

P.S. We're running spark 1.0.2


Re: Hung spark executors don't count toward worker memory limit

2014-10-09 Thread Keith Simmons
Actually, it looks like even when the job shuts down cleanly, there can be
a few minutes of overlap between the time the next job launches and the
first job actually terminates its process.  Here are some relevant lines
from my log:

14/10/09 20:49:20 INFO Worker: Asked to kill executor
app-20141009204127-0029/1
14/10/09 20:49:20 INFO ExecutorRunner: Runner thread for executor
app-20141009204127-0029/1 interrupted
14/10/09 20:49:20 INFO ExecutorRunner: Killing process!
14/10/09 20:49:20 INFO Worker: Asked to launch executor
app-20141009204508-0030/1 for Job
... More lines about launching new job...
14/10/09 20:51:17 INFO Worker: Executor app-20141009204127-0029/1 finished
with state KILLED

As you can see, the first app didn't actually shut down until two minutes
after the new job launched.  During that time, I was at double the worker
memory limit.

Keith


On Thu, Oct 9, 2014 at 5:06 PM, Keith Simmons ke...@pulse.io wrote:

 Hi Folks,

 We have a spark job that is occasionally running out of memory and hanging
 (I believe in GC).  This is its own issue we're debugging, but in the
 meantime, there's another unfortunate side effect.  When the job is killed
 (most often because of GC errors), each worker attempts to kill its
 respective executor.  However, it appears that several of the executors
 fail to shut themselves down (I actually have to kill -9 them).  However,
 even though the worker fails to successfully cleanup the executor, it
 starts the next job as though all the resources have been freed up.  This
 is causing the Spark worker to exceed its configured memory limit, which
 is in turn running our boxes out of memory.  Is there a setting I can
 configure to prevent this issue?  Perhaps by having the worker force kill
 the executor or not start the next job until it's confirmed the executor
 has exited?  Let me know if there's any additional information I can
 provide.

 Keith

 P.S. We're running spark 1.0.2



Error while running Spark SQL join when using Spark 1.0.1

2014-07-15 Thread Keith Simmons
Hi folks,

I'm running into the following error when trying to perform a join in my
code:

java.lang.NoClassDefFoundError: Could not initialize class
org.apache.spark.sql.catalyst.types.LongType$

I see similar errors for StringType$ and also:

 scala.reflect.runtime.ReflectError: value apache is not a package.

Strangely, if I just work with a single table, everything is fine. I can
iterate through the records in both tables and print them out without a
problem.

Furthermore, this code worked without an exception in Spark 1.0.0 (though
the join caused some field corruption, possibly related to
https://issues.apache.org/jira/browse/SPARK-1994).
 The data is coming from a custom protocol buffer based format on hdfs that
is being mapped into the individual record types without a problem.

The immediate cause seems to be a task trying to deserialize one or more
SQL case classes before loading the spark uber jar, but I have no idea why
this is happening, or why it only happens when I do a join.  Ideas?

Keith

P.S. If it's relevant, we're using the Kryo serializer.


Re: Error while running Spark SQL join when using Spark 1.0.1

2014-07-15 Thread Keith Simmons
To give a few more details of my environment in case that helps you
reproduce:

* I'm running spark 1.0.1 downloaded as a tar ball, not built myself
* I'm running in standalone mode, with 1 master and 1 worker, both on the
same machine (though the same error occurs with two workers on two machines)
* I'm using spark-core and spark-sql 1.0.1 pulled via maven

Here's my build.sbt:

name := "spark-test"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided"


On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang zonghen...@gmail.com
wrote:

 FWIW, I am unable to reproduce this using the example program locally.

 On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com
 wrote:
  Nope.  All of them are registered from the driver program.
 
  However, I think we've found the culprit.  If the join column between two
  tables is not in the same column position in both tables, it triggers
 what
  appears to be a bug.  For example, this program fails:
 
  import org.apache.spark.SparkContext._
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.SchemaRDD
  import org.apache.spark.sql.catalyst.types._

  case class Record(value: String, key: Int)
  case class Record2(key: Int, value: String)

  object TestJob {

    def main(args: Array[String]) {
      run()
    }

    private def run() {
      val sparkConf = new SparkConf()
      sparkConf.setAppName("TestJob")
      sparkConf.set("spark.cores.max", "8")
      sparkConf.set("spark.storage.memoryFraction", "0.1")
      sparkConf.set("spark.shuffle.memoryFraction", "0.2")
      sparkConf.set("spark.executor.memory", "2g")
      sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
      sparkConf.setMaster("spark://dev1.dev.pulse.io:7077")
      sparkConf.setSparkHome("/home/pulseio/spark/current")
      val sc = new SparkContext(sparkConf)

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext._

      val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
      val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
      rdd1.registerAsTable("rdd1")
      rdd2.registerAsTable("rdd2")

      sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }

      sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) }

      sc.stop()
    }

  }
 
  If you change the definition of Record and Record2 to the following, it
  succeeds:
 
  case class Record(key: Int, value: String)
  case class Record2(key: Int, value: String)
 
  as does:
 
  case class Record(value: String, key: Int)
  case class Record2(value: String, key: Int)
 
  Let me know if you need any more details.
 
 
  On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust 
 mich...@databricks.com
  wrote:
 
  Are you registering multiple RDDs of case classes as tables
 concurrently?
  You are possibly hitting SPARK-2178 which is caused by SI-6240.
 
 
  On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons 
 keith.simm...@gmail.com
  wrote:
 
  Hi folks,
 
  I'm running into the following error when trying to perform a join in
 my
  code:
 
  java.lang.NoClassDefFoundError: Could not initialize class
  org.apache.spark.sql.catalyst.types.LongType$
 
  I see similar errors for StringType$ and also:
 
   scala.reflect.runtime.ReflectError: value apache is not a package.
 
  Strangely, if I just work with a single table, everything is fine. I
 can
  iterate through the records in both tables and print them out without a
  problem.
 
  Furthermore, this code worked without an exception in Spark 1.0.0
  (though the join caused some field corruption, possibly related to
  https://issues.apache.org/jira/browse/SPARK-1994).  The data is
 coming from
  a custom protocol buffer based format on hdfs that is being mapped
 into the
  individual record types without a problem.
 
  The immediate cause seems to be a task trying to deserialize one or
 more
  SQL case classes before loading the spark uber jar, but I have no idea
 why
  this is happening, or why it only happens when I do a join.  Ideas?
 
  Keith
 
  P.S. If it's relevant, we're using the Kryo serializer.
 
 
 
 



Re: Error while running Spark SQL join when using Spark 1.0.1

2014-07-15 Thread Keith Simmons
)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:135)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1836)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:701)


On Tue, Jul 15, 2014 at 1:05 PM, Michael Armbrust mich...@databricks.com
wrote:

 Can you print out the queryExecution?

 (i.e. println(sql().queryExecution))


 On Tue, Jul 15, 2014 at 12:44 PM, Keith Simmons keith.simm...@gmail.com
 wrote:

 To give a few more details of my environment in case that helps you
 reproduce:

 * I'm running spark 1.0.1 downloaded as a tar ball, not built myself
 * I'm running in standalone mode, with 1 master and 1 worker, both on
 the same machine (though the same error occurs with two workers on two
 machines)
 * I'm using spark-core and spark-sql 1.0.1 pulled via maven

 Here's my build.sbt:

 name := "spark-test"

 version := "1.0"

 scalaVersion := "2.10.4"

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

 resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % "provided"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided"


 On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang zonghen...@gmail.com
 wrote:

 FWIW, I am unable to reproduce this using the example program locally.

 On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com
 wrote:
  Nope.  All of them are registered from the driver program.
 
  However, I think we've found the culprit.  If the join column between
 two
  tables is not in the same column position in both tables, it triggers
 what
  appears to be a bug.  For example, this program fails:
 
  import org.apache.spark.SparkContext._
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.SchemaRDD
  import org.apache.spark.sql.catalyst.types._

  case class Record(value: String, key: Int)
  case class Record2(key: Int, value: String)

  object TestJob {

    def main(args: Array[String]) {
      run()
    }

    private def run() {
      val sparkConf = new SparkConf()
      sparkConf.setAppName("TestJob")
      sparkConf.set("spark.cores.max", "8")
      sparkConf.set("spark.storage.memoryFraction", "0.1")
      sparkConf.set("spark.shuffle.memoryFraction", "0.2")
      sparkConf.set("spark.executor.memory", "2g")
      sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
      sparkConf.setMaster("spark://dev1.dev.pulse.io:7077")
      sparkConf.setSparkHome("/home/pulseio/spark/current")
      val sc = new SparkContext(sparkConf)

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext._

      val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
      val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
      rdd1.registerAsTable("rdd1")
      rdd2.registerAsTable("rdd2")

      sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }

      sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) }

      sc.stop()
    }

  }
 
  If you change the definition of Record and Record2 to the following, it
  succeeds:
 
  case class Record(key: Int, value: String)
  case class Record2(key: Int, value: String)
 
  as does:
 
  case class Record(value: String, key: Int)
  case class Record2(value: String, key: Int)
 
  Let me know if you need any more details.
 
 
  On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust 
 mich...@databricks.com
  wrote:
 
  Are you registering multiple RDDs of case classes as tables
 concurrently?
  You are possibly hitting SPARK-2178 which is caused by SI-6240.
 
 
  On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons 
 keith.simm...@gmail.com
  wrote:
 
  Hi folks,
 
  I'm running into the following error when trying to perform a join
 in my
  code:
 
  java.lang.NoClassDefFoundError: Could not initialize class
  org.apache.spark.sql.catalyst.types.LongType$
 
  I see similar errors for StringType$ and also:
 
   scala.reflect.runtime.ReflectError

Re: Error while running Spark SQL join when using Spark 1.0.1

2014-07-15 Thread Keith Simmons
Cool.  So Michael's hunch was correct, it is a thread issue.  I'm currently
using a tarball build, but I'll do a spark build with the patch as soon as
I have a chance and test it out.

Keith


On Tue, Jul 15, 2014 at 4:14 PM, Zongheng Yang zonghen...@gmail.com wrote:

 Hi Keith & gorenuru,

 This patch (https://github.com/apache/spark/pull/1423) solves the
 errors for me in my local tests. If possible, can you guys test this
 out to see if it solves your test programs?

 Thanks,
 Zongheng

 On Tue, Jul 15, 2014 at 3:08 PM, Zongheng Yang zonghen...@gmail.com
 wrote:
  - user@incubator
 
  Hi Keith,
 
  I did reproduce this using local-cluster[2,2,1024], and the errors
  look almost the same.  Just wondering, despite the errors did your
  program output any result for the join? On my machine, I could see the
  correct output.
 
  Zongheng
 
  On Tue, Jul 15, 2014 at 1:46 PM, Michael Armbrust
  mich...@databricks.com wrote:
  Thanks for the extra info.  At a quick glance the query plan looks fine
 to
  me.  The class IntegerType does build a type tag; I wonder if you are
  seeing the Scala issue manifest in some new way.  We will attempt to
  reproduce locally.
 
 
  On Tue, Jul 15, 2014 at 1:41 PM, gorenuru goren...@gmail.com wrote:
 
  Just my few cents on this.
 
  I'm having the same problems with v 1.0.1, but this bug is sporadic and
  looks like it is related to object initialization.
 
  What's more, I'm not using any SQL at all. I just have a utility class
  like this:
 
  object DataTypeDescriptor {
    type DataType = String

    val BOOLEAN = "BOOLEAN"
    val STRING = "STRING"
    val TIMESTAMP = "TIMESTAMP"
    val LONG = "LONG"
    val INT = "INT"
    val SHORT = "SHORT"
    val BYTE = "BYTE"
    val DECIMAL = "DECIMAL"
    val DOUBLE = "DOUBLE"
    val FLOAT = "FLOAT"

    def $$(name: String, format: Option[String] = None) = DataTypeDescriptor(name, format)

    private lazy val nativeTypes: Map[String, NativeType] = Map(
      BOOLEAN -> BooleanType, STRING -> StringType, TIMESTAMP -> TimestampType,
      LONG -> LongType, INT -> IntegerType, SHORT -> ShortType, BYTE -> ByteType,
      DECIMAL -> DecimalType, DOUBLE -> DoubleType, FLOAT -> FloatType
    )

    lazy val defaultValues: Map[String, Any] = Map(
      BOOLEAN -> false, STRING -> "", TIMESTAMP -> null, LONG -> 0L, INT -> 0,
      SHORT -> 0.toShort, BYTE -> 0.toByte,
      DECIMAL -> BigDecimal(0d), DOUBLE -> 0d, FLOAT -> 0f
    )

    def apply(dataType: String): DataTypeDescriptor = {
      DataTypeDescriptor(dataType.toUpperCase, None)
    }

    def apply(dataType: SparkDataType): DataTypeDescriptor = {
      nativeTypes
        .find { case (_, descriptor) => descriptor == dataType }
        .map { case (name, descriptor) => DataTypeDescriptor(name, None) }
        .get
    }

  .
 
  and some tests that check each of these methods.
 
  The problem is that this test fails randomly with this error.
 
  P.S.: I did not have this problem in Spark 1.0.0
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-Spark-SQL-join-when-using-Spark-1-0-1-tp9776p9817.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
 



Re: Comparative study

2014-07-09 Thread Keith Simmons
Good point.  Shows how personal use cases color how we interpret products.


On Wed, Jul 9, 2014 at 1:08 AM, Sean Owen so...@cloudera.com wrote:

 On Wed, Jul 9, 2014 at 1:52 AM, Keith Simmons ke...@pulse.io wrote:

  Impala is *not* built on map/reduce, though it was built to replace
 Hive, which is map/reduce based.  It has its own distributed query engine,
 though it does load data from HDFS, and is part of the hadoop ecosystem.
  Impala really shines when your


 (It was not built to replace Hive. It's purpose-built to make interactive
 use with a BI tool feasible -- single-digit second queries on huge data
 sets. It's very memory hungry. Hive's architecture choices and legacy code
 have been throughput-oriented, and can't really get below minutes at scale,
 but it remains a right choice when you are in fact doing ETL!)



Re: Comparative study

2014-07-08 Thread Keith Simmons
Santosh,

To add a bit more to what Nabeel said, Spark and Impala are very different
tools.  Impala is *not* built on map/reduce, though it was built to replace
Hive, which is map/reduce based.  It has its own distributed query engine,
though it does load data from HDFS, and is part of the hadoop ecosystem.
 Impala really shines when your entire dataset fits into memory and your
processing can be expressed in terms of sql.  Paired with the column
oriented Parquet format, it can really scream with the right dataset.

Spark also has a SQL layer (formerly Shark, now more tightly integrated with
Spark), but at least for our dataset, Impala was faster.  However, Spark
has a fantastic and far more flexible programming model.  As has been
mentioned a few times in this thread, it has a better batch processing
model than map/reduce, it can do stream processing, and in the newest
release, it looks like it can even mix and match sql queries.  You do need
to be more aware of memory issues than map/reduce, since using more memory
is one of the primary sources of Spark's speed, but with that caveat, it's a
great technology.  In our particular workflow, we're replacing map/reduce
with spark for our batch layer and using Impala for our query layer.

Daniel,

For what it's worth, we've had a bunch of hanging issues because the
garbage collector seems to get out of control.  The most effective
technique has been to dramatically increase the numPartition argument in
our various groupBy and cogroup calls which reduces the per-task memory
requirements.  We also reduced the memory used by the shuffler (
spark.shuffle.memoryFraction) and turned off RDD memory (since we don't
have any iterative algorithms).  Finally, using Kryo delivered a huge
performance and memory boost (even without registering any custom
serializers).
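
For reference, the knobs mentioned above look roughly like this (the values are
illustrative, not recommendations):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._

  val conf = new SparkConf()
    .setAppName("batch-job")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo, no custom registrations
    .set("spark.shuffle.memoryFraction", "0.2")  // smaller shuffle buffers
    .set("spark.storage.memoryFraction", "0.1")  // little RDD caching, since nothing is iterative
  val sc = new SparkContext(conf)

  // Larger partition counts in the wide operations shrink per-task memory:
  val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
  val grouped = pairs.groupByKey(2048)
  val cogrouped = pairs.cogroup(pairs.mapValues(_ * 2), 2048)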

Keith




On Tue, Jul 8, 2014 at 2:58 PM, Robert James srobertja...@gmail.com wrote:

 As a new user, I can definitely say that my experience with Spark has
 been rather raw.  The appeal of interactive, batch, and in between all
 using more or less straight Scala is unarguable.  But the experience
 of deploying Spark has been quite painful, mainly about gaps between
 compile time and run time to the JVM, due to dependency conflicts,
 having to use uber jars, Spark's own uber jar which includes some very
 old libs, etc.

 What's more, there's very little resources available to help.  Some
 times I've been able to get help via public sources, but, more often
 than not, it's been trial and error.  Enough that, despite Spark's
 unmistakable appeal, we are seriously considering dropping it entirely
 and just doing a classical Hadoop.

 On 7/8/14, Surendranauth Hiraman suren.hira...@velos.io wrote:
  Aaron,
 
  I don't think anyone was saying Spark can't handle this data size, given
  testimony from the Spark team, Bizo, etc., on large datasets. This has
 kept
  us trying different things to get our flow to work over the course of
  several weeks.
 
  Agreed that the first instinct should be "what did I do wrong".
 
  I believe that is what every person facing this issue has done, in
 reaching
  out to the user group repeatedly over the course of the few of months
 that
  I've been active here. I also know other companies (all experienced with
  large production datasets on other platforms) facing the same types of
  issues - flows that run on subsets of data but not the whole production
  set.
 
  So I think, as you are saying, it points to the need for further
  diagnostics. And maybe also some type of guidance on typical issues with
  different types of datasets (wide rows, narrow rows, etc.), flow
  topologies. etc.? Hard to tell where we are going wrong right now. We've
  tried many things over the course of 6 weeks or so.
 
  I tried to look for the professional services link on databricks.com but
  didn't find it. ;-) (jk).
 
  -Suren
 
 
 
  On Tue, Jul 8, 2014 at 4:16 PM, Aaron Davidson ilike...@gmail.com
 wrote:
 
  Not sure exactly what is happening but perhaps there are ways to
  restructure your program for it to work better. Spark is definitely
 able
  to
  handle much, much larger workloads.
 
 
  +1 @Reynold
 
  Spark can handle big big data. There are known issues with informing
  the
  user about what went wrong and how to fix it that we're actively working
  on, but the first impulse when a job fails should be "what did I do wrong"
  rather than "Spark can't handle this workload". Messaging is a huge part in
  making this clear -- getting things like a job hanging or an out of
  memory
  error can be very difficult to debug, and improving this is one of our
  highest priorities.
 
 
  On Tue, Jul 8, 2014 at 12:47 PM, Reynold Xin r...@databricks.com
 wrote:
 
  Not sure exactly what is happening but perhaps there are ways to
  restructure your program for it to work better. Spark is definitely
 able
  to
  handle much, much larger workloads.
 
  I've personally run a workload that shuffled 300 TB of 

Re: Spark Memory Bounds

2014-05-28 Thread Keith Simmons
Thanks!  Sounds like my rough understanding was roughly right :)

Definitely understand cached RDDs can add to the memory requirements.
 Luckily, like you mentioned, you can configure spark to flush that to disk
and bound its total size in memory via spark.storage.memoryFraction, so I
have a pretty good handle on the overall RDD contribution.

Thanks for all the help.

Keith


On Wed, May 28, 2014 at 6:43 AM, Christopher Nguyen c...@adatao.com wrote:

 Keith, please see inline.

 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 7:22 PM, Keith Simmons ke...@pulse.io wrote:

 A dash of both.  I want to know enough that I can reason about, rather
 than strictly control, the amount of memory Spark will use.  If I have a
 big data set, I want to understand how I can design it so that Spark's
 memory consumption falls below my available resources.  Or alternatively,
 if it's even possible for Spark to process a data set over a certain size.
  And if I run into memory problems, I want to know which knobs to turn, and
 how turning those knobs will affect memory consumption.


 In practice, to avoid OOME, a key dial we use is the size (or inversely,
 number) of the partitions of your dataset. Clearly there is some blow-up
 factor F such that, e.g., if you start out with 128MB on-disk data
 partitions, you would consume 128F MB of memory, both by Spark and by your
 closure code. Knowing this, you would want to size the partitions such that
  AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at
 F, you could do some back-of-the-envelope modeling, and/or run the job and
 observe empirically.
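
 A back-of-the-envelope instance of that model (all numbers invented):

   val availableMemoryMBPerWorker = 24 * 1024   // 24 GB heap per worker
   val coresPerWorker = 8
   val blowUpFactorF = 4.0                      // observed in-memory blow-up vs on-disk size

   // Largest on-disk partition size that stays within budget:
   val maxPartitionMB = availableMemoryMBPerWorker / coresPerWorker / blowUpFactorF  // = 768.0

   // Equivalently, a minimum partition count for a given dataset:
   val dataSetSizeMB = 2 * 1024 * 1024          // ~2 TB on disk
   val minPartitions = math.ceil(dataSetSizeMB / maxPartitionMB).toInt               // = 2731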



 It's my understanding that between certain key stages in a Spark DAG
 (i.e. group by stages), Spark will serialize all data structures necessary
 to continue the computation at the next stage, including closures.  So in
 theory, per machine, Spark only needs to hold the transient memory required
 to process the partitions assigned to the currently active tasks.  Is my
 understanding correct?  Specifically, once a key/value pair is serialized
  in the shuffle stage of a task, are the references to the raw Java objects
  released before the next task is started?


 Yes, that is correct in non-cached mode. At the same time, Spark also does
 something else optionally, which is to keep the data structures (RDDs)
 persistent in memory (*). As such it is possible partitions that are not
 being actively worked on to be consuming memory. Spark will spill all these
 to local disk if they take up more memory than it is allowed to take. So
 the key thing to worry about is less about what Spark does (apart of
 overhead and yes, the possibility of bugs that need to be fixed), and more
 about what your closure code does with JVM memory as a whole. If in doubt,
 refer back to the blow-up factor model described above.

 (*) this is a fundamentally differentiating feature of Spark over a range
 of other in-memory architectures, that focus on raw-data or transient
 caches that serve non-equivalent purposes when viewed from the application
 level. It allows for very fast access to ready-to-consume high-level data
 structures, as long as available RAM permits.
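
 For example, opting a dataset into that persistence, with spilling allowed once
 the memory budget is exceeded (sc is the usual SparkContext; the path is
 illustrative):

   import org.apache.spark.storage.StorageLevel

   val parsed = sc.textFile("hdfs:///data/*.txt").map(_.split('\t'))
   parsed.persist(StorageLevel.MEMORY_AND_DISK)  // partitions that fit stay in RAM, the rest spill to local disk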




 On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen c...@adatao.comwrote:

 Keith, do you mean bound as in (a) strictly control to some
 quantifiable limit, or (b) try to minimize the amount used by each task?

 If a, then that is outside the scope of Spark's memory management,
 which you should think of as an application-level (that is, above JVM)
 mechanism. In this scope, Spark voluntarily tracks and limits the amount
 of memory it uses for explicitly known data structures, such as RDDs. What
 Spark cannot do is, e.g., control or manage the amount of JVM memory that a
 given piece of user code might take up. For example, I might write some
 closure code that allocates a large array of doubles unbeknownst to Spark.

 If b, then your thinking is in the right direction, although quite
 imperfect, because of things like the example above. We often experience
 OOME if we're not careful with job partitioning. What I think Spark needs
 to evolve to is at least to include a mechanism for application-level hints
 about task memory requirements. We might work on this and submit a PR for
 it.

 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote:

 I'm trying to determine how to bound my memory use in a job working
 with more data than can simultaneously fit in RAM.  From reading the tuning
 guide, my impression is that Spark's memory usage is roughly the following:

 (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient
 memory used by all currently running tasks

 I can bound A with spark.storage.memoryFraction and I can bound B with 
 spark.shuffle.memoryFraction.
  I'm

Spark Memory Bounds

2014-05-27 Thread Keith Simmons
I'm trying to determine how to bound my memory use in a job working with
more data than can simultaneously fit in RAM.  From reading the tuning
guide, my impression is that Spark's memory usage is roughly the following:

(A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
used by all currently running tasks

I can bound A with spark.storage.memoryFraction and I can bound B with
spark.shuffle.memoryFraction.
 I'm wondering how to bound C.

It's been hinted at a few times on this mailing list that you can reduce
memory use by increasing the number of partitions.  That leads me to
believe that the amount of transient memory roughly follows:

total_data_set_size/number_of_partitions *
number_of_tasks_simultaneously_running_per_machine

Does this sound right?  In other words, as I increase the number of
partitions, the size of each partition will decrease, and since each task
is processing a single partition and there are a bounded number of tasks in
flight, my memory use has a rough upper limit.

Keith


Re: Spark Memory Bounds

2014-05-27 Thread Keith Simmons
A dash of both.  I want to know enough that I can reason about, rather
than strictly control, the amount of memory Spark will use.  If I have a
big data set, I want to understand how I can design it so that Spark's
memory consumption falls below my available resources.  Or alternatively,
if it's even possible for Spark to process a data set over a certain size.
 And if I run into memory problems, I want to know which knobs to turn, and
how turning those knobs will affect memory consumption.

It's my understanding that between certain key stages in a Spark DAG (i.e.
group by stages), Spark will serialize all data structures necessary to
continue the computation at the next stage, including closures.  So in
theory, per machine, Spark only needs to hold the transient memory required
to process the partitions assigned to the currently active tasks.  Is my
understanding correct?  Specifically, once a key/value pair is serialized
in the shuffle stage of a task, are the references to the raw Java objects
released before the next task is started?



On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen c...@adatao.com wrote:

 Keith, do you mean bound as in (a) strictly control to some quantifiable
 limit, or (b) try to minimize the amount used by each task?

 If a, then that is outside the scope of Spark's memory management, which
 you should think of as an application-level (that is, above JVM) mechanism.
 In this scope, Spark voluntarily tracks and limits the amount of memory
 it uses for explicitly known data structures, such as RDDs. What Spark
 cannot do is, e.g., control or manage the amount of JVM memory that a given
 piece of user code might take up. For example, I might write some closure
 code that allocates a large array of doubles unbeknownst to Spark.

 If b, then your thinking is in the right direction, although quite
 imperfect, because of things like the example above. We often experience
 OOME if we're not careful with job partitioning. What I think Spark needs
 to evolve to is at least to include a mechanism for application-level hints
 about task memory requirements. We might work on this and submit a PR for
 it.

 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote:

 I'm trying to determine how to bound my memory use in a job working with
 more data than can simultaneously fit in RAM.  From reading the tuning
 guide, my impression is that Spark's memory usage is roughly the following:

 (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
 used by all currently running tasks

 I can bound A with spark.storage.memoryFraction and I can bound B with 
 spark.shuffle.memoryFraction.
  I'm wondering how to bound C.

 It's been hinted at a few times on this mailing list that you can reduce
 memory use by increasing the number of partitions.  That leads me to
 believe that the amount of transient memory roughly follows:

 total_data_set_size/number_of_partitions *
 number_of_tasks_simultaneously_running_per_machine

 Does this sound right?  In other words, as I increase the number of
 partitions, the size of each partition will decrease, and since each task
 is processing a single partition and there are a bounded number of tasks in
 flight, my memory use has a rough upper limit.

 Keith