Re: Spark 2.0: Task never completes

2016-08-03 Thread Utkarsh Sengar
Not sure what caused it, but the partition size was 3 million there. The RDD
was created from mongo-hadoop 1.5.1.
Earlier (mongo-hadoop 1.3 and Spark 1.5) it worked just fine; not sure what
changed.

As a fix, I applied a repartition(40) (where 40 varies by my processing
logic) before the cartesian and it fixed the problem.
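
Roughly, the workaround looks like this (just a sketch; the RDD names, element
types and the factor of 40 are illustrative):

// Cap the partition count before the cartesian; the product otherwise ends up
// with |myRdd partitions| * |runsRdd partitions| partitions, which explodes
// when one side comes in with millions of partitions.
JavaRDD<MyRecord> repartitioned = myRdd.repartition(40);   // 40 comes from my processing logic
JavaPairRDD<MyRecord, Run> pairs = repartitioned.cartesian(runsRdd);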

On Wed, Aug 3, 2016 at 10:04 AM, Utkarsh Sengar <utkarsh2...@gmail.com>
wrote:

> After an upgrade from 1.5.1 to 2.0, one of the tasks never completes and
> keeps spilling data to disk overtime.
> long count = resultRdd.count();
> LOG.info("TOTAL in resultRdd: " + count);
>
> resultRdd has a rather complex structure:
>
> JavaPairRDD<Long, Tuple3<LocalDateTime, Integer, SimResult>>
> resultRdd = myRdd
> .cartesian(runsRdd)
> .cartesian(datesToRunRdd)
> .coalesce(datesToRun.size() * runs.size() *
> ridsToRun.size())
> .mapToPair(t -> { return result});
>
> "mapToPair" to pair does a bunch of processing over the cartesian product
> and constructs "result".
>
> This works fine in spark 1.6.1 and the logic inside "mapToPair" is nicely
> unit tested.
>
> This is the threaddump, any suggestions on the possible issue?
>
>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
>
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154)
>
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:153)
>
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:153)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
>
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1682)
> org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1115)
> org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1115)
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> --
> Thanks,
> -Utkarsh
>



-- 
Thanks,
-Utkarsh


Re: Spark 2.0 error: Wrong FS: file://spark-warehouse, expected: file:///

2016-08-02 Thread Utkarsh Sengar
I don't think it's a related problem, although setting
"spark.sql.warehouse.dir"=/tmp in the spark config fixed it.

On Tue, Aug 2, 2016 at 5:02 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
wrote:

> Do we have a workaround for this problem?
> Can I overwrite that using some config?
>
> On Tue, Aug 2, 2016 at 4:48 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> This is https://issues.apache.org/jira/browse/SPARK-15899  -- anyone
>> seeing this please review the proposed change. I think it's stalled
>> and needs an update.
>>
>> On Tue, Aug 2, 2016 at 4:47 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
>> wrote:
>> > Upgraded to spark2.0 and tried to load a model:
>> > LogisticRegressionModel model = LogisticRegressionModel.load(sc.sc(),
>> > "s3a://cruncher/c/models/lr/");
>> >
>> > Getting this error: Exception in thread "main"
>> > java.lang.IllegalArgumentException: Wrong FS: file://spark-warehouse,
>> > expected: file:///
>> > Full stacktrace:
>> >
>> https://gist.githubusercontent.com/utkarsh2012/7c4c8e0f408e36a8fb6d9c9d3bd6b301/raw/2621ed3ceffb63d72ecdce169193dfabe4d41b40/spark2.0%2520LR%2520load
>> >
>> >
>> > This was working fine in Spark 1.5.1. I don't have "spark-warehouse"
>> > anywhere in my code, so its somehow defaulting to that.
>> >
>> > --
>> > Thanks,
>> > -Utkarsh
>>
>
>
>
> --
> Thanks,
> -Utkarsh
>



-- 
Thanks,
-Utkarsh


Re: Spark 2.0 error: Wrong FS: file://spark-warehouse, expected: file:///

2016-08-02 Thread Utkarsh Sengar
Do we have a workaround for this problem?
Can I overwrite that using some config?

On Tue, Aug 2, 2016 at 4:48 PM, Sean Owen <so...@cloudera.com> wrote:

> This is https://issues.apache.org/jira/browse/SPARK-15899  -- anyone
> seeing this please review the proposed change. I think it's stalled
> and needs an update.
>
> On Tue, Aug 2, 2016 at 4:47 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
> wrote:
> > Upgraded to spark2.0 and tried to load a model:
> > LogisticRegressionModel model = LogisticRegressionModel.load(sc.sc(),
> > "s3a://cruncher/c/models/lr/");
> >
> > Getting this error: Exception in thread "main"
> > java.lang.IllegalArgumentException: Wrong FS: file://spark-warehouse,
> > expected: file:///
> > Full stacktrace:
> >
> https://gist.githubusercontent.com/utkarsh2012/7c4c8e0f408e36a8fb6d9c9d3bd6b301/raw/2621ed3ceffb63d72ecdce169193dfabe4d41b40/spark2.0%2520LR%2520load
> >
> >
> > This was working fine in Spark 1.5.1. I don't have "spark-warehouse"
> > anywhere in my code, so its somehow defaulting to that.
> >
> > --
> > Thanks,
> > -Utkarsh
>



-- 
Thanks,
-Utkarsh


Spark 2.0 error: Wrong FS: file://spark-warehouse, expected: file:///

2016-08-02 Thread Utkarsh Sengar
Upgraded to spark2.0 and tried to load a model:
LogisticRegressionModel model = LogisticRegressionModel.load(sc.sc(),
"s3a://cruncher/c/models/lr/");

Getting this error: Exception in thread "main"
java.lang.IllegalArgumentException: Wrong FS: file://spark-warehouse,
expected: file:///
Full stacktrace:
https://gist.githubusercontent.com/utkarsh2012/7c4c8e0f408e36a8fb6d9c9d3bd6b301/raw/2621ed3ceffb63d72ecdce169193dfabe4d41b40/spark2.0%2520LR%2520load


This was working fine in Spark 1.5.1. I don't have "spark-warehouse"
anywhere in my code, so it's somehow defaulting to that.

-- 
Thanks,
-Utkarsh


javax.net.ssl.SSLHandshakeException: unable to find valid certification path to requested target

2016-06-20 Thread Utkarsh Sengar
We are intermittently getting this error when spark tries to load data from S3:

Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
unable to find valid certification path to requested target.

https://gist.githubusercontent.com/utkarsh2012/1c4cd2dc82c20c6f389b783927371bd7/raw/a1be6617d23b1744631427fe90aaa1cce4313f36/stacktrace

Running spark 1.5.1 on java8 and mesos-0.23.0
jets3t is 0.9.4

What can be the possible issue here - network, mesos or s3? It was working
fine earlier.

-- 
Thanks,
-Utkarsh


Re: How to deal with tasks running too long?

2016-06-16 Thread Utkarsh Sengar
Thanks all, I know I have a data skew, but the data is unpredictable and the
skew is hard to find every time.
Do you think this workaround is reasonable?

ExecutorService executor = Executors.newCachedThreadPool();
Callable<Result> task = () -> simulation.run();
Future<Result> future = executor.submit(task);
try {
    simResult = future.get(20, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
    SPARKLOG.info("Task timed out");
    future.cancel(true); // interrupt the simulation so it doesn't keep running
}

It will force a timeout on the task if it runs for more than 20 mins.


On Thu, Jun 16, 2016 at 5:00 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> I'd check Details for Stage page in web UI.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Jun 16, 2016 at 6:45 AM, Utkarsh Sengar <utkarsh2...@gmail.com>
> wrote:
> > This SO question was asked about 1yr ago.
> >
> http://stackoverflow.com/questions/31799755/how-to-deal-with-tasks-running-too-long-comparing-to-others-in-job-in-yarn-cli
> >
> > I answered this question with a suggestion to try speculation but it
> doesn't
> > quite do what the OP expects. I have been running into this issue more
> these
> > days. Out of 5000 tasks, 4950 completes in 5mins but the last 50 never
> > really completes, have tried waiting for 4hrs. This can be a memory
> issue or
> > maybe the way spark's fine grained mode works with mesos, I am trying to
> > enable jmxsink to get a heap dump.
> >
> > But in the mean time, is there a better fix for this? (in any version of
> > spark, I am using 1.5.1 but can upgrade). It would be great if the last
> 50
> > tasks in my example can be killed (timed out) and the stage completes
> > successfully.
> >
> > --
> > Thanks,
> > -Utkarsh
>



-- 
Thanks,
-Utkarsh


How to deal with tasks running too long?

2016-06-15 Thread Utkarsh Sengar
This SO question was asked about 1yr ago.
http://stackoverflow.com/questions/31799755/how-to-deal-with-tasks-running-too-long-comparing-to-others-in-job-in-yarn-cli

I answered this question with a suggestion to try speculation, but it
doesn't quite do what the OP expects. I have been running into this issue
more these days. Out of 5000 tasks, 4950 complete in 5 mins but the last 50
never really complete; I have tried waiting for 4 hrs. This can be a memory
issue, or maybe the way spark's fine-grained mode works with mesos; I am
trying to enable jmxsink to get a heap dump.

But in the meantime, is there a better fix for this (in any version of
spark; I am using 1.5.1 but can upgrade)? It would be great if the last 50
tasks in my example could be killed (timed out) and the stage still completed
successfully.
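
For context, this is roughly how speculation gets turned on (a sketch; the
threshold values are illustrative). It re-launches straggler tasks on other
executors rather than killing them, which is why it doesn't quite do what I'm
after here:

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .set("spark.speculation", "true")
    .set("spark.speculation.interval", "5000")   // how often to check for stragglers, in ms (1.x)
    .set("spark.speculation.multiplier", "3")    // a task is "slow" at 3x the median task duration
    .set("spark.speculation.quantile", "0.9");   // only after 90% of tasks in the stage have finished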

-- 
Thanks,
-Utkarsh


Re: LogisticRegressionModel not able to load serialized model from S3

2016-02-11 Thread Utkarsh Sengar
The problem turned out to be corrupt parquet data; the error from Spark was a
bit misleading though.

On Mon, Feb 8, 2016 at 3:41 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
wrote:

> I am storing a model in s3 in this path:
> "bucket_name/p1/models/lr/20160204_0410PM/ser" and the structure of the
> saved dir looks like this:
>
> 1. bucket_name/p1/models/lr/20160204_0410PM/ser/data -> _SUCCESS,
> _metadata, _common_metadata
> and part-r-0-ebd3dc3c-1f2c-45a3-8793-c8f0cb8e7d01.gz.parquet
> 2. bucket_name/p1/models/lr/20160204_0410PM/ser/metadata/ -> _SUCCESS
> and part-0
>
> So when I try to load "bucket_name/p1/models/lr/20160204_0410PM/ser"
> for LogisticRegressionModel:
>
> LogisticRegressionModel model = LogisticRegressionModel.load(sc.sc(),
> "s3n://bucket_name/p1/models/lr/20160204_0410PM/ser");
>
> I get this error consistently. I have permission to the bucket and I am
> able to other data using textFiles()
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent
> failure: Lost task 0.3 in stage 2.0 (TID 5, mesos-slave12):
> java.lang.NullPointerException
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:433)
> at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:381)
>   at
> parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:155)
> at
> parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
> at
> org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:153)
> at
> org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
> at
> org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Any pointers of whats wrong?
>
> --
> -Utkarsh
>



-- 
Thanks,
-Utkarsh


LogisticRegressionModel not able to load serialized model from S3

2016-02-08 Thread Utkarsh Sengar
I am storing a model in s3 in this path:
"bucket_name/p1/models/lr/20160204_0410PM/ser" and the structure of the
saved dir looks like this:

1. bucket_name/p1/models/lr/20160204_0410PM/ser/data -> _SUCCESS,
_metadata, _common_metadata
and part-r-0-ebd3dc3c-1f2c-45a3-8793-c8f0cb8e7d01.gz.parquet
2. bucket_name/p1/models/lr/20160204_0410PM/ser/metadata/ -> _SUCCESS
and part-0

So when I try to load "bucket_name/p1/models/lr/20160204_0410PM/ser"
for LogisticRegressionModel:

LogisticRegressionModel model = LogisticRegressionModel.load(sc.sc(),
"s3n://bucket_name/p1/models/lr/20160204_0410PM/ser");

I get this error consistently. I have permission to the bucket and I am
able to read other data using textFiles().

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 2.0 (TID 5, mesos-slave12):
java.lang.NullPointerException
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:433)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:381)
  at
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:155)
at
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:153)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Any pointers on what's wrong?

-- 
-Utkarsh


Using accumulator to push custom logs to driver

2016-02-01 Thread Utkarsh Sengar
I am trying to debug code executed on executors by logging. Even when I add
log4j's LOG.info(..) inside .map() I don't see it in the mesos task logs on the
corresponding slaves.
It's inefficient anyway to keep checking multiple slaves for logs.

One way to deal with this is to push logs to a central location.

Another way (for debugging purposes) is to use accumulators. Is it
advisable to use accumulators to push strings from executors to the driver?
It would simplify things when I am debugging datasets, bugs that are hard to
reproduce locally, etc.
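
To make the idea concrete, here is a rough sketch with the Spark 1.x Java API
(class/variable names are illustrative, sc is the JavaSparkContext and rdd is
assumed to be a JavaRDD<String>):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.spark.Accumulator;
import org.apache.spark.AccumulatorParam;
import org.apache.spark.api.java.JavaRDD;

// Merges per-task lists of log lines into a single list readable on the driver.
class LogLinesParam implements AccumulatorParam<List<String>> {
    public List<String> addAccumulator(List<String> acc, List<String> more) { acc.addAll(more); return acc; }
    public List<String> addInPlace(List<String> a, List<String> b) { a.addAll(b); return a; }
    public List<String> zero(List<String> initial) { return new ArrayList<>(); }
}

Accumulator<List<String>> debugLog =
    sc.accumulator(new ArrayList<String>(), new LogLinesParam());

JavaRDD<String> mapped = rdd.map(x -> {
    debugLog.add(Collections.singletonList("processed " + x));  // only merged on the driver after an action
    return x;
});
mapped.count();                          // any action; afterwards...
List<String> lines = debugLog.value();   // ...the collected lines, on the driver only

One caveat: values added inside a transformation can be double-counted if a
task is retried, so this is only for debugging, not for anything exact.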

Suggestions/comments?

-Utkarsh


RegressionModelEvaluator (from jpmml) NotSerializableException when instantiated in the driver

2015-12-09 Thread Utkarsh Sengar
I am trying to load a PMML file in a spark job, instantiate it only once,
and pass it to the executors. But I get a NotSerializableException for
org.xml.sax.helpers.LocatorImpl, which is used inside jpmml.

I have this class Prediction.java:
public class Prediction implements Serializable {
    private RegressionModelEvaluator rme;

    public Prediction() throws Exception {
        InputStream is = getClass().getResourceAsStream("model.pmml");
        Source source = ImportFilter.apply(new InputSource(is));
        PMML model = JAXBUtil.unmarshalPMML(source);
        rme = new RegressionModelEvaluator(model);
        is.close();
    }

    public Map predict(params) {
        ..
        return rme.evaluate(params);
    }
}


Now I want to instantiate it only once, since the
"JAXBUtil.unmarshalPMML(source)" step takes about 2-3 seconds. It works fine
if I instantiate it inside the map{}.

So I do this in my driver:

Prediction prediction = new Prediction();
JavaRDD result = rdd1
    .cartesian(rdd2)
    .map(t -> {...})

and the job fails with this (truncated) stack trace:

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

Am I doing this right?
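
For reference, one pattern that would sidestep the serialization entirely (a
sketch that reuses the jpmml calls from the snippet above; EvaluatorHolder and
buildParams are illustrative names, not something from jpmml or Spark) is to
build the evaluator lazily in a static holder, so each executor JVM constructs
it once and nothing jpmml-related rides along with the closure:

// One evaluator per executor JVM, built lazily on first use and never serialized.
class EvaluatorHolder {
    private static RegressionModelEvaluator rme;

    static synchronized RegressionModelEvaluator get() throws Exception {
        if (rme == null) {
            // same jpmml calls as in the Prediction constructor above
            InputStream is = EvaluatorHolder.class.getResourceAsStream("model.pmml");
            Source source = ImportFilter.apply(new InputSource(is));
            PMML model = JAXBUtil.unmarshalPMML(source);
            rme = new RegressionModelEvaluator(model);
            is.close();
        }
        return rme;
    }
}

// In the driver code, nothing non-serializable is captured by the closure:
JavaRDD result = rdd1
    .cartesian(rdd2)
    .map(t -> EvaluatorHolder.get().evaluate(buildParams(t)));  // buildParams(...) maps t to model inputs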

-- 
Thanks,
-Utkarsh


Re: spark.mesos.coarse impacts memory performance on mesos

2015-10-09 Thread Utkarsh Sengar
Hi Tim,

Any way I can provide more info on this?

On Thu, Oct 1, 2015 at 4:21 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
wrote:

> Not sure what you mean by that, I shared the data which I see in spark UI.
> Can you point me to a location where I can precisely get the data you need?
>
> When I run the job in fine grained mode, I see tons are tasks created and
> destroyed under a mesos "framework". I have about 80k spark tasks which I
> think translates directly to independent mesos tasks.
>
> https://dl.dropboxusercontent.com/u/2432670/Screen%20Shot%202015-10-01%20at%204.14.34%20PM.png
>
> When i run the job in coarse grained mode, I just see 1-4 tasks with 1-4
> executors (it varies from what mesos allocates). And these mesos tasks try
> to complete the 80k spark tasks and runs out of memory eventually (see the
> stack track above) in the gist shared above.
>
>
> On Thu, Oct 1, 2015 at 4:07 PM, Tim Chen <t...@mesosphere.io> wrote:
>
>> Hi Utkarsh,
>>
>> I replied earlier asking what is your task assignment like with fine vs
>> coarse grain mode look like?
>>
>> Tim
>>
>> On Thu, Oct 1, 2015 at 4:05 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
>> wrote:
>>
>>> Bumping it up, its not really a blocking issue.
>>> But fine grain mode eats up uncertain number of resources in mesos and
>>> launches tons of tasks, so I would prefer using the coarse grained mode if
>>> only it didn't run out of memory.
>>>
>>> Thanks,
>>> -Utkarsh
>>>
>>> On Mon, Sep 28, 2015 at 2:24 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> 1. spark.mesos.coarse:false (fine grain mode)
>>>> This is the data dump for config and executors assigned:
>>>> https://gist.github.com/utkarsh2012/6401d5526feccab14687
>>>>
>>>> 2. spark.mesos.coarse:true (coarse grain mode)
>>>> Dump for coarse mode:
>>>> https://gist.github.com/utkarsh2012/918cf6f8ed5945627188
>>>>
>>>> As you can see, exactly the same code works fine in fine grained, goes
>>>> out of memory in coarse grained mode. First an executor was lost and then
>>>> the driver went out of memory.
>>>> So I am trying to understand what is different in fine grained vs
>>>> coarse mode other than allocation of multiple mesos tasks vs 1 mesos task.
>>>> Clearly spark is not managing memory in the same way.
>>>>
>>>> Thanks,
>>>> -Utkarsh
>>>>
>>>>
>>>> On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen <t...@mesosphere.io> wrote:
>>>>
>>>>> Hi Utkarsh,
>>>>>
>>>>> What is your job placement like when you run fine grain mode? You said
>>>>> coarse grain mode only ran with one node right?
>>>>>
>>>>> And when the job is running could you open the Spark webui and get
>>>>> stats about the heap size and other java settings?
>>>>>
>>>>> Tim
>>>>>
>>>>> On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar <
>>>>> utkarsh2...@gmail.com> wrote:
>>>>>
>>>>>> Bumping this one up, any suggestions on the stacktrace?
>>>>>> spark.mesos.coarse=true is not working and the driver crashed with
>>>>>> the error.
>>>>>>
>>>>>> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar <
>>>>>> utkarsh2...@gmail.com> wrote:
>>>>>>
>>>>>>> Missed to do a reply-all.
>>>>>>>
>>>>>>> Tim,
>>>>>>>
>>>>>>> spark.mesos.coarse = true doesn't work and spark.mesos.coarse =
>>>>>>> false works (sorry there was a typo in my last email, I meant "when I do
>>>>>>> "spark.mesos.coarse=false", the job works like a charm. ").
>>>>>>>
>>>>>>> I get this exception with spark.mesos.coarse = true:
>>>>>>>
>>>>>>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
>>>>>>> "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
>>>>>>> "55af5a61e8a42806f47546c1"}
>>>>>>>
>>>>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa

Re: spark.mesos.coarse impacts memory performance on mesos

2015-10-01 Thread Utkarsh Sengar
Not sure what you mean by that; I shared the data which I see in the spark UI.
Can you point me to a location where I can precisely get the data you need?

When I run the job in fine-grained mode, I see tons of tasks created and
destroyed under a mesos "framework". I have about 80k spark tasks, which I
think translate directly to independent mesos tasks.
https://dl.dropboxusercontent.com/u/2432670/Screen%20Shot%202015-10-01%20at%204.14.34%20PM.png

When I run the job in coarse-grained mode, I just see 1-4 tasks with 1-4
executors (it varies with what mesos allocates). And these mesos tasks try
to complete the 80k spark tasks and run out of memory eventually (see the
stack trace in the gist shared above).


On Thu, Oct 1, 2015 at 4:07 PM, Tim Chen <t...@mesosphere.io> wrote:

> Hi Utkarsh,
>
> I replied earlier asking what is your task assignment like with fine vs
> coarse grain mode look like?
>
> Tim
>
> On Thu, Oct 1, 2015 at 4:05 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
> wrote:
>
>> Bumping it up, its not really a blocking issue.
>> But fine grain mode eats up uncertain number of resources in mesos and
>> launches tons of tasks, so I would prefer using the coarse grained mode if
>> only it didn't run out of memory.
>>
>> Thanks,
>> -Utkarsh
>>
>> On Mon, Sep 28, 2015 at 2:24 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
>> wrote:
>>
>>> Hi Tim,
>>>
>>> 1. spark.mesos.coarse:false (fine grain mode)
>>> This is the data dump for config and executors assigned:
>>> https://gist.github.com/utkarsh2012/6401d5526feccab14687
>>>
>>> 2. spark.mesos.coarse:true (coarse grain mode)
>>> Dump for coarse mode:
>>> https://gist.github.com/utkarsh2012/918cf6f8ed5945627188
>>>
>>> As you can see, exactly the same code works fine in fine grained, goes
>>> out of memory in coarse grained mode. First an executor was lost and then
>>> the driver went out of memory.
>>> So I am trying to understand what is different in fine grained vs coarse
>>> mode other than allocation of multiple mesos tasks vs 1 mesos task. Clearly
>>> spark is not managing memory in the same way.
>>>
>>> Thanks,
>>> -Utkarsh
>>>
>>>
>>> On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen <t...@mesosphere.io> wrote:
>>>
>>>> Hi Utkarsh,
>>>>
>>>> What is your job placement like when you run fine grain mode? You said
>>>> coarse grain mode only ran with one node right?
>>>>
>>>> And when the job is running could you open the Spark webui and get
>>>> stats about the heap size and other java settings?
>>>>
>>>> Tim
>>>>
>>>> On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar <utkarsh2...@gmail.com
>>>> > wrote:
>>>>
>>>>> Bumping this one up, any suggestions on the stacktrace?
>>>>> spark.mesos.coarse=true is not working and the driver crashed with the
>>>>> error.
>>>>>
>>>>> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar <utkarsh2...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Missed to do a reply-all.
>>>>>>
>>>>>> Tim,
>>>>>>
>>>>>> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
>>>>>> works (sorry there was a typo in my last email, I meant "when I do
>>>>>> "spark.mesos.coarse=false", the job works like a charm. ").
>>>>>>
>>>>>> I get this exception with spark.mesos.coarse = true:
>>>>>>
>>>>>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
>>>>>> "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
>>>>>> "55af5a61e8a42806f47546c1"}
>>>>>>
>>>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611337>15/09/22
>>>>>> 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
>>>>>> "55af5a61e8a42806f47546c1"}, max= null
>>>>>>
>>>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611453>Exception
>>>>>> in thread &q

Re: spark.mesos.coarse impacts memory performance on mesos

2015-10-01 Thread Utkarsh Sengar
Bumping it up; it's not really a blocking issue.
But fine-grained mode eats up an uncertain number of resources in mesos and
launches tons of tasks, so I would prefer using the coarse-grained mode if
only it didn't run out of memory.

Thanks,
-Utkarsh

On Mon, Sep 28, 2015 at 2:24 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
wrote:

> Hi Tim,
>
> 1. spark.mesos.coarse:false (fine grain mode)
> This is the data dump for config and executors assigned:
> https://gist.github.com/utkarsh2012/6401d5526feccab14687
>
> 2. spark.mesos.coarse:true (coarse grain mode)
> Dump for coarse mode:
> https://gist.github.com/utkarsh2012/918cf6f8ed5945627188
>
> As you can see, exactly the same code works fine in fine grained, goes out
> of memory in coarse grained mode. First an executor was lost and then the
> driver went out of memory.
> So I am trying to understand what is different in fine grained vs coarse
> mode other than allocation of multiple mesos tasks vs 1 mesos task. Clearly
> spark is not managing memory in the same way.
>
> Thanks,
> -Utkarsh
>
>
> On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen <t...@mesosphere.io> wrote:
>
>> Hi Utkarsh,
>>
>> What is your job placement like when you run fine grain mode? You said
>> coarse grain mode only ran with one node right?
>>
>> And when the job is running could you open the Spark webui and get stats
>> about the heap size and other java settings?
>>
>> Tim
>>
>> On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
>> wrote:
>>
>>> Bumping this one up, any suggestions on the stacktrace?
>>> spark.mesos.coarse=true is not working and the driver crashed with the
>>> error.
>>>
>>> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
>>> wrote:
>>>
>>>> Missed to do a reply-all.
>>>>
>>>> Tim,
>>>>
>>>> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
>>>> works (sorry there was a typo in my last email, I meant "when I do
>>>> "spark.mesos.coarse=false", the job works like a charm. ").
>>>>
>>>> I get this exception with spark.mesos.coarse = true:
>>>>
>>>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
>>>> "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
>>>> "55af5a61e8a42806f47546c1"}
>>>>
>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611337>15/09/22
>>>> 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
>>>> "55af5a61e8a42806f47546c1"}, max= null
>>>>
>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611453>Exception
>>>> in thread "main" java.lang.OutOfMemoryError: Java heap space
>>>>
>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611524>
>>>> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>>>>
>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611599>
>>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>>>
>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611671>
>>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>>>
>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611743>
>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>
>>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611788>
>>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>>>
>>>> <http://singularity-qa-uswest2.otenv.com/task

Re: spark.mesos.coarse impacts memory performance on mesos

2015-09-28 Thread Utkarsh Sengar
Hi Tim,

1. spark.mesos.coarse:false (fine grain mode)
This is the data dump for config and executors assigned:
https://gist.github.com/utkarsh2012/6401d5526feccab14687

2. spark.mesos.coarse:true (coarse grain mode)
Dump for coarse mode:
https://gist.github.com/utkarsh2012/918cf6f8ed5945627188

As you can see, exactly the same code works fine in fine-grained mode but goes
out of memory in coarse-grained mode. First an executor was lost and then the
driver went out of memory.
So I am trying to understand what is different in fine-grained vs coarse-grained
mode other than the allocation of multiple mesos tasks vs 1 mesos task. Clearly
spark is not managing memory in the same way.

Thanks,
-Utkarsh


On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen <t...@mesosphere.io> wrote:

> Hi Utkarsh,
>
> What is your job placement like when you run fine grain mode? You said
> coarse grain mode only ran with one node right?
>
> And when the job is running could you open the Spark webui and get stats
> about the heap size and other java settings?
>
> Tim
>
> On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
> wrote:
>
>> Bumping this one up, any suggestions on the stacktrace?
>> spark.mesos.coarse=true is not working and the driver crashed with the
>> error.
>>
>> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
>> wrote:
>>
>>> Missed to do a reply-all.
>>>
>>> Tim,
>>>
>>> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
>>> works (sorry there was a typo in my last email, I meant "when I do
>>> "spark.mesos.coarse=false", the job works like a charm. ").
>>>
>>> I get this exception with spark.mesos.coarse = true:
>>>
>>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
>>> "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
>>> "55af5a61e8a42806f47546c1"}
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611337>15/09/22
>>> 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
>>> "55af5a61e8a42806f47546c1"}, max= null
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611453>Exception
>>> in thread "main" java.lang.OutOfMemoryError: Java heap space
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611524>
>>> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611599>
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611671>
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611743>
>>> at scala.Option.getOrElse(Option.scala:120)
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611788>
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611843>
>>> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611918>
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>>
>>> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-m

Re: spark.mesos.coarse impacts memory performance on mesos

2015-09-24 Thread Utkarsh Sengar
Bumping this one up, any suggestions on the stacktrace?
spark.mesos.coarse=true is not working and the driver crashed with the
error.

On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
wrote:

> Missed to do a reply-all.
>
> Tim,
>
> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
> works (sorry there was a typo in my last email, I meant "when I do
> "spark.mesos.coarse=false", the job works like a charm. ").
>
> I get this exception with spark.mesos.coarse = true:
>
> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id"
> : "55af4bf26750ad38a444d7cf"}, max= { "_id" : "55af5a61e8a42806f47546c1"}
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611337>15/09/22
> 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
> "55af5a61e8a42806f47546c1"}, max= null
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611453>Exception
> in thread "main" java.lang.OutOfMemoryError: Java heap space
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611524>
> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611599>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611671>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611743>
> at scala.Option.getOrElse(Option.scala:120)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611788>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611843>
> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611918>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#611990>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#612062>
> at scala.Option.getOrElse(Option.scala:120)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#612107>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#612162>
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#612245>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> <http://singularity-qa-uswest2.otenv.com/task/ds-tetris-simspark-usengar.2015.09.22T20.14.36-1442952963980-1-mesos_slave1_qa_uswest2.qasql.opentable.com-us_west_2a/tail/stderr#612317>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> <http://singularity-qa-uswest2.otenv.com/tas

Re: How to share memory in a broadcast between tasks in the same executor?

2015-09-22 Thread Utkarsh Sengar
If the broadcast variable doesn't fit in memory, I think a broadcast is not
the right fit for you.
You can think about folding it into an RDD as a tuple alongside the other data
you are working on.

Say you are working on an RDD (rdd in your case): run a map/reduce to convert
it to an RDD of tuples, so now you have the relevant data from the dict as
part of your RDD, available locally in the task.
It's much more efficient than finding workarounds for loading it partially.
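
For example (a rough sketch in the Java API; sc, records, Record and
lookupKey() are illustrative), key both sides and join, so each task only sees
the dictionary entries it actually needs:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Turn the dictionary into an RDD of (key, entry) pairs once...
List<Tuple2<String, String>> dictEntries = new ArrayList<>();  // filled from the dict
JavaPairRDD<String, String> dictRdd = sc.parallelizePairs(dictEntries);

// ...key the data by whatever the lookup key is...
JavaPairRDD<String, Record> keyedData =
    records.mapToPair(r -> new Tuple2<>(lookupKey(r), r));

// ...and join, so each record is paired with just its dictionary entry.
JavaPairRDD<String, Tuple2<Record, String>> joined = keyedData.join(dictRdd);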

Thanks,
-Utkarsh

On Tue, Sep 22, 2015 at 11:42 AM, Clément Frison 
wrote:

> Hello,
>
> My team and I have a 32-core machine and we would like to use a huge
> object - for example a large dictionary - in a map transformation and use
> all our cores in parallel by sharing this object among some tasks.
>
> We broadcast our large dictionary.
>
> dico_br = sc.broadcast(dico)
>
> We use it in a map:
>
> rdd.map(lambda x: (x[0], function(x[1], dico_br)))
>
> where function does a lookup : dico_br.value[x]
>
> Our issue is that our dictionary is loaded 32 times in memory, and it
> doesn't fit. So what we are doing is limiting the number of executors. It
> works fine but we only have 8 cpus working in parallel instead of 32.
>
> We would like to take advantage of multicore processing and shared memory,
> as the 32 cores are in the same machine. For example we would like to load
> the dictionary in memory 8 times only and make 4 cores share it. How could
> we achieve that with Spark ?
>
>
> What we have tried - without success :
>
> 1) One driver/worker with 32 cores : local[32]
>
> 2) Standalone with one master and 8 workers - each of them having 4 cores
>
> Thanks a lot for your help, Clement
>



-- 
Thanks,
-Utkarsh


Re: How does one use s3 for checkpointing?

2015-09-21 Thread Utkarsh Sengar
We are using "spark-1.4.1-bin-hadoop2.4" on mesos (not EMR) with s3 to read
and write data and haven't noticed any inconsistencies with it, so 1
(mostly) and 2 definitely should not be a problem.
Regarding 3, are you setting the file system impl in spark config?

sparkContext.hadoopConfiguration().set("fs.s3.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem");

And I have these dependencies if that helps.


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.4.1</version>
</dependency>


-Utkarsh

On Mon, Sep 21, 2015 at 7:13 PM, Jerry Lam  wrote:

> Hi Amit,
>
> Have you looked at Amazon EMR? Most people using EMR use s3 for
> persistency (both as input and output of spark jobs).
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 21 Sep, 2015, at 9:24 pm, Amit Ramesh  wrote:
>
>
> A lot of places in the documentation mention using s3 for checkpointing,
> however I haven't found any examples or concrete evidence of anyone having
> done this.
>
>1. Is this a safe/reliable option given the read-after-write
>consistency for PUTS in s3?
>    2. Is s3 access broken for hadoop 2.6 (SPARK-7442)? If so, is it
>    viable in 2.4?
>3. Related to #2. I did try providing hadoop-aws-2.6.0.jar while
>submitting the job and got the following stack trace. Is there a fix?
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> None.org.apache.spark.api.java.JavaSparkContext.
> : java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem:
> Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at
> java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at
> org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
> at
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.SparkContext.(SparkContext.scala:475)
> at
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NoClassDefFoundError:
> com/amazonaws/AmazonServiceException
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
> at java.lang.Class.getConstructor0(Class.java:2885)
> at java.lang.Class.newInstance(Class.java:350)
> at
> java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 27 more
> Caused by: java.lang.ClassNotFoundException:
> com.amazonaws.AmazonServiceException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at 

spark.mesos.coarse impacts memory performance on mesos

2015-09-21 Thread Utkarsh Sengar
I am running Spark 1.4.1 on mesos.

The spark job does a "cartesian" of 4 RDDs (aRdd, bRdd, cRdd, dRdd) of size
100, 100, 7 and 1 respectively. Let's call it productRDD.

Creation of "aRdd" needs a data pull from multiple data sources, merging it
and creating a JavaRDD of tuples; that is what aRdd finally looks like.
bRdd, cRdd and dRdd are just lists of values.

Then I apply a transformation on productRDD and finally call "saveAsTextFile"
to save the result of my transformation.
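
Roughly, the shape of that pipeline (a sketch; it assumes bRdd/cRdd/dRdd have
been parallelized into RDDs, transform(...) stands in for the real
per-combination logic, and the output path is a placeholder):

JavaRDD<String> output = aRdd
    .cartesian(bRdd)
    .cartesian(cRdd)
    .cartesian(dRdd)                         // productRDD: 100 * 100 * 7 * 1 combinations
    .map(t -> transform(t).toString());      // transform(...) is the real logic
output.saveAsTextFile("s3n://bucket/path/output");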

Problem:
By setting "spark.mesos.coarse=true", creation of "aRdd" works fine but
driver crashes while doing the cartesian but when I do
"spark.mesos.coarse=true", the job works like a charm. I am running spark
on mesos.

Comments:
So I wanted to understand what role "spark.mesos.coarse=true" plays in
terms of memory and compute performance. My findings look counterintuitive
since:

   1. "spark.mesos.coarse=true" just runs on 1 mesos task, so there should
   be an overhead of spinning up mesos tasks which should impact the
   performance.
   2. What config for "spark.mesos.coarse" recommended for running spark on
   mesos? Or there is no best answer and it depends on usecase?
   3. Also by setting "spark.mesos.coarse=true", I notice that I get huge
   GC pauses even with small dataset but a long running job (but this can be a
   separate discussion).

Let me know if I am missing something obvious, we are learning spark tuning
as we move forward :)

-- 
Thanks,
-Utkarsh


Re: RDD transformation and action running out of memory

2015-09-13 Thread Utkarsh Sengar
Yup, that was the problem.
Changing the default "mongo.input.split_size" from 8MB to 100MB did the
trick.

Config reference:
https://github.com/mongodb/mongo-hadoop/wiki/Configuration-Reference
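
For anyone else hitting this, the change is just a knob on the job's input
Configuration (a sketch; the host and collection names are placeholders, and
the value is assumed to be in MB, matching the 8MB default mentioned above):

import org.apache.hadoop.conf.Configuration;

Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://myhost:27017/A.MyColl");  // placeholder URI
mongodbConfig.set("mongo.input.split_size", "100");                       // default is 8; 100 fixed it here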

Thanks!

On Sat, Sep 12, 2015 at 3:15 PM, Richard Eggert <richard.egg...@gmail.com>
wrote:

> Hmm... The count() method invokes this:
>
> def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
> = {
>runJob(rdd, func, 0 until rdd.partitions.length)
> }
>
> It appears that you're running out of memory while trying to compute
> (within the driver) the number of partitions that will be in the final
> result. It seems as if Mongo is computing so many splits that you're
> running out of memory.
>
> Looking at your log messages, I see this:
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
> : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
>
> 0x54e64d646d0bfe0a24ba79e1 - 0x54e64d626d0bfe0a24ba79b3 =
> 0x200000000000002e = 36893488147419103278
>
> The last split reported in the log has max 55adf841b4d2970fb07d7288.
>
> 0x55adf841b4d2970fb07d7288 - 0x54e64d646d0bfe0a24ba79e1 =
> 0xc7aadd47c699058bc2f8a7 = 241383122307828806444054695
>
> 241383122307828806444054695/36893488147419103278 = 6,542,702 potential
> splits, assuming they are evenly distributed. I'm not sure how big each
> split object is, but it's plausible that the process of creating an array
> of 6.5 million of them is causing you to run out of memory.
>
> I think the reason you don't see anything in the executor logs is that the
> exception is occurring before the work is tasked to the executors.
>
>
> Rich
>
>
>
> On Sat, Sep 12, 2015 at 5:18 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
> wrote:
>
>> I am trying to run this, a basic mapToPair and then count() to trigger an
>> action.
>> 4 executors are launched but I don't see any relevant logs on those
>> executors.
>>
>> It looks like the the driver is pulling all the data and it runs out of
>> memory, the dataset is big, so it won't fit on 1 machine.
>>
>> So what is the issue here? I am using spark in a wrong way in this
>> example?
>>
>> Configuration mongodbConfigInventoryDay = new Configuration();
>> mongodbConfigInventoryDay.set("mongo.job.input.format",
>> "com.mongodb.hadoop.MongoInputFormat");
>> mongodbConfigInventoryDay.set("mongo.input.uri", "mongodb://" +
>> props.getProperty("mongo") + ":27017/A.MyColl");
>> JavaPairRDD<Object, BSONObject> MyColl = sc.newAPIHadoopRDD(
>> mongodbConfigInventoryDay,
>> MongoInputFormat.class,
>> Object.class,
>> BSONObject.class
>> );
>> JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
>> ObjectMapper mapper = new ObjectMapper();
>> tuple2._2().removeField("_id");
>> MyColl day = mapper.readValue(tuple2._2().toMap().toString(),
>> MyColl.class);
>> return new Tuple2<>(Long.valueOf((String)
>> tuple2._2().get("MyCollId")), day);
>> });
>>
>> myCollRdd.count();
>>
>>
>> Logs on the driver:
>> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with
>> curMem=253374, maxMem=278019440
>> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in
>> memory (estimated size 117.8 KB, free 264.8 MB)
>> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with
>> curMem=374038, maxMem=278019440
>> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
>> bytes in memory (estimated size 12.5 KB, free 264.8 MB)
>> 15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>> memory on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
>> 15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from
>> newAPIHadoopRDD at SparkRunner.java:192
>> 15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to
>> check splits against mongodb://
>> dsc-dbs-001.qasql.opentable.com:27017/A.MyColl
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null,
>> max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={
>> "_id" : "54e64d626d0bfe0a24ba79b3"}, max= { "_i

RDD transformation and action running out of memory

2015-09-12 Thread Utkarsh Sengar
I am trying to run this, a basic mapToPair and then count() to trigger an
action.
4 executors are launched but I don't see any relevant logs on those
executors.

It looks like the driver is pulling all the data and it runs out of
memory; the dataset is big, so it won't fit on 1 machine.

So what is the issue here? Am I using spark in a wrong way in this example?

Configuration mongodbConfigInventoryDay = new Configuration();
mongodbConfigInventoryDay.set("mongo.job.input.format",
        "com.mongodb.hadoop.MongoInputFormat");
mongodbConfigInventoryDay.set("mongo.input.uri", "mongodb://" +
        props.getProperty("mongo") + ":27017/A.MyColl");
JavaPairRDD<Object, BSONObject> myColl = sc.newAPIHadoopRDD(
        mongodbConfigInventoryDay,
        MongoInputFormat.class,
        Object.class,
        BSONObject.class
);
JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
    ObjectMapper mapper = new ObjectMapper();
    tuple2._2().removeField("_id");
    MyColl day = mapper.readValue(tuple2._2().toMap().toString(),
            MyColl.class);
    return new Tuple2<>(Long.valueOf((String) tuple2._2().get("MyCollId")), day);
});

myCollRdd.count();


Logs on the driver:
15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with
curMem=253374, maxMem=278019440
15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 117.8 KB, free 264.8 MB)
15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with
curMem=374038, maxMem=278019440
15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
bytes in memory (estimated size 12.5 KB, free 264.8 MB)
15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from
newAPIHadoopRDD at SparkRunner.java:192
15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to
check splits against mongodb://
dsc-dbs-001.qasql.opentable.com:27017/A.MyColl
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null,
max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "54e64d646d0bfe0a24ba79e1"}, max= { "_id" : "5581d1c3d52db40bc8558c6b"}
..
..
15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "55adf840d3b5be0724224807"}, max= { "_id" : "55adf841b4d2970fb07d7288"}
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:224)
at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:499)
at
com.mongodb.hadoop.input.MongoInputSplit.<init>(MongoInputSplit.java:59)
at
com.mongodb.hadoop.splitter.MongoCollectionSplitter.createSplitFromBounds(MongoCollectionSplitter.java:248)
at
com.mongodb.hadoop.splitter.StandaloneMongoSplitter.calculateSplits(StandaloneMongoSplitter.java:157)
at
com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:58)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
at
org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:442)
at
org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
at runner.SparkRunner.getInventoryDayRdd(SparkRunner.java:205)
at runner.SparkRunner.main(SparkRunner.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at 
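
The stack trace shows the OutOfMemoryError happens on the driver, inside
MongoInputFormat.getSplits, while mongo-hadoop's StandaloneMongoSplitter is
materializing the input split objects; that runs before any task is shipped,
which is why the executors never log anything. A hedged sketch of one knob
that may help, assuming this mongo-hadoop version honors the
mongo.input.split_size setting (larger splits mean fewer split objects held
on the driver):

// Sketch only: mongo.input.split_size is assumed to be the splitVector
// chunk size in MB for this mongo-hadoop version; raising it reduces the
// number of MongoInputSplit objects the driver has to build and hold.
Configuration mongodbConfigInventoryDay = new Configuration();
mongodbConfigInventoryDay.set("mongo.job.input.format",
        "com.mongodb.hadoop.MongoInputFormat");
mongodbConfigInventoryDay.set("mongo.input.uri",
        "mongodb://" + props.getProperty("mongo") + ":27017/A.MyColl");
mongodbConfigInventoryDay.set("mongo.input.split_size", "64");

The MemoryStore lines above also report only ~265 MB, which suggests the
driver is still on a small default heap; if the split metadata legitimately
will not fit, raising driver memory is the other lever, e.g.
spark-submit --driver-memory 4g --class runner.SparkRunner ...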

Porting a multi-threaded compute-intensive job to spark

2015-08-27 Thread Utkarsh Sengar
I am working on code which uses an executor service to parallelize tasks
(think machine learning computations done over a small dataset over and
over again).
My goal is to execute some code as fast as possible, many times, and
store the result somewhere (total executions will be on the order of 100M
runs at least).

The logic looks something like this (it's a simplified example):

dbconn = new dbconn() //This is reused by all threads
for a in listOfSize1000:
   for b in listofSize10:
  for c in listOfSize2:
 taskcompletionexecutorservice.submit(new runner(a, b, c, dbconn))

At the end, taskcompletionexecutorservice.take() is called and I store the
Result from FutureResult in a db.
But this approach is not really scaling after a point.

So this is what I am doing right now in spark (which is a brutal hack, but
I am looking for suggestions on how to best structure this):

sparkContext.parallelize(listOfSize1000).filter(a -> {
   dbconn = new dbconn() //Cannot init it outside parallelize since it's not
serializable
   for b in listofSize10:
  for c in listOfSize2:
 Result r = new runner(a, b, c, dbconn)
 dbconn.store(r)

return true //It serves no purpose.
}).count();

This approach looks inefficient to me since it's not truly parallelizing on
the smallest unit of work, although the job runs alright. Also, count() is
not really doing anything for me; I added it just to trigger the execution.
It was inspired by the pi-computation example here:
http://spark.apache.org/examples.html

So, any suggestions on how I can better structure my spark runner so that I
use the spark executors efficiently?
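
A minimal sketch of one way to restructure this, assuming a JavaSparkContext
sc and hypothetical A/B/C element types plus DbConn/Runner/Result stand-ins
for the classes described above: build the full (a, b, c) combination space
with cartesian so Spark can schedule work at the smallest unit, and open one
connection per partition with foreachPartition so nothing non-serializable
has to cross the wire and no dummy count() is needed.

JavaRDD<A> as = sc.parallelize(listOfSize1000);
JavaRDD<B> bs = sc.parallelize(listofSize10);
JavaRDD<C> cs = sc.parallelize(listOfSize2);

// Every (a, b, c) combination becomes its own element; repartition here if
// coarser- or finer-grained tasks are needed.
JavaPairRDD<Tuple2<A, B>, C> combos = as.cartesian(bs).cartesian(cs);

combos.foreachPartition(iter -> {
    DbConn dbconn = new DbConn();   // one connection per partition, created on the executor
    while (iter.hasNext()) {
        Tuple2<Tuple2<A, B>, C> t = iter.next();
        Result r = new Runner(t._1()._1(), t._1()._2(), t._2()).call();
        dbconn.store(r);
    }
    dbconn.close();
});  // foreachPartition is an action, so no count() is required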

-- 
Thanks,
-Utkarsh


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-25 Thread Utkarsh Sengar
This worked for me locally:
spark-1.4.1-bin-hadoop2.4/bin/spark-submit --conf
spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar:/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
--conf
spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar:/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
--verbose --class runner.SparkRunner target/simspark-0.1-SNAPSHOT.jar


Now I am going to try it out on our Mesos cluster.
I assumed spark.executor.extraClassPath takes a comma-separated list of
jars the way --jars does, but it should be colon-separated like a regular
classpath.

Thanks for your help!
-Utkarsh


On Mon, Aug 24, 2015 at 5:05 PM, Utkarsh Sengar utkarsh2...@gmail.com
wrote:

 I get the same error even when I set the SPARK_CLASSPATH: export
 SPARK_CLASSPATH=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.1.jar:/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
 And I run the job like this: /spark-1.4.1-bin-hadoop2.4/bin/spark-submit
 --class runner.SparkRunner
 target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

 I am not able to find the code in spark which adds these jars before the
 spark classes in classpath. Or maybe its a bug. Any suggestions on
 workarounds?

 Thanks,
 -Utkarsh


 On Mon, Aug 24, 2015 at 4:32 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:

 I assumed that's the case beacause of the error I got and the
 documentation which says: Extra classpath entries to append to the
 classpath of the driver.

 This is where I stand now:
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.10</artifactId>
     <version>1.4.1</version>
     <exclusions>
         <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
     </exclusions>
 </dependency>

 And no exclusions from my logging lib.

 And I submit this task: spark-1.4.1-bin-hadoop2.4/bin/spark-submit
 --class runner.SparkRunner --conf
 spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
 --conf
 spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
 --conf
 spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
 --conf
 spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
 target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

 And I get the same error:
 Caused by: java.lang.ClassCastException:
 org.slf4j.impl.Log4jLoggerFactory cannot be cast to
 ch.qos.logback.classic.LoggerContext
 at
 com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
 at
 com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
 at com.opentable.logging.Log.clinit(Log.java:31)
 ... 16 more


 Thanks,
 -Utkarsh

 On Mon, Aug 24, 2015 at 4:11 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  That didn't work since extraClassPath flag was still appending the
 jars at
  the end, so its still picking the slf4j jar provided by spark.

 Out of curiosity, how did you verify this? The extraClassPath
 options are supposed to prepend entries to the classpath, and the code
 seems to be doing that. If it's not really doing that in some case,
 it's a bug that needs to be fixed.

 Another option is those is setting the SPARK_CLASSPATH env variable,
 which is deprecated, but might come in handy in case there is actually
 a bug in handling those options.


 --
 Marcelo




 --
 Thanks,
 -Utkarsh




 --
 Thanks,
 -Utkarsh




-- 
Thanks,
-Utkarsh


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-25 Thread Utkarsh Sengar
So do I need to manually copy these 2 jars on my spark executors?



On Tue, Aug 25, 2015 at 10:51 AM, Marcelo Vanzin van...@cloudera.com
wrote:

 On Tue, Aug 25, 2015 at 10:48 AM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  Now I am going to try it out on our mesos cluster.
  I assumed spark.executor.extraClassPath takes csv as jars the way
 --jars
  takes it but it should be : separated like a regular classpath jar.

 Ah, yes, those options are just raw classpath strings. Also, they
 don't cause jars to be copied to the cluster. You'll need the jar to
 be available at the same location on all cluster machines.

 --
 Marcelo




-- 
Thanks,
-Utkarsh


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-25 Thread Utkarsh Sengar
Looks like I'm stuck then; I am using Mesos.
Adding these 2 jars to all executors might be a problem for me, so I will
probably try to remove the dependency on the otj-logging lib and just
use log4j.

On Tue, Aug 25, 2015 at 2:15 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Tue, Aug 25, 2015 at 1:50 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  So do I need to manually copy these 2 jars on my spark executors?

 Yes. I can think of a way to work around that if you're using YARN,
 but not with other cluster managers.

  On Tue, Aug 25, 2015 at 10:51 AM, Marcelo Vanzin van...@cloudera.com
  wrote:
 
  On Tue, Aug 25, 2015 at 10:48 AM, Utkarsh Sengar utkarsh2...@gmail.com
 
  wrote:
   Now I am going to try it out on our mesos cluster.
   I assumed spark.executor.extraClassPath takes csv as jars the way
   --jars
   takes it but it should be : separated like a regular classpath jar.
 
  Ah, yes, those options are just raw classpath strings. Also, they
  don't cause jars to be copied to the cluster. You'll need the jar to
  be available at the same location on all cluster machines.


 --
 Marcelo




-- 
Thanks,
-Utkarsh


Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
Continuing this discussion:
http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html

I am getting this error when I use logback-classic.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

I need to use logback-classic for my current project, so I am trying to
exclude slf4j-log4j12 from spark:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Now, when I run my job from IntelliJ (which sets the classpath), things
work perfectly.

But when I run my job via spark-submit:
~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner
spark-0.1-SNAPSHOT-jar-with-dependencies.jar
My job fails because spark-submit sets up the classpath itself and re-adds
slf4j-log4j12.

I am not adding the spark jar to the uber-jar via the maven assembly plugin:
<dependencySets>
    <dependencySet>
        ..
        <useTransitiveDependencies>false</useTransitiveDependencies>
        <excludes>
            <exclude>org.apache.spark:spark-core_2.10</exclude>
        </excludes>
    </dependencySet>
</dependencySets>

So how can I exclude slf4j-log4j12.jar when I submit a job via
spark-submit (on a per job basis)?
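
For reference, a hedged sketch of the approach reported elsewhere in this
thread to work locally: prepend the logback jars to both the driver and
executor classpaths (colon-separated paths, not a comma-separated list),
with /path/to/ standing in for wherever the jars actually live:

spark-1.4.1-bin-hadoop2.4/bin/spark-submit --conf
spark.driver.extraClassPath=/path/to/logback-core-1.1.2.jar:/path/to/logback-classic-1.1.2.jar
--conf
spark.executor.extraClassPath=/path/to/logback-core-1.1.2.jar:/path/to/logback-classic-1.1.2.jar
--class runner.SparkRunner spark-0.1-SNAPSHOT-jar-with-dependencies.jar

Note that extraClassPath does not ship the jars to the cluster; they have to
exist at the same path on every executor host (also discussed further down
in this thread).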

-- 
Thanks,
-Utkarsh


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
Hi Marcelo,

When I add this exclusion rule to my pom:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

The SparkRunner class works fine (from IntelliJ), but when I build a jar and
submit it via spark-submit, I get this error:
Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory
cannot be cast to ch.qos.logback.classic.LoggerContext
at
com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
at
com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
at com.opentable.logging.Log.<clinit>(Log.java:31)

The cast happens here (our logging lib is open source):
https://github.com/opentable/otj-logging/blob/master/logging/src/main/java/com/opentable/logging/AssimilateForeignLogging.java#L68

Thanks,
-Utkarsh




On Mon, Aug 24, 2015 at 3:04 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Utkarsh,

 Unfortunately that's not going to be easy. Since Spark bundles all
 dependent classes into a single fat jar file, to remove that
 dependency you'd need to modify Spark's assembly jar (potentially in
 all your nodes). Doing that per-job is even trickier, because you'd
 probably need some kind of script to inject the correct binding into
 Spark's classpath.

 That being said, that message is not an error, it's more of a noisy
 warning. I'd expect slf4j to use the first binding available - in your
 case, logback-classic. Is that not the case?


 On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  Continuing this discussion:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html
 
  I am getting this error when I use logback-classic.
 
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in
 
 [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in
 
 [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
  I need to use logback-classic for my current project, so I am trying to
  ignore slf4j-log4j12 from spark:
  dependency
  groupIdorg.apache.spark/groupId
  artifactIdspark-core_2.10/artifactId
  version1.4.1/version
  exclusions
  exclusion
  groupIdorg.slf4j/groupId
  artifactIdslf4j-log4j12/artifactId
  /exclusion
  /exclusions
  /dependency
 
  Now, when I run my job from Intellij (which sets the classpath), things
 work
  perfectly.
 
  But when I run my job via spark-submit:
  ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner
  spark-0.1-SNAPSHOT-jar-with-dependencies.jar
  My job fails because spark-submit sets up the classpath and it re-adds
 the
  slf4j-log4j12.
 
  I am not adding spark jar to the uber-jar via the maven assembly plugin:
   dependencySets
  dependencySet
  ..
  useTransitiveDependenciesfalse/useTransitiveDependencies
  excludes
  excludeorg.apache.spark:spark-core_2.10/exclude
  /excludes
  /dependencySet
  /dependencySets
 
  So how can I exclude slf4j-log4j12.jar when I submit a job via
  spark-submit (on a per job basis)?
 
  --
  Thanks,
  -Utkarsh



 --
 Marcelo




-- 
Thanks,
-Utkarsh


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
I get the same error even when I set the SPARK_CLASSPATH: export
SPARK_CLASSPATH=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.1.jar:/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
And I run the job like this: /spark-1.4.1-bin-hadoop2.4/bin/spark-submit
--class runner.SparkRunner
target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

I am not able to find the code in spark which adds these jars before the
spark classes in the classpath. Or maybe it's a bug. Any suggestions on
workarounds?

Thanks,
-Utkarsh


On Mon, Aug 24, 2015 at 4:32 PM, Utkarsh Sengar utkarsh2...@gmail.com
wrote:

 I assumed that's the case beacause of the error I got and the
 documentation which says: Extra classpath entries to append to the
 classpath of the driver.

 This is where I stand now:
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.10</artifactId>
     <version>1.4.1</version>
     <exclusions>
         <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
     </exclusions>
 </dependency>

 And no exclusions from my logging lib.

 And I submit this task: spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class
 runner.SparkRunner --conf
 spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
 --conf
 spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
 --conf
 spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
 --conf
 spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
 target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

 And I get the same error:
 Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory
 cannot be cast to ch.qos.logback.classic.LoggerContext
 at
 com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
 at
 com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
 at com.opentable.logging.Log.clinit(Log.java:31)
 ... 16 more


 Thanks,
 -Utkarsh

 On Mon, Aug 24, 2015 at 4:11 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  That didn't work since extraClassPath flag was still appending the
 jars at
  the end, so its still picking the slf4j jar provided by spark.

 Out of curiosity, how did you verify this? The extraClassPath
 options are supposed to prepend entries to the classpath, and the code
 seems to be doing that. If it's not really doing that in some case,
 it's a bug that needs to be fixed.

 Another option is those is setting the SPARK_CLASSPATH env variable,
 which is deprecated, but might come in handy in case there is actually
 a bug in handling those options.


 --
 Marcelo




 --
 Thanks,
 -Utkarsh




-- 
Thanks,
-Utkarsh


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
That didn't work since the extraClassPath flag was still appending the jars
at the end, so it's still picking up the slf4j jar provided by spark.
Although I found this flag: --conf spark.executor.userClassPathFirst=true
(http://spark.apache.org/docs/latest/configuration.html) and tried this:

➜  simspark git:(bulkrunner) ✗ spark-1.4.1-bin-hadoop2.4/bin/spark-submit
--class runner.SparkRunner --jars
/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar,/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
--conf spark.executor.userClassPathFirst=true --conf
spark.driver.userClassPathFirst=true
target/ds-tetris-simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

But this led to another error: com.typesafe.config.ConfigException$Missing:
No configuration setting found for key 'akka.version'
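
A side note: the missing 'akka.version' key is the usual symptom of Akka's
reference.conf files not being merged into (or not winning inside) the fat
jar once userClassPathFirst makes the application jar take precedence. One
commonly suggested workaround, sketched here as an assumption rather than a
verified fix for this setup, is to build the uber-jar with the
maven-shade-plugin and concatenate all reference.conf files instead of using
the assembly plugin:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.1</version> <!-- illustrative 2015-era version -->
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Concatenate every reference.conf so akka.version survives -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>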

Thanks,
-Utkarsh

On Mon, Aug 24, 2015 at 3:25 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Utkarsh,

 A quick look at slf4j's source shows it loads the first
 StaticLoggerBinder in your classpath. How are you adding the logback
 jar file to spark-submit?

 If you use spark.driver.extraClassPath and
 spark.executor.extraClassPath to add the jar, it should take
 precedence over the log4j binding embedded in the Spark assembly.


 On Mon, Aug 24, 2015 at 3:15 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  Hi Marcelo,
 
  When I add this exclusion rule to my pom:
  dependency
  groupIdorg.apache.spark/groupId
  artifactIdspark-core_2.10/artifactId
  version1.4.1/version
  exclusions
  exclusion
  groupIdorg.slf4j/groupId
  artifactIdslf4j-log4j12/artifactId
  /exclusion
  /exclusions
  /dependency
 
  The SparkRunner class works fine (from IntelliJ) but when I build a jar
 and
  submit it to spark-submit:
 
  I get this error:
  Caused by: java.lang.ClassCastException:
 org.slf4j.impl.Log4jLoggerFactory
  cannot be cast to ch.qos.logback.classic.LoggerContext
  at
 
 com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
  at
 
 com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
  at com.opentable.logging.Log.clinit(Log.java:31)
 
  Which is this here (our logging lib is open sourced):
 
 https://github.com/opentable/otj-logging/blob/master/logging/src/main/java/com/opentable/logging/AssimilateForeignLogging.java#L68
 
  Thanks,
  -Utkarsh
 
 
 
 
  On Mon, Aug 24, 2015 at 3:04 PM, Marcelo Vanzin van...@cloudera.com
 wrote:
 
  Hi Utkarsh,
 
  Unfortunately that's not going to be easy. Since Spark bundles all
  dependent classes into a single fat jar file, to remove that
  dependency you'd need to modify Spark's assembly jar (potentially in
  all your nodes). Doing that per-job is even trickier, because you'd
  probably need some kind of script to inject the correct binding into
  Spark's classpath.
 
  That being said, that message is not an error, it's more of a noisy
  warning. I'd expect slf4j to use the first binding available - in your
  case, logback-classic. Is that not the case?
 
 
  On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com
  wrote:
   Continuing this discussion:
  
  
 http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html
  
   I am getting this error when I use logback-classic.
  
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in
  
  
 [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in
  
  
 [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  
   I need to use logback-classic for my current project, so I am trying
 to
   ignore slf4j-log4j12 from spark:
   dependency
   groupIdorg.apache.spark/groupId
   artifactIdspark-core_2.10/artifactId
   version1.4.1/version
   exclusions
   exclusion
   groupIdorg.slf4j/groupId
   artifactIdslf4j-log4j12/artifactId
   /exclusion
   /exclusions
   /dependency
  
   Now, when I run my job from Intellij (which sets the classpath),
 things
   work
   perfectly.
  
   But when I run my job via spark-submit:
   ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class
 runner.SparkRunner
   spark-0.1-SNAPSHOT-jar-with-dependencies.jar
   My job fails because spark-submit sets up the classpath and it re-adds
   the
   slf4j-log4j12.
  
   I am not adding spark jar to the uber-jar via the maven assembly
 plugin:
dependencySets
   dependencySet
   ..
  
  useTransitiveDependenciesfalse/useTransitiveDependencies

Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
I assumed that's the case because of the error I got and the documentation,
which says: "Extra classpath entries to append to the classpath of the
driver."

This is where I stand now:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

And no exclusions from my logging lib.

And I submit this task: spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class
runner.SparkRunner --conf
spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
--conf
spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
--conf
spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
--conf
spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

And I get the same error:
Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory
cannot be cast to ch.qos.logback.classic.LoggerContext
at
com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
at
com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
at com.opentable.logging.Log.<clinit>(Log.java:31)
... 16 more


Thanks,
-Utkarsh

On Mon, Aug 24, 2015 at 4:11 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  That didn't work since extraClassPath flag was still appending the
 jars at
  the end, so its still picking the slf4j jar provided by spark.

 Out of curiosity, how did you verify this? The extraClassPath
 options are supposed to prepend entries to the classpath, and the code
 seems to be doing that. If it's not really doing that in some case,
 it's a bug that needs to be fixed.

 Another option is those is setting the SPARK_CLASSPATH env variable,
 which is deprecated, but might come in handy in case there is actually
 a bug in handling those options.


 --
 Marcelo




-- 
Thanks,
-Utkarsh