Re: Distance metrics in KMeans

2015-09-26 Thread Robineast
There is a Spark Package that provides some alternative distance metrics,
http://spark-packages.org/package/derrickburns/generalized-kmeans-clustering.
I haven't used it myself.
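
If you end up rolling your own instead, the distance metric only enters the
assignment step. A minimal sketch with plain RDDs, using cosine distance as an
example (illustrative only; this is not the API of the package above):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Cosine distance between two vectors; swap in any metric you like.
def cosineDistance(a: Vector, b: Vector): Double = {
  val (va, vb) = (a.toArray, b.toArray)
  val dot   = va.zip(vb).map { case (x, y) => x * y }.sum
  val norms = math.sqrt(va.map(x => x * x).sum) * math.sqrt(vb.map(x => x * x).sum)
  1.0 - dot / norms
}

// Assign each point to the centre that minimises the chosen distance.
def assign(points: RDD[Vector], centres: Array[Vector]): RDD[(Int, Vector)] =
  points.map { p =>
    val best = centres.indices.minBy(i => cosineDistance(centres(i), p))
    (best, p)
  }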



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Distance-metrics-in-KMeans-tp24823p24829.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
Hi Dibyendu,

I am not sure I understand completely. But are you suggesting that
currently there is no way to enable Checkpoint directory to be in Tachyon?

Thanks
Nikunj


On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi,
>
> Recently I was working on a PR to use Tachyon as OFF_HEAP store for Spark
> Streaming and make sure Spark Streaming can recover from Driver failure and
> recover the blocks form Tachyon.
>
> The motivation for this PR is:
>
> If Streaming application stores the blocks OFF_HEAP, it may not need any
> WAL like feature to recover from Driver failure. As long as the writing of
> blocks to Tachyon from Streaming receiver is durable, it should be
> recoverable from Tachyon directly on Driver failure.
> This can solve the issue of expensive WAL write and duplicating the blocks
> both in MEMORY and also WAL and also guarantee end to end No-Data-Loss
> channel using OFF_HEAP store.
>
> https://github.com/apache/spark/pull/8817
>
> This PR still under review . But having done various fail over testing in
> my environment , I see this PR worked perfectly fine without any data loss
> . Let see what TD and other have to say on this PR .
>
> Below is the configuration I used to test this PR ..
>
>
> Spark : 1.6 from Master
> Tachyon : 0.7.1
>
> SparkConfiguration Details :
>
> SparkConf conf = new SparkConf().setAppName("TestTachyon")
> .set("spark.streaming.unpersist", "true")
> .set("spark.local.dir", "/mnt1/spark/tincan")
> .set("tachyon.zookeeper.address","10.252.5.113:2182")
> .set("tachyon.usezookeeper","true")
> .set("spark.externalBlockStore.url", "tachyon-ft://
> ip-10-252-5-113.asskickery.us:19998")
> .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
> .set("spark.externalBlockStore.folderName","pearson")
> .set("spark.externalBlockStore.dirId", "subpub")
>
> .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");
>
> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
> 1));
>
> String checkpointDirectory = "hdfs://
> 10.252.5.113:9000/user/hadoop/spark/wal";
>
> jsc.checkpoint(checkpointDirectory);
>
>
> //I am using the My Receiver Based Consumer (
> https://github.com/dibbhatt/kafka-spark-consumer) . But
> KafkaUtil.CreateStream will also work
>
> JavaDStream unionStreams = ReceiverLauncher.launch(
> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>
>
>
>
> Regards,
> Dibyendu
>
> On Sat, Sep 26, 2015 at 11:59 AM, N B  wrote:
>
>> Hi Dibyendu,
>>
>> How does one go about configuring spark streaming to use tachyon as its
>> place for storing checkpoints? Also, can one do this with tachyon running
>> on a completely different node than where spark processes are running?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Thanks for looking into this. Further investigating I found that the
>>> issue is with Tachyon does not support File Append. The streaming receiver
>>> which writes to WAL when failed, and again restarted, not able to append to
>>> same WAL file after restart.
>>>
>>> I raised this with Tachyon user group, and Haoyuan told that within 3
>>> months time Tachyon file append will be ready. Will revisit this issue
>>> again then .
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das 
>>> wrote:
>>>
 Looks like somehow the file size reported by the FSInputDStream of
 Tachyon's FileSystem interface, is returning zero.

 On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
 dibyendu.bhattach...@gmail.com> wrote:

> Just to follow up this thread further .
>
> I was doing some fault tolerant testing of Spark Streaming with
> Tachyon as OFF_HEAP block store. As I said in earlier email, I could able
> to solve the BlockNotFound exception when I used Hierarchical Storage
> of Tachyon ,  which is good.
>
> I continue doing some testing around storing the Spark Streaming WAL
> and CheckPoint files also in Tachyon . Here is few finding ..
>
>
> When I store the Spark Streaming Checkpoint location in Tachyon , the
> throughput is much higher . I tested the Driver and Receiver failure cases
> , and Spark Streaming is able to recover without any Data Loss on Driver
> failure.
>
> *But on Receiver failure , Spark Streaming looses data* as I see
> Exception while reading the WAL file from Tachyon "receivedData" location
>  for the same Receiver id which just failed.
>
> If I change the Checkpoint location back to HDFS , Spark Streaming can
> recover from both Driver and Receiver failure .
>
> Here is the Log details when Spark Streaming receiver failed ...I
> raised a JIRA for the same issue :
> 

Re: How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-26 Thread Gavin Yue
It is working; we are doing the same thing every day. But the remote server
needs to be able to talk to the ResourceManager.

If you are using spark-submit, you will also need to specify the Hadoop conf
directory in your environment variables (HADOOP_CONF_DIR). Spark relies on
that to locate the cluster's ResourceManager.

I think this tutorial is pretty clear:
http://spark.apache.org/docs/latest/running-on-yarn.html



On Fri, Sep 25, 2015 at 7:11 PM, Zhiliang Zhu  wrote:

> Hi Yue,
>
> Thanks very much for your kind reply.
>
> I would like to submit spark job remotely on another machine outside the
> cluster,
> and the job will run on yarn, similar as hadoop job is already done, could
> you
> confirm it could exactly work for spark...
>
> Do you mean that I would print those variables on linux command side?
>
> Best Regards,
> Zhiliang
>
>
>
>
>
> On Saturday, September 26, 2015 10:07 AM, Gavin Yue <
> yue.yuany...@gmail.com> wrote:
>
>
> Print out your env variables and check first
>
> Sent from my iPhone
>
> On Sep 25, 2015, at 18:43, Zhiliang Zhu  > wrote:
>
> Hi All,
>
> I would like to submit spark job on some another remote machine outside
> the cluster,
> I also copied hadoop/spark conf files under the remote machine, then hadoop
> job would be submitted, but spark job would not.
>
> In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,
> or for some other reasons...
>
> This issue is urgent for me, would some expert provide some help about
> this problem...
>
> I will show sincere appreciation towards your help.
>
> Thank you!
> Best Regards,
> Zhiliang
>
>
>
>
> On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu <
> zchl.j...@yahoo.com.INVALID > wrote:
>
>
> Hi all,
>
> The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or
> just set as
> export  SPARK_LOCAL_IP=localhost#or set as the specific node ip on the
> specific spark install directory
>
> It will work well to submit spark job on master node of cluster, however,
> it will fail by way of some gateway machine remotely.
>
> The gateway machine is already configed, it works well to submit hadoop
> job.
> It is set as:
> export SCALA_HOME=/usr/lib/scala
> export JAVA_HOME=/usr/java/jdk1.7.0_45
> export R_HOME=/usr/lib/r
> export HADOOP_HOME=/usr/lib/hadoop
> export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
>
> export SPARK_MASTER_IP=master01
> #export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set,
> SparkContext will not start
> export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is
> started, but failed later
> export SPARK_LOCAL_DIRS=/data/spark_local_dir
> ...
>
> The error messages:
> 15/09/25 19:07:12 INFO util.Utils: Successfully started service
> 'sparkYarnAM' on port 48133.
> 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to
> be reachable.
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
>
>  I shall sincerely appreciate your kind help very much!
> Zhiliang
>
>
>
>
>
>
>


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread Dibyendu Bhattacharya
Hi,

Recently I was working on a PR to use Tachyon as the OFF_HEAP store for Spark
Streaming and to make sure Spark Streaming can recover from Driver failure and
recover the blocks from Tachyon.

The motivation for this PR is:

If a Streaming application stores its blocks OFF_HEAP, it may not need any
WAL-like feature to recover from Driver failure. As long as the writing of
blocks to Tachyon from the Streaming receiver is durable, they should be
recoverable from Tachyon directly on Driver failure.
This can solve the issue of the expensive WAL write and of duplicating the
blocks both in MEMORY and in the WAL, while still guaranteeing an end-to-end
no-data-loss channel using the OFF_HEAP store.

https://github.com/apache/spark/pull/8817

This PR is still under review. But having done various failover tests in my
environment, I have seen this PR work perfectly fine without any data loss.
Let's see what TD and others have to say on this PR.

Below is the configuration I used to test this PR:


Spark : 1.6 from Master
Tachyon : 0.7.1

SparkConfiguration Details :

SparkConf conf = new SparkConf().setAppName("TestTachyon")
    .set("spark.streaming.unpersist", "true")
    .set("spark.local.dir", "/mnt1/spark/tincan")
    .set("tachyon.zookeeper.address", "10.252.5.113:2182")
    .set("tachyon.usezookeeper", "true")
    .set("spark.externalBlockStore.url", "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
    .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
    .set("spark.externalBlockStore.folderName", "pearson")
    .set("spark.externalBlockStore.dirId", "subpub")
    .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");

JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1));

String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";

jsc.checkpoint(checkpointDirectory);

// I am using my receiver-based consumer (https://github.com/dibbhatt/kafka-spark-consumer),
// but KafkaUtils.createStream will also work.

JavaDStream unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
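
For the stock receiver path mentioned above, the equivalent looks roughly like
this (a sketch in Scala for brevity; the consumer group id and topic map are
placeholders, not from my setup):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Receive from Kafka and store the received blocks off-heap (Tachyon).
val stream = KafkaUtils.createStream(
  ssc,                      // your existing StreamingContext
  "10.252.5.113:2182",      // ZooKeeper quorum
  "test-group",             // consumer group id (placeholder)
  Map("events" -> 1),       // topic -> number of receiver threads (placeholder)
  StorageLevel.OFF_HEAP)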




Regards,
Dibyendu

On Sat, Sep 26, 2015 at 11:59 AM, N B  wrote:

> Hi Dibyendu,
>
> How does one go about configuring spark streaming to use tachyon as its
> place for storing checkpoints? Also, can one do this with tachyon running
> on a completely different node than where spark processes are running?
>
> Thanks
> Nikunj
>
>
> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi Tathagata,
>>
>> Thanks for looking into this. Further investigating I found that the
>> issue is with Tachyon does not support File Append. The streaming receiver
>> which writes to WAL when failed, and again restarted, not able to append to
>> same WAL file after restart.
>>
>> I raised this with Tachyon user group, and Haoyuan told that within 3
>> months time Tachyon file append will be ready. Will revisit this issue
>> again then .
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das 
>> wrote:
>>
>>> Looks like somehow the file size reported by the FSInputDStream of
>>> Tachyon's FileSystem interface, is returning zero.
>>>
>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
 Just to follow up this thread further .

 I was doing some fault tolerant testing of Spark Streaming with Tachyon
 as OFF_HEAP block store. As I said in earlier email, I could able to solve
 the BlockNotFound exception when I used Hierarchical Storage of
 Tachyon ,  which is good.

 I continue doing some testing around storing the Spark Streaming WAL
 and CheckPoint files also in Tachyon . Here is few finding ..


 When I store the Spark Streaming Checkpoint location in Tachyon , the
 throughput is much higher . I tested the Driver and Receiver failure cases
 , and Spark Streaming is able to recover without any Data Loss on Driver
 failure.

 *But on Receiver failure , Spark Streaming looses data* as I see
 Exception while reading the WAL file from Tachyon "receivedData" location
  for the same Receiver id which just failed.

 If I change the Checkpoint location back to HDFS , Spark Streaming can
 recover from both Driver and Receiver failure .

 Here is the Log details when Spark Streaming receiver failed ...I
 raised a JIRA for the same issue :
 https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
 (epoch 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2

Re: GraphX create graph with multiple node attributes

2015-09-26 Thread JJ
Robineast wrote
> 2) let GraphX supply a null instead
>  val graph = Graph(vertices, edges)  // vertices found in 'edges' but
> not in 'vertices' will be set to null 

Thank you! This method works.

As a follow up (sorry I'm new to this, don't know if I should start a new
thread?): if I have vertices that are in 'vertices' but not in 'edges' (the
opposite of what you mention), will they be counted as part of the graph
but with 0 edges, or will they be dropped from the graph? When I count the
number of vertices with vertices.count, I get 13,628 nodes. When I count
graph vertices with graph.vertices.count, I get 12,274 nodes. When I count
vertices with 1+ degrees with graph.degrees.count I get 10,091 vertices...
What am I dropping each time?

Thanks again!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-create-graph-with-multiple-node-attributes-tp24827p24830.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GraphX create graph with multiple node attributes

2015-09-26 Thread JJ
Here is all of my code. My first post had a simplified version. As I post
this, I realize one issue may be that when I convert my Ids to long (I
define a pageHash function to convert string Ids to long), the nodeIds are
no longer the same between the 'vertices' object and the 'edges' object. Do
you think this is what is causing the issue?
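
For illustration, the invariant I think I need is that one deterministic
string-to-Long function is applied identically on both the vertex and the edge
side, and that it is collision-free over my id space. A hypothetical sketch
(all names here are illustrative, not my actual code):

import org.apache.spark.graphx._

def pageHash(id: String): Long =
  id.foldLeft(1125899906842597L)((h, c) => 31 * h + c)  // simple deterministic hash

val rawVertices = sc.parallelize(Seq(("pageA", 1), ("pageB", 1)))
val rawEdges    = sc.parallelize(Seq(("pageA", "pageB")))

// The same pageHash is used for vertex ids and for edge endpoints.
val vertices = rawVertices.map { case (id, attrs) => (pageHash(id), attrs) }
val edges    = rawEdges.map { case (src, dst) => Edge(pageHash(src), pageHash(dst), 1) }
val graph    = Graph(vertices, edges)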





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-create-graph-with-multiple-node-attributes-tp24827p24832.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird worker usage

2015-09-26 Thread N B
Hello,

Does anyone have an insight into what could be the issue here?

Thanks
Nikunj


On Fri, Sep 25, 2015 at 10:44 AM, N B  wrote:

> Hi Akhil,
>
> I do have 25 partitions being created. I have set
> the spark.default.parallelism property to 25. Batch size is 30 seconds and
> block interval is 1200 ms which also gives us roughly 25 partitions from
> the input stream. I can see 25 partitions being created and used in the
> Spark UI also. Its just that those tasks are waiting for cores on N1 to get
> free before being scheduled while N2 is sitting idle.
>
> The cluster configuration is:
>
> N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
> N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.
>
> for a grand total of 28 cores. But it still does most of the processing on
> N1 (divided among the 2 workers running) but almost completely disregarding
> N2 until its the final stage where data is being written out to
> Elasticsearch. I am not sure I understand the reason behind it not
> distributing more partitions to N2 to begin with and use it effectively.
> Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
> send some of those partitions to N2 as well?
>
> Thanks
> Nikunj
>
>
> On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das 
> wrote:
>
>> Parallel tasks totally depends on the # of partitions that you are
>> having, if you are not receiving sufficient partitions (partitions > total
>> # cores) then try to do a .repartition.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>>
>>> Hello all,
>>>
>>> I have a Spark streaming application that reads from a Flume Stream,
>>> does quite a few maps/filters in addition to a few reduceByKeyAndWindow and
>>> join operations before writing the analyzed output to ElasticSearch inside
>>> a foreachRDD()...
>>>
>>> I recently started to run this on a 2 node cluster (Standalone) with the
>>> driver program directly submitting to Spark master on the same host. The
>>> way I have divided the resources is as follows:
>>>
>>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>>> worker)
>>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>>
>>> The application works just fine but it is underusing N2 completely. It
>>> seems to use N1 (note that both executors on N1 get used) for all the
>>> analytics but when it comes to writing to Elasticsearch, it does divide the
>>> data around into all 4 executors which then write to ES on a separate host.
>>>
>>> I am puzzled as to why the data is not being distributed evenly from the
>>> get go into all 4 executors and why would it only do so in the final step
>>> of the pipeline which seems counterproductive as well?
>>>
>>> CPU usage on N1 is near the peak while on N2 is < 10% of overall
>>> capacity.
>>>
>>> Any help in getting the resources more evenly utilized on N1 and N2 is
>>> welcome.
>>>
>>> Thanks in advance,
>>> Nikunj
>>>
>>>
>>
>


Re: GraphX create graph with multiple node attributes

2015-09-26 Thread Robineast
Vertices that aren't connected to anything are perfectly valid e.g.

import org.apache.spark.graphx._

val vertices = sc.makeRDD(Seq((1L,1),(2L,1),(3L,1)))
val edges = sc.makeRDD(Seq(Edge(1L,2L,1)))

val g = Graph(vertices, edges)
g.vertices.count

gives 3

Not sure why vertices appear to be dropping off. Could you show your full
code?

g.degrees.count gives 2 - as the scaladocs mention 'The degree of each
vertex in the graph. @note Vertices with no edges are not returned in the
resulting RDD'
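
If you need a degree for every vertex, including the isolated ones, you can
join the degrees back onto the full vertex set with a default of 0.
Continuing the example above:

// Give isolated vertices an explicit degree of 0.
val withDegrees = g.outerJoinVertices(g.degrees) {
  (id, attr, degOpt) => degOpt.getOrElse(0)
}
withDegrees.vertices.count   // 3, isolated vertices included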






-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-create-graph-with-multiple-node-attributes-tp24827p24831.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-26 Thread Jerry Lam
Hi Michael,

Thanks for the tip. With dataframes, is it possible to explode only selected
fields of each element in purchase_items?
Since purchase_items is an array of items and each item has a number of
fields (for example product_id and price), is it possible to explode just
these two fields directly using the DataFrame API?
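
Concretely, I am hoping for something like this (just a sketch of the intent,
using my column names; I don't know if this is the right way):

import org.apache.spark.sql.functions._

// Explode the array, then pull the two nested fields out of the struct.
val items = sqlContext.table("purchases")
  .select(explode(col("purchase_items")).as("item"))
val slim = items.select(col("item.product_id"), col("item.price"))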

Best Regards,


Jerry

On Fri, Sep 25, 2015 at 7:53 PM, Michael Armbrust 
wrote:

> The SQL parser without HiveContext is really simple, which is why I
> generally recommend users use HiveContext.  However, you can do it with
> dataframes:
>
> import org.apache.spark.sql.functions._
> table("purchases").select(explode(df("purchase_items")).as("item"))
>
>
>
> On Fri, Sep 25, 2015 at 4:21 PM, Jerry Lam  wrote:
>
>> Hi sparkers,
>>
>> Anyone knows how to do LATERAL VIEW EXPLODE without HiveContext?
>> I don't want to start up a metastore and derby just because I need
>> LATERAL VIEW EXPLODE.
>>
>> I have been trying but I always get the exception like this:
>>
>> Name: java.lang.RuntimeException
>> Message: [1.68] failure: ``union'' expected but identifier view found
>>
>> with the query look like:
>>
>> "select items from purhcases lateral view explode(purchase_items) tbl as
>> items"
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>


Problem with multiple fields with same name in Avro

2015-09-26 Thread Anders Arpteg
Hi,

I received the following error when reading an Avro source with Spark 1.5.0
and the com.databricks.spark.avro reader. In the data source, there is one
nested field named "UserActivity.history.activity" and another named
"UserActivity.activity". This seems to be the reason for the exception,
since the two fields have the same name but at different levels of the
hierarchy.

Any ideas on how to get around this? The exception occurs immediately when
trying to load the data.

Thanks,
Anders

15/09/26 11:42:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
lon4-hadoopslave-a148.lon4.spotify.net):
org.apache.avro.AvroRuntimeException: Bad index
at
com.spotify.analytics.schema.UserActivity.put(UserActivity.java:60)
at
org.apache.avro.generic.GenericData.setField(GenericData.java:573)
at
org.apache.avro.generic.GenericData.setField(GenericData.java:590)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:66)
at
org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
at
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
at
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
at
com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:127)
at
com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:126)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
Hi Dibyendu,

Thanks. I believe I understand why it has been an issue using S3 for
checkpoints based on your explanation. But does this limitation apply only
if recovery is needed in case of driver failure?

What if we are not interested in recovery after a driver failure? However,
just to run streaming pipelines that use reduceByKeyAndWindow() and
updateStateByKey() operations, a checkpoint directory still needs to be
configured.
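
For context, the pipelines in question look roughly like this (a simplified
sketch, not our actual code; the checkpoint path and the source are
placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StatefulSketch")
val ssc  = new StreamingContext(conf, Seconds(30))
// Required by the windowed/stateful operators below, even without driver recovery.
ssc.checkpoint("s3n://my-bucket/spark-checkpoints")

val words  = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val counts = words.map((_, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(30))
counts.print()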

Do you think this usage will also run into issues if an S3 location is
provided for the checkpoint directory? We will not use it to do any
explicit recovery, as I stated above.

Thanks
Nikunj



On Sat, Sep 26, 2015 at 3:13 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> In Spark Streaming , Checkpoint Directory is used for two purpose
>
> 1. Metadata checkpointing
>
> 2. Data checkpointing
>
> If you enable WAL to recover from Driver failure, Spark Streaming will
> also write the Received Blocks in WAL which stored in checkpoint directory.
>
> For streaming solution to recover from any failure without any data loss ,
> you need to enable Meta Data Check pointing and WAL.  You do not need to
> enable Data Check pointing.
>
> From my experiments and the PR I mentioned , I configured the Meta Data
> Check Pointing in HDFS , and stored the Received Blocks OFF_HEAP.  And I
> did not use any WAL . The PR I proposed would recover from Driver fail-over
> without using any WAL like feature because Blocks are already available in
> Tachyon. The Meta Data Checkpoint helps to recover the meta data about past
> received blocks.
>
> Now the question is , can I configure Tachyon as my Metadata Checkpoint
> location ? I tried that , and Streaming application writes the
> receivedBlockMeataData to Tachyon, but on driver failure, it can not
> recover the received block meta data from Tachyon. I sometime see Zero size
> files in Tachyon checkpoint location , and it can not recover past events .
> I need to understand what is the issue of storing meta data in Tachyon .
> That needs a different JIRA I guess.
>
> Let me know I am able to explain the current scenario around Spark
> Streaming and Tachyon .
>
> Regards,
> Dibyendu
>
>
>
>
> On Sat, Sep 26, 2015 at 1:04 PM, N B  wrote:
>
>> Hi Dibyendu,
>>
>> I am not sure I understand completely. But are you suggesting that
>> currently there is no way to enable Checkpoint directory to be in Tachyon?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Recently I was working on a PR to use Tachyon as OFF_HEAP store for
>>> Spark Streaming and make sure Spark Streaming can recover from Driver
>>> failure and recover the blocks form Tachyon.
>>>
>>> The motivation for this PR is:
>>>
>>> If Streaming application stores the blocks OFF_HEAP, it may not need any
>>> WAL like feature to recover from Driver failure. As long as the writing of
>>> blocks to Tachyon from Streaming receiver is durable, it should be
>>> recoverable from Tachyon directly on Driver failure.
>>> This can solve the issue of expensive WAL write and duplicating the
>>> blocks both in MEMORY and also WAL and also guarantee end to end
>>> No-Data-Loss channel using OFF_HEAP store.
>>>
>>> https://github.com/apache/spark/pull/8817
>>>
>>> This PR still under review . But having done various fail over testing
>>> in my environment , I see this PR worked perfectly fine without any data
>>> loss . Let see what TD and other have to say on this PR .
>>>
>>> Below is the configuration I used to test this PR ..
>>>
>>>
>>> Spark : 1.6 from Master
>>> Tachyon : 0.7.1
>>>
>>> SparkConfiguration Details :
>>>
>>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>>> .set("spark.streaming.unpersist", "true")
>>> .set("spark.local.dir", "/mnt1/spark/tincan")
>>> .set("tachyon.zookeeper.address","10.252.5.113:2182")
>>> .set("tachyon.usezookeeper","true")
>>> .set("spark.externalBlockStore.url", "tachyon-ft://
>>> ip-10-252-5-113.asskickery.us:19998")
>>> .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>>> .set("spark.externalBlockStore.folderName","pearson")
>>> .set("spark.externalBlockStore.dirId", "subpub")
>>>
>>> .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");
>>>
>>> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
>>> 1));
>>>
>>> String checkpointDirectory = "hdfs://
>>> 10.252.5.113:9000/user/hadoop/spark/wal";
>>>
>>> jsc.checkpoint(checkpointDirectory);
>>>
>>>
>>> //I am using the My Receiver Based Consumer (
>>> https://github.com/dibbhatt/kafka-spark-consumer) . But
>>> KafkaUtil.CreateStream will also work
>>>
>>> JavaDStream unionStreams = ReceiverLauncher.launch(
>>> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>>>
>>>
>>>
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Sat, Sep 26, 2015 at 11:59 AM, N B 

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
I wanted to add that we are not configuring the WAL in our scenario.

Thanks again,
Nikunj


On Sat, Sep 26, 2015 at 11:35 AM, N B  wrote:

> Hi Dibyendu,
>
> Thanks. I believe I understand why it has been an issue using S3 for
> checkpoints based on your explanation. But does this limitation apply only
> if recovery is needed in case of driver failure?
>
> What if we are not interested in recovery after a driver failure. However,
> just for the purposes of running streaming pipelines that do
> reduceByKeyAndWindow() and updateStateByKey() operations it needs to have a
> checkpoint directory configured.
>
> Do you think this usage will also run into issues if an S3 location is
> provided for the checkpoint directory. We will not use it to do any
> explicit recovery like I stated above.
>
> Thanks
> Nikunj
>
>
>
> On Sat, Sep 26, 2015 at 3:13 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> In Spark Streaming , Checkpoint Directory is used for two purpose
>>
>> 1. Metadata checkpointing
>>
>> 2. Data checkpointing
>>
>> If you enable WAL to recover from Driver failure, Spark Streaming will
>> also write the Received Blocks in WAL which stored in checkpoint directory.
>>
>> For streaming solution to recover from any failure without any data loss
>> , you need to enable Meta Data Check pointing and WAL.  You do not need to
>> enable Data Check pointing.
>>
>> From my experiments and the PR I mentioned , I configured the Meta Data
>> Check Pointing in HDFS , and stored the Received Blocks OFF_HEAP.  And I
>> did not use any WAL . The PR I proposed would recover from Driver fail-over
>> without using any WAL like feature because Blocks are already available in
>> Tachyon. The Meta Data Checkpoint helps to recover the meta data about past
>> received blocks.
>>
>> Now the question is , can I configure Tachyon as my Metadata Checkpoint
>> location ? I tried that , and Streaming application writes the
>> receivedBlockMeataData to Tachyon, but on driver failure, it can not
>> recover the received block meta data from Tachyon. I sometime see Zero size
>> files in Tachyon checkpoint location , and it can not recover past events .
>> I need to understand what is the issue of storing meta data in Tachyon .
>> That needs a different JIRA I guess.
>>
>> Let me know I am able to explain the current scenario around Spark
>> Streaming and Tachyon .
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>>
>> On Sat, Sep 26, 2015 at 1:04 PM, N B  wrote:
>>
>>> Hi Dibyendu,
>>>
>>> I am not sure I understand completely. But are you suggesting that
>>> currently there is no way to enable Checkpoint directory to be in Tachyon?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
 Hi,

 Recently I was working on a PR to use Tachyon as OFF_HEAP store for
 Spark Streaming and make sure Spark Streaming can recover from Driver
 failure and recover the blocks form Tachyon.

 The motivation for this PR is:

 If Streaming application stores the blocks OFF_HEAP, it may not need
 any WAL like feature to recover from Driver failure. As long as the writing
 of blocks to Tachyon from Streaming receiver is durable, it should be
 recoverable from Tachyon directly on Driver failure.
 This can solve the issue of expensive WAL write and duplicating the
 blocks both in MEMORY and also WAL and also guarantee end to end
 No-Data-Loss channel using OFF_HEAP store.

 https://github.com/apache/spark/pull/8817

 This PR still under review . But having done various fail over testing
 in my environment , I see this PR worked perfectly fine without any data
 loss . Let see what TD and other have to say on this PR .

 Below is the configuration I used to test this PR ..


 Spark : 1.6 from Master
 Tachyon : 0.7.1

 SparkConfiguration Details :

 SparkConf conf = new SparkConf().setAppName("TestTachyon")
 .set("spark.streaming.unpersist", "true")
 .set("spark.local.dir", "/mnt1/spark/tincan")
 .set("tachyon.zookeeper.address","10.252.5.113:2182")
 .set("tachyon.usezookeeper","true")
 .set("spark.externalBlockStore.url", "tachyon-ft://
 ip-10-252-5-113.asskickery.us:19998")
 .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
 .set("spark.externalBlockStore.folderName","pearson")
 .set("spark.externalBlockStore.dirId", "subpub")

 .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");

 JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
 1));

 String checkpointDirectory = "hdfs://
 10.252.5.113:9000/user/hadoop/spark/wal";

 jsc.checkpoint(checkpointDirectory);


 //I am using the My Receiver Based Consumer (
 

queup jobs in spark cluster

2015-09-26 Thread manish ranjan
Dear All,

I have a small Spark cluster for academic purposes and would like it to be
open to accept jobs from a set of friends,
where all of us can submit and queue up jobs.

How is that possible? What is the solution to this problem? Any blog, software,
or link would be very helpful.

Thanks
~Manish


Re: queup jobs in spark cluster

2015-09-26 Thread Ted Yu
Related thread:
http://search-hadoop.com/m/q3RTt31EUSYGOj82

Please see:
https://spark.apache.org/docs/latest/security.html

FYI
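
For the queuing itself, one common setup is a long-running shared application
using Spark's FAIR scheduler, so jobs submitted by different users share the
cluster instead of running strictly FIFO. A minimal sketch (the pool name and
allocation file path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shared-cluster")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // optional pools
val sc = new SparkContext(conf)

// Work submitted with this local property lands in the named pool.
sc.setLocalProperty("spark.scheduler.pool", "studentA")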

On Sat, Sep 26, 2015 at 4:03 PM, manish ranjan 
wrote:

> Dear All,
>
> I have a small spark cluster for academia purpose and would like it to be
> open to accept jobs for set of friends
> where all of us can submit and queue up jobs.
>
> How is that possible ?  What is solution of this problem ? Any blog/sw/
> link will be very helpful.
>
> Thanks
> ~Manish
>
>
>


Re: GraphX create graph with multiple node attributes

2015-09-26 Thread Nick Peterson
Have you checked to make sure that your hashing function doesn't have any
collisions?  Node ids have to be unique; so, if you're getting repeated ids
out of your hasher, it could certainly lead to dropping of duplicate ids,
and therefore loss of vertices.
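
A quick way to check (a sketch; rawVertexIds and pageHash stand in for your
own names):

// Compare distinct raw ids with distinct hashed ids. Any difference means the
// hash collides, and GraphX keeps only one vertex per colliding id.
val distinctRaw    = rawVertexIds.distinct().count()
val distinctHashed = rawVertexIds.map(pageHash).distinct().count()
println(s"colliding ids: ${distinctRaw - distinctHashed}")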

On Sat, Sep 26, 2015 at 10:37 AM JJ  wrote:

> Here is all of my code. My first post had a simplified version. As I post
> this, I realize one issue may be that when I convert my Ids to long (I
> define a pageHash function to convert string Ids to long), the nodeIds are
> no longer the same between the 'vertices' object and the 'edges' object. Do
> you think this is what is causing the issue?
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-create-graph-with-multiple-node-attributes-tp24827p24832.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: What are best practices from Unit Testing Spark Code?

2015-09-26 Thread ehrlichja
Check out the spark-testing-base project. I haven't tried it yet, but it looks
good:

http://blog.cloudera.com/blog/2015/09/making-apache-spark-testing-easy-with-spark-testing-base/
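
For reference, a test with it looks roughly like this (a sketch based on the
post above; the trait name and details may differ between versions):

import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite

// The trait provides a shared SparkContext as 'sc', so each suite does not
// have to spin up and tear down its own.
class WordCountSpec extends FunSuite with SharedSparkContext {
  test("counts words") {
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assert(counts("a") === 2)
  }
}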



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-are-best-practices-from-Unit-Testing-Spark-Code-tp24821p24833.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird worker usage

2015-09-26 Thread Akhil Das
That means only a single receiver is doing all the work, so the data is local
to your N1 machine and hence all tasks are executed there. To get the data
onto N2, you need to either do a .repartition or set a MEMORY_*_2
StorageLevel, where the _2 suffix enables data replication; I guess that will
solve your problem.
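
Roughly like this (a sketch; the Flume host, port and partition count are
placeholders, and ssc is your existing StreamingContext):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Option 1: receive with a replicated storage level so blocks exist on both nodes.
val flumeStream = FlumeUtils.createStream(ssc, "n1-host", 4545,
  StorageLevel.MEMORY_AND_DISK_SER_2)

// Option 2 (and/or): spread the received blocks before the heavy stages.
val spread = flumeStream.repartition(25)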

Thanks
Best Regards

On Sun, Sep 27, 2015 at 12:50 AM, Akhil Das 
wrote:

> That means only
>
> Thanks
> Best Regards
>
> On Sun, Sep 27, 2015 at 12:07 AM, N B  wrote:
>
>> Hello,
>>
>> Does anyone have an insight into what could be the issue here?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Fri, Sep 25, 2015 at 10:44 AM, N B  wrote:
>>
>>> Hi Akhil,
>>>
>>> I do have 25 partitions being created. I have set
>>> the spark.default.parallelism property to 25. Batch size is 30 seconds and
>>> block interval is 1200 ms which also gives us roughly 25 partitions from
>>> the input stream. I can see 25 partitions being created and used in the
>>> Spark UI also. Its just that those tasks are waiting for cores on N1 to get
>>> free before being scheduled while N2 is sitting idle.
>>>
>>> The cluster configuration is:
>>>
>>> N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
>>> N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.
>>>
>>> for a grand total of 28 cores. But it still does most of the processing
>>> on N1 (divided among the 2 workers running) but almost completely
>>> disregarding N2 until its the final stage where data is being written out
>>> to Elasticsearch. I am not sure I understand the reason behind it not
>>> distributing more partitions to N2 to begin with and use it effectively.
>>> Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
>>> send some of those partitions to N2 as well?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>> On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das 
>>> wrote:
>>>
 Parallel tasks totally depends on the # of partitions that you are
 having, if you are not receiving sufficient partitions (partitions > total
 # cores) then try to do a .repartition.

 Thanks
 Best Regards

 On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:

> Hello all,
>
> I have a Spark streaming application that reads from a Flume Stream,
> does quite a few maps/filters in addition to a few reduceByKeyAndWindow 
> and
> join operations before writing the analyzed output to ElasticSearch inside
> a foreachRDD()...
>
> I recently started to run this on a 2 node cluster (Standalone) with
> the driver program directly submitting to Spark master on the same host.
> The way I have divided the resources is as follows:
>
> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores
> each worker)
> N2: 2 spark workers (16 gb + 8 cores each worker).
>
> The application works just fine but it is underusing N2 completely. It
> seems to use N1 (note that both executors on N1 get used) for all the
> analytics but when it comes to writing to Elasticsearch, it does divide 
> the
> data around into all 4 executors which then write to ES on a separate 
> host.
>
> I am puzzled as to why the data is not being distributed evenly from
> the get go into all 4 executors and why would it only do so in the final
> step of the pipeline which seems counterproductive as well?
>
> CPU usage on N1 is near the peak while on N2 is < 10% of overall
> capacity.
>
> Any help in getting the resources more evenly utilized on N1 and N2 is
> welcome.
>
> Thanks in advance,
> Nikunj
>
>

>>>
>>
>


Re: Weird worker usage

2015-09-26 Thread Akhil Das
That means only

Thanks
Best Regards

On Sun, Sep 27, 2015 at 12:07 AM, N B  wrote:

> Hello,
>
> Does anyone have an insight into what could be the issue here?
>
> Thanks
> Nikunj
>
>
> On Fri, Sep 25, 2015 at 10:44 AM, N B  wrote:
>
>> Hi Akhil,
>>
>> I do have 25 partitions being created. I have set
>> the spark.default.parallelism property to 25. Batch size is 30 seconds and
>> block interval is 1200 ms which also gives us roughly 25 partitions from
>> the input stream. I can see 25 partitions being created and used in the
>> Spark UI also. Its just that those tasks are waiting for cores on N1 to get
>> free before being scheduled while N2 is sitting idle.
>>
>> The cluster configuration is:
>>
>> N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
>> N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.
>>
>> for a grand total of 28 cores. But it still does most of the processing
>> on N1 (divided among the 2 workers running) but almost completely
>> disregarding N2 until its the final stage where data is being written out
>> to Elasticsearch. I am not sure I understand the reason behind it not
>> distributing more partitions to N2 to begin with and use it effectively.
>> Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
>> send some of those partitions to N2 as well?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das 
>> wrote:
>>
>>> Parallel tasks totally depends on the # of partitions that you are
>>> having, if you are not receiving sufficient partitions (partitions > total
>>> # cores) then try to do a .repartition.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>>>
 Hello all,

 I have a Spark streaming application that reads from a Flume Stream,
 does quite a few maps/filters in addition to a few reduceByKeyAndWindow and
 join operations before writing the analyzed output to ElasticSearch inside
 a foreachRDD()...

 I recently started to run this on a 2 node cluster (Standalone) with
 the driver program directly submitting to Spark master on the same host.
 The way I have divided the resources is as follows:

 N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores
 each worker)
 N2: 2 spark workers (16 gb + 8 cores each worker).

 The application works just fine but it is underusing N2 completely. It
 seems to use N1 (note that both executors on N1 get used) for all the
 analytics but when it comes to writing to Elasticsearch, it does divide the
 data around into all 4 executors which then write to ES on a separate host.

 I am puzzled as to why the data is not being distributed evenly from
 the get go into all 4 executors and why would it only do so in the final
 step of the pipeline which seems counterproductive as well?

 CPU usage on N1 is near the peak while on N2 is < 10% of overall
 capacity.

 Any help in getting the resources more evenly utilized on N1 and N2 is
 welcome.

 Thanks in advance,
 Nikunj


>>>
>>
>