Re: Segmented fold count

2014-08-18 Thread Davies Liu
>>> import itertools
>>> l = [1,1,1,2,2,3,4,4,5,1]
>>> gs = itertools.groupby(l)
>>> map(lambda (n, it): (n, sum(1 for _ in it)), gs)
[(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)]

def groupCount(l):
   gs = itertools.groupby(l)
   return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()
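For example, a minimal local sketch (written without the tuple-unpacking lambda so it also runs on Python 3; whether a run gets cut in two depends on how the data happens to be split across partitions):

import itertools
from pyspark import SparkContext

def groupCount(part):
    # count each run of equal values within one partition
    return [(n, sum(1 for _ in it)) for n, it in itertools.groupby(part)]

sc = SparkContext("local[2]", "segmented-fold-count")
rdd = sc.parallelize([1, 1, 1, 2, 2, 3, 4, 4, 5, 1], 2)
print(rdd.mapPartitions(groupCount).collect())
# e.g. [(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)] -- a run that straddles a
# partition boundary would show up as two adjacent pairs with the same value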

On Sun, Aug 17, 2014 at 10:34 PM, fil f...@pobox.com wrote:
 Can anyone assist with a scan of the following kind (Python preferred, but
 whatever..)? I'm looking for a kind of segmented fold count.

 Input: [1,1,1,2,2,3,4,4,5,1]
 Output: [(1,3), (2, 2), (3, 1), (4, 2), (5, 1), (1,1)]
 or preferably two output columns:
 id: [1,2,3,4,5,1]
 count: [3,2,1,2,1,1]

 I can use a groupby/count, except for the fact that I just want to scan -
 not re-sort. Ideally this would be as low-level as possible and perform in a
 simple single scan. It also needs to retain the original sort order.

 Thoughts?




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Segmented fold count

2014-08-18 Thread Andrew Ash
What happens when a run of numbers is spread across a partition boundary?
 I think you might end up with two adjacent groups of the same value in
that situation.


On Mon, Aug 18, 2014 at 2:05 AM, Davies Liu dav...@databricks.com wrote:

  >>> import itertools
  >>> l = [1,1,1,2,2,3,4,4,5,1]
  >>> gs = itertools.groupby(l)
  >>> map(lambda (n, it): (n, sum(1 for _ in it)), gs)
 [(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)]

 def groupCount(l):
gs = itertools.groupby(l)
return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

 If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()

 On Sun, Aug 17, 2014 at 10:34 PM, fil f...@pobox.com wrote:
  Can anyone assist with a scan of the following kind (Python preferred,
 but
  whatever..)? I'm looking for a kind of segmented fold count.
 
  Input: [1,1,1,2,2,3,4,4,5,1]
  Output: [(1,3), (2, 2), (3, 1), (4, 2), (5, 1), (1,1)]
  or preferably two output columns:
  id: [1,2,3,4,5,1]
  count: [3,2,1,2,1,1]
 
  I can use a groupby/count, except for the fact that I just want to scan -
  not resort. Ideally this would be as low-level as possible and perform
 in a
  simple single scan. It also needs to retain the original sort order.
 
  Thoughts?
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Segmented fold count

2014-08-18 Thread Davies Liu
On Sun, Aug 17, 2014 at 11:07 PM, Andrew Ash and...@andrewash.com wrote:
 What happens when a run of numbers is spread across a partition boundary?  I
 think you might end up with two adjacent groups of the same value in that
 situation.

Yes, you'd need another scan to combine these contiguous groups with the same value.
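For example, a small driver-side merge over the collected pairs (just a sketch; collect() preserves partition order, so adjacent duplicates are exactly the boundary cases):

def merge_adjacent(pairs):
    # combine neighbouring (value, count) pairs that share the same value,
    # i.e. runs that were cut in two by a partition boundary
    merged = []
    for value, count in pairs:
        if merged and merged[-1][0] == value:
            merged[-1] = (value, merged[-1][1] + count)
        else:
            merged.append((value, count))
    return merged

print(merge_adjacent([(1, 3), (2, 2), (2, 1), (3, 1)]))
# [(1, 3), (2, 3), (3, 1)]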

 On Mon, Aug 18, 2014 at 2:05 AM, Davies Liu dav...@databricks.com wrote:

  >>> import itertools
  >>> l = [1,1,1,2,2,3,4,4,5,1]
  >>> gs = itertools.groupby(l)
  >>> map(lambda (n, it): (n, sum(1 for _ in it)), gs)
 [(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)]

 def groupCount(l):
gs = itertools.groupby(l)
return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

 If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()

 On Sun, Aug 17, 2014 at 10:34 PM, fil f...@pobox.com wrote:
  Can anyone assist with a scan of the following kind (Python preferred,
  but
  whatever..)? I'm looking for a kind of segmented fold count.
 
  Input: [1,1,1,2,2,3,4,4,5,1]
  Output: [(1,3), (2, 2), (3, 1), (4, 2), (5, 1), (1,1)]
  or preferably two output columns:
  id: [1,2,3,4,5,1]
  count: [3,2,1,2,1,1]
 
  I can use a groupby/count, except for the fact that I just want to scan
  -
  not resort. Ideally this would be as low-level as possible and perform
  in a
  simple single scan. It also needs to retain the original sort order.
 
  Thoughts?
 
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: application as a service

2014-08-18 Thread Davies Liu
Another option is using Tachyon to cache the RDD, then the cache can
be shared by different applications.  See how to use Spark with
Tachyon: http://tachyon-project.org/Running-Spark-on-Tachyon.html

Davies

On Sun, Aug 17, 2014 at 4:48 PM, ryaminal tacmot...@gmail.com wrote:
 You can also look into using ooyala's job server at
 https://github.com/ooyala/spark-jobserver

 This already has a spray server built in that allows you to do what has
 already been explained above. Sounds like it should solve your problem.

 Enjoy!



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/application-as-a-service-tp12253p12267.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: OutOfMemory Error

2014-08-18 Thread Akhil Das
Hi Ghousia,

You can try the following:

1. Increase the heap size
https://spark.apache.org/docs/0.9.0/configuration.html
2. Increase the number of partitions
http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine
3. You could try persisting the RDD to use DISK_ONLY
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
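For instance, a rough PySpark sketch of those three knobs (the memory value, partition count and input path are only placeholders):

from pyspark import SparkConf, SparkContext, StorageLevel

conf = SparkConf().set("spark.executor.memory", "4g")   # 1. more heap per executor
sc = SparkContext(conf=conf)

rdd = sc.textFile("hdfs:///path/to/records")            # placeholder input
rdd = rdd.repartition(200)                              # 2. more, smaller partitions
rdd.persist(StorageLevel.DISK_ONLY)                     # 3. keep cached data on disk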



Thanks
Best Regards


On Mon, Aug 18, 2014 at 10:40 AM, Ghousia Taj ghousia.ath...@gmail.com
wrote:

 Hi,

 I am trying to implement machine learning algorithms on Spark. I am working
 on a 3 node cluster, with each node having 5GB of memory. Whenever I am
 working with a slightly larger number of records, I end up with an OutOfMemory
 error. The problem is that even if the number of records is only slightly higher, the
 intermediate result from a transformation is huge, and this results in an
 OutOfMemory error. To overcome this, we are partitioning the data such that
 each partition has only a few records.

 Is there any better way to fix this issue? Something like spilling the
 intermediate data to local disk?

 Thanks,
 Ghousia.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-Error-tp12275.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




NullPointerException when connecting from Spark to a Hive table backed by HBase

2014-08-18 Thread Cesar Arevalo
Hello:

I am trying to setup Spark to connect to a Hive table which is backed by
HBase, but I am running into the following NullPointerException:

scala> val hiveCount = hiveContext.sql("select count(*) from dataset_records").collect().head.getLong(0)
14/08/18 06:34:29 INFO ParseDriver: Parsing command: select count(*) from
dataset_records
14/08/18 06:34:29 INFO ParseDriver: Parse Completed
14/08/18 06:34:29 INFO HiveMetaStore: 0: get_table : db=default
tbl=dataset_records
14/08/18 06:34:29 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_table :
db=default tbl=dataset_records
14/08/18 06:34:30 INFO MemoryStore: ensureFreeSpace(160296) called with
curMem=0, maxMem=280248975
14/08/18 06:34:30 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 156.5 KB, free 267.1 MB)
14/08/18 06:34:30 INFO SparkContext: Starting job: collect at
SparkPlan.scala:85
14/08/18 06:34:31 WARN DAGScheduler: Creating new stage failed due to
exception - job: 0
java.lang.NullPointerException
at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
at
org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)




This is happening from the master on spark, I am running hbase version
hbase-0.98.4-hadoop1 and hive version 0.13.1. And here is how I am running
the spark shell:

bin/spark-shell --driver-class-path
/opt/hive/latest/lib/hive-hbase-handler-0.13.1.jar:/opt/hive/latest/lib/zookeeper-3.4.5.jar:/opt/spark-poc/lib_managed/jars/com.google.guava/guava/guava-14.0.1.jar:/opt/hbase/latest/lib/hbase-common-0.98.4-hadoop1.jar:/opt/hbase/latest/lib/hbase-server-0.98.4-hadoop1.jar:/opt/hbase/latest/lib/hbase-client-0.98.4-hadoop1.jar:/opt/hbase/latest/lib/hbase-protocol-0.98.4-hadoop1.jar:/opt/hbase/latest/lib/htrace-core-2.04.jar:/opt/hbase/latest/lib/netty-3.6.6.Final.jar:/opt/hbase/latest/lib/hbase-hadoop-compat-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-client/hbase-client-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-common/hbase-common-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-server/hbase-server-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-prefix-tree/hbase-prefix-tree-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-protocol/hbase-protocol-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/bundles/com.google.protobuf/protobuf-java/protobuf-java-2.5.0.jar:/opt/spark-poc/lib_managed/jars/org.cloudera.htrace/htrace-core/htrace-core-2.04.jar:/opt/spark/sql/hive/target/spark-hive_2.10-1.1.0-SNAPSHOT.jar:/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-common/hive-common-0.12.0.jar:/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-exec/hive-exec-0.12.0.jar:/opt/spark-poc/lib_managed/jars/org.apache.thrift/libthrift/libthrift-0.9.0.jar:/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-shims/hive-shims-0.12.0.jar:/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-metastore/hive-metastore-0.12.0.jar:/opt/spark/sql/catalyst/target/spark-catalyst_2.10-1.1.0-SNAPSHOT.jar:/opt/spark-poc/lib_managed/jars/org.antlr/antlr-runtime/antlr-runtime-3.4.jar:/opt/spark-poc/lib_managed/jars/org.apache.thrift/libfb303/libfb303-0.9.0.jar:/opt/spark-poc/lib_managed/jars/javax.jdo/jdo-api/jdo-api-3.0.1.jar:/opt/spark-poc/lib_managed/jars/org.datanucleus/datanucleus-api-jdo/datanucleus-api-jdo-3.2.1.jar:/opt/spark-poc/lib_managed/jars/org.datanucleus/datanucleus-core/datanucleus-core-3.2.2.jar:/opt/spark-poc/lib_managed/jars/org.datanucleus/datanucleus-rdbms/datanucleus-rdbms-3.2.1.jar:/opt/spark-poc/lib_managed/jars/org.apache.derby/derby/derby-10.4.2.0.jar:/opt/spark-poc/sbt/ivy/cache/org.apache.hive/hive-hbase-handler/jars/hive-hbase-handler-0.13.1.jar:/opt/spark-poc/lib_managed/jars/com.typesafe/scalalogging-slf4j_2.10/scalalogging-slf4j_2.10-1.0.1.jar:/opt/spark-poc/lib_managed/bundles/com.jolbox/bonecp/bonecp-0.7.1.RELEASE.jar:/opt/spark-poc/sbt/ivy/cache/com.datastax.cassandra/cassandra-driver-core/bundles/cassandra-driver-core-2.0.4.jar:/opt/spark-poc/lib_managed/jars/org.json/json/json-20090211.jar



Can anybody help me?

Best,
-- 
Cesar Arevalo
Software Engineer ❘ Zephyr Health
450 Mission Street, Suite #201 ❘ San Francisco, CA 94105
m: +1 415-571-7687 ❘ s: arevalocesar | t: @zephyrhealth
https://twitter.com/zephyrhealth
o: +1 415-529-7649 ❘ f: +1 

a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Henry Hung
Hi All,

I'm new to Spark and Scala, having just recently started using the language, and I love
it, but there is a small coding problem when I want to convert my existing map reduce
code from Java to Spark...

In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper and
override the setup(), map() and cleanup() methods.
But in Spark there is no method called setup(), so I put the setup() code into map(),
but it performs badly.
The reason is that I create the database connection once in setup(), run() executes
the SQL query, and then cleanup() closes the connection.
Could someone tell me how to do this in Spark?

Best regards,
Henry Hung


The privileged confidential information contained in this email is intended for 
use only by the addressees as indicated by the original sender of this email. 
If you are not the addressee indicated in this email or are not responsible for 
delivery of the email to such a person, please kindly reply to the sender 
indicating this fact and delete all copies of it from your computer and network 
server immediately. Your cooperation is highly appreciated. It is advised that 
any unauthorized use of confidential information of Winbond is strictly 
prohibited; and any information in this email irrelevant to the official 
business of Winbond shall be deemed as neither given nor endorsed by Winbond.


Re: OutOfMemory Error

2014-08-18 Thread Ghousia
Thanks for the answer Akhil. We are right now getting around this issue by
increasing the number of partitions, and we are persisting RDDs to
DISK_ONLY. But the issue is with heavy computations within an RDD. It would
be better if we had the option of spilling the intermediate transformation
results to local disk (only when memory consumption is high). Do we
have any such option available in Spark? If increasing the partitions is
the only way, then one might still end up with OutOfMemory errors when
working with certain algorithms where the intermediate result is huge.


On Mon, Aug 18, 2014 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Hi Ghousia,

 You can try the following:

 1. Increase the heap size
 https://spark.apache.org/docs/0.9.0/configuration.html
 2. Increase the number of partitions
 http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine
 3. You could try persisting the RDD to use DISK_ONLY
 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence



 Thanks
 Best Regards


 On Mon, Aug 18, 2014 at 10:40 AM, Ghousia Taj ghousia.ath...@gmail.com
 wrote:

 Hi,

 I am trying to implement machine learning algorithms on Spark. I am
 working
 on a 3 node cluster, with each node having 5GB of memory. Whenever I am
 working with slightly more number of records, I end up with OutOfMemory
 Error. Problem is, even if number of records is slightly high, the
 intermediate result from a transformation is huge and this results in
 OutOfMemory Error. To overcome this, we are partitioning the data such
 that
 each partition has only a few records.

 Is there any better way to fix this issue. Some thing like spilling the
 intermediate data to local disk?

 Thanks,
 Ghousia.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-Error-tp12275.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: NullPointerException when connecting from Spark to a Hive table backed by HBase

2014-08-18 Thread Akhil Das
Looks like your hiveContext is null. Have a look at this documentation.
https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

Thanks
Best Regards


On Mon, Aug 18, 2014 at 12:09 PM, Cesar Arevalo ce...@zephyrhealthinc.com
wrote:

 Hello:

 I am trying to setup Spark to connect to a Hive table which is backed by
 HBase, but I am running into the following NullPointerException:

 scala val hiveCount = hiveContext.sql(select count(*) from
 dataset_records).collect().head.getLong(0)
 14/08/18 06:34:29 INFO ParseDriver: Parsing command: select count(*) from
 dataset_records
 14/08/18 06:34:29 INFO ParseDriver: Parse Completed
 14/08/18 06:34:29 INFO HiveMetaStore: 0: get_table : db=default
 tbl=dataset_records
 14/08/18 06:34:29 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_table :
 db=default tbl=dataset_records
 14/08/18 06:34:30 INFO MemoryStore: ensureFreeSpace(160296) called with
 curMem=0, maxMem=280248975
 14/08/18 06:34:30 INFO MemoryStore: Block broadcast_0 stored as values in
 memory (estimated size 156.5 KB, free 267.1 MB)
 14/08/18 06:34:30 INFO SparkContext: Starting job: collect at
 SparkPlan.scala:85
 14/08/18 06:34:31 WARN DAGScheduler: Creating new stage failed due to
 exception - job: 0
 java.lang.NullPointerException
 at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
  at
 org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
  at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
  at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)




 This is happening from the master on spark, I am running hbase version
 hbase-0.98.4-hadoop1 and hive version 0.13.1. And here is how I am running
 the spark shell:

 bin/spark-shell --driver-class-path
 

RE: a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Henry Hung
Hi All,

Please ignore my question, I found a way to implement it via old archive mails:

http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E

Best regards,
Henry

From: MA33 YTHung1
Sent: Monday, August 18, 2014 2:42 PM
To: user@spark.apache.org
Subject: a noob question for how to implement setup and cleanup in Spark map

Hi All,

I'm new to Spark and Scala, just recently using this language and love it, but 
there is a small coding problem when I want to convert my existing map reduce 
code from Java to Spark...

In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper and 
override the setup(), map() and cleanup() methods.
But in the Spark, there is no a method called setup(), so I write the setup() 
code into map(), but it performs badly.
The reason is I create database connection in the setup() once and run() will 
execute SQL query, then cleanup() will close the connection.
Could someone tell me how to do it in Spark?

Best regards,
Henry Hung


The privileged confidential information contained in this email is intended for 
use only by the addressees as indicated by the original sender of this email. 
If you are not the addressee indicated in this email or are not responsible for 
delivery of the email to such a person, please kindly reply to the sender 
indicating this fact and delete all copies of it from your computer and network 
server immediately. Your cooperation is highly appreciated. It is advised that 
any unauthorized use of confidential information of Winbond is strictly 
prohibited; and any information in this email irrelevant to the official 
business of Winbond shall be deemed as neither given nor endorsed by Winbond.




Re: OutOfMemory Error

2014-08-18 Thread Akhil Das
I believe spark.shuffle.memoryFraction is the one you are looking for.

spark.shuffle.memoryFraction : Fraction of Java heap to use for aggregation
and cogroups during shuffles, if spark.shuffle.spill is true. At any given
time, the collective size of all in-memory maps used for shuffles is
bounded by this limit, beyond which the contents will begin to spill to
disk. If spills are often, consider increasing this value at the expense of
spark.storage.memoryFraction.

You can give it a try.
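For example, something along these lines from PySpark (the exact numbers are only illustrative; if I remember the 1.x defaults right, they are 0.2 for shuffle and 0.6 for storage):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.shuffle.spill", "true")             # allow spilling during shuffles
        .set("spark.shuffle.memoryFraction", "0.4")     # raise from the 0.2 default
        .set("spark.storage.memoryFraction", "0.5"))    # hand back part of the cache share
sc = SparkContext(conf=conf)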


Thanks
Best Regards


On Mon, Aug 18, 2014 at 12:21 PM, Ghousia ghousia.ath...@gmail.com wrote:

 Thanks for the answer Akhil. We are right now getting rid of this issue by
 increasing the number of partitions. And we are persisting RDDs to
 DISK_ONLY. But the issue is with heavy computations within an RDD. It would
 be better if we have the option of spilling the intermediate transformation
 results to local disk (only in case if memory consumption is high)  . Do we
 have any such option available with Spark? If increasing the partitions is
 the only the way, then one might end up with OutOfMemory Errors, when
 working with certain algorithms where intermediate result is huge.


 On Mon, Aug 18, 2014 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Ghousia,

 You can try the following:

 1. Increase the heap size
 https://spark.apache.org/docs/0.9.0/configuration.html
 2. Increase the number of partitions
 http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine
 3. You could try persisting the RDD to use DISK_ONLY
 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence



 Thanks
 Best Regards


 On Mon, Aug 18, 2014 at 10:40 AM, Ghousia Taj ghousia.ath...@gmail.com
 wrote:

 Hi,

 I am trying to implement machine learning algorithms on Spark. I am
 working
 on a 3 node cluster, with each node having 5GB of memory. Whenever I am
 working with slightly more number of records, I end up with OutOfMemory
 Error. Problem is, even if number of records is slightly high, the
 intermediate result from a transformation is huge and this results in
 OutOfMemory Error. To overcome this, we are partitioning the data such
 that
 each partition has only a few records.

 Is there any better way to fix this issue. Some thing like spilling the
 intermediate data to local disk?

 Thanks,
 Ghousia.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-Error-tp12275.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Akhil Das
You can create an RDD and then do a map or mapPartitions on it, where at the
top you create the database connection and so on, then do the operations, and
at the end close the connection.
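For example, something like this in PySpark (just a sketch; rdd is whatever RDD you start from, and db_connect()/conn.lookup() stand in for your actual client library):

def process_partition(rows):
    conn = db_connect()                 # "setup": opened once per partition
    try:
        for row in rows:
            yield conn.lookup(row)      # the per-record work, like map()
    finally:
        conn.close()                    # "cleanup": closed once per partition

results = rdd.mapPartitions(process_partition).collect()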

Thanks
Best Regards


On Mon, Aug 18, 2014 at 12:34 PM, Henry Hung ythu...@winbond.com wrote:

  Hi All,



 Please ignore my question, I found a way to implement it via old archive
 mails:




 http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E



 Best regards,

 Henry



 *From:* MA33 YTHung1
 *Sent:* Monday, August 18, 2014 2:42 PM
 *To:* user@spark.apache.org
 *Subject:* a noob question for how to implement setup and cleanup in
 Spark map



 Hi All,



 I’m new to Spark and Scala, just recently using this language and love it,
 but there is a small coding problem when I want to convert my existing map
 reduce code from Java to Spark…



 In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper
 and override the setup(), map() and cleanup() methods.

 But in the Spark, there is no a method called setup(), so I write the
 setup() code into map(), but it performs badly.

 The reason is I create database connection in the setup() once and run()
 will execute SQL query, then cleanup() will close the connection.

 Could someone tell me how to do it in Spark?



 Best regards,

 Henry Hung


  --

 The privileged confidential information contained in this email is
 intended for use only by the addressees as indicated by the original sender
 of this email. If you are not the addressee indicated in this email or are
 not responsible for delivery of the email to such a person, please kindly
 reply to the sender indicating this fact and delete all copies of it from
 your computer and network server immediately. Your cooperation is highly
 appreciated. It is advised that any unauthorized use of confidential
 information of Winbond is strictly prohibited; and any information in this
 email irrelevant to the official business of Winbond shall be deemed as
 neither given nor endorsed by Winbond.




Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Sean Owen
I think this was a more comprehensive answer recently. Tobias is right
that it is not quite that simple:
http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E

On Mon, Aug 18, 2014 at 8:04 AM, Henry Hung ythu...@winbond.com wrote:
 Hi All,



 Please ignore my question, I found a way to implement it via old archive
 mails:



 http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E



 Best regards,

 Henry



 From: MA33 YTHung1
 Sent: Monday, August 18, 2014 2:42 PM
 To: user@spark.apache.org
 Subject: a noob question for how to implement setup and cleanup in Spark map



 Hi All,



 I’m new to Spark and Scala, just recently using this language and love it,
 but there is a small coding problem when I want to convert my existing map
 reduce code from Java to Spark…



 In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper
 and override the setup(), map() and cleanup() methods.

 But in the Spark, there is no a method called setup(), so I write the
 setup() code into map(), but it performs badly.

 The reason is I create database connection in the setup() once and run()
 will execute SQL query, then cleanup() will close the connection.

 Could someone tell me how to do it in Spark?



 Best regards,

 Henry Hung



 

 The privileged confidential information contained in this email is intended
 for use only by the addressees as indicated by the original sender of this
 email. If you are not the addressee indicated in this email or are not
 responsible for delivery of the email to such a person, please kindly reply
 to the sender indicating this fact and delete all copies of it from your
 computer and network server immediately. Your cooperation is highly
 appreciated. It is advised that any unauthorized use of confidential
 information of Winbond is strictly prohibited; and any information in this
 email irrelevant to the official business of Winbond shall be deemed as
 neither given nor endorsed by Winbond.


 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NullPointerException when connecting from Spark to a Hive table backed by HBase

2014-08-18 Thread Cesar Arevalo
Nope, it is NOT null. Check this out:

scala> hiveContext == null
res2: Boolean = false


And thanks for sending that link, but I had already looked at it. Any other
ideas?

I looked through some of the relevant Spark Hive code and I'm starting to
think this may be a bug.

-Cesar



On Mon, Aug 18, 2014 at 12:00 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Looks like your hiveContext is null. Have a look at this documentation.
 https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

 Thanks
 Best Regards


 On Mon, Aug 18, 2014 at 12:09 PM, Cesar Arevalo ce...@zephyrhealthinc.com
  wrote:

 Hello:

 I am trying to setup Spark to connect to a Hive table which is backed by
 HBase, but I am running into the following NullPointerException:

 scala val hiveCount = hiveContext.sql(select count(*) from
 dataset_records).collect().head.getLong(0)
 14/08/18 06:34:29 INFO ParseDriver: Parsing command: select count(*) from
 dataset_records
 14/08/18 06:34:29 INFO ParseDriver: Parse Completed
 14/08/18 06:34:29 INFO HiveMetaStore: 0: get_table : db=default
 tbl=dataset_records
 14/08/18 06:34:29 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_table
 : db=default tbl=dataset_records
 14/08/18 06:34:30 INFO MemoryStore: ensureFreeSpace(160296) called with
 curMem=0, maxMem=280248975
 14/08/18 06:34:30 INFO MemoryStore: Block broadcast_0 stored as values in
 memory (estimated size 156.5 KB, free 267.1 MB)
 14/08/18 06:34:30 INFO SparkContext: Starting job: collect at
 SparkPlan.scala:85
 14/08/18 06:34:31 WARN DAGScheduler: Creating new stage failed due to
 exception - job: 0
 java.lang.NullPointerException
 at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
  at
 org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
  at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
  at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)




 This is happening from the master on spark, I am running hbase version
 hbase-0.98.4-hadoop1 and hive version 0.13.1. And here is how I am running
 the spark shell:

 bin/spark-shell --driver-class-path
 

RE: a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Henry Hung
I slightly modified the code to use while(partitions.hasNext) { } instead of
partitions.map(func).
I suppose this can eliminate the uncertainty that comes from lazy evaluation.
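The same idea, sketched in PySpark for comparison (db_connect() and conn.lookup() are hypothetical): materialise the partition eagerly so the connection is still open while the rows are processed, then close it.

def process_partition_eagerly(rows):
    conn = db_connect()
    out = [conn.lookup(row) for row in rows]  # consume the iterator now, not lazily
    conn.close()
    return out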

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Monday, August 18, 2014 3:10 PM
To: MA33 YTHung1
Cc: user@spark.apache.org
Subject: Re: a noob question for how to implement setup and cleanup in Spark map

I think this was a more comprehensive answer recently. Tobias is right that it 
is not quite that simple:
http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E

On Mon, Aug 18, 2014 at 8:04 AM, Henry Hung ythu...@winbond.com wrote:
 Hi All,



 Please ignore my question, I found a way to implement it via old
 archive
 mails:



 http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF
 _KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E



 Best regards,

 Henry



 From: MA33 YTHung1
 Sent: Monday, August 18, 2014 2:42 PM
 To: user@spark.apache.org
 Subject: a noob question for how to implement setup and cleanup in
 Spark map



 Hi All,



 I’m new to Spark and Scala, just recently using this language and love
 it, but there is a small coding problem when I want to convert my
 existing map reduce code from Java to Spark…



 In Java, I create a class by extending
 org.apache.hadoop.mapreduce.Mapper
 and override the setup(), map() and cleanup() methods.

 But in the Spark, there is no a method called setup(), so I write the
 setup() code into map(), but it performs badly.

 The reason is I create database connection in the setup() once and
 run() will execute SQL query, then cleanup() will close the connection.

 Could someone tell me how to do it in Spark?



 Best regards,

 Henry Hung



 



 

The privileged confidential information contained in this email is intended for 
use only by the addressees as indicated by the original sender of this email. 
If you are not the addressee indicated in this email or are not responsible for 
delivery of the email to such a person, please kindly reply to the sender 
indicating this fact and delete all copies of it from your computer and network 
server immediately. Your cooperation is highly appreciated. It is advised that 
any unauthorized use of confidential information of Winbond is strictly 
prohibited; and any information in this email irrelevant to the official 
business of Winbond shall be deemed as neither given nor endorsed by Winbond.


Re: Segmented fold count

2014-08-18 Thread fil
Thanks for the reply!

def groupCount(l):
    gs = itertools.groupby(l)
    return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

 If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()


Yes, I am interested in RDD - not pure Python :)

I am new to Spark, can you explain:

- Python functions like groupCount; these get reflected from their Python
AST and converted into a Spark DAG? Presumably if I try and do something
non-convertible this transformation process will throw an error? In other
words this runs in the JVM.
- I had considered that partitions were batches of distributable work,
and generally large. Presumably the above is OK with small groups (e.g.
average size < 10) - this won't kill performance?

On Mon, Aug 18, 2014 at 4:20 PM, Davies Liu dav...@databricks.com wrote:

 On Sun, Aug 17, 2014 at 11:07 PM, Andrew Ash and...@andrewash.com wrote:
  What happens when a run of numbers is spread across a partition
 boundary?  I
  think you might end up with two adjacent groups of the same value in that
  situation.

 Yes, need another scan to combine this continuous groups with same value.


Yep - this will happen frequently. So by this you mean scanning the
resulting mapPartitions() results? Presumably I could eliminate adjacent
duplicates - or specifically look for duplicates at the end/start of
different batches (what is the Spark term for this) from different nodes
in the cluster. What's the Spark'iest way to do this efficiently? :)

Regards, Fil.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278p12295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: NullPointerException when connecting from Spark to a Hive table backed by HBase

2014-08-18 Thread Akhil Das
Then it's definitely a jar conflict. Can you try removing this jar from the
class path:
/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-exec/hive-exec-0.12.0.jar

Thanks
Best Regards


On Mon, Aug 18, 2014 at 12:45 PM, Cesar Arevalo ce...@zephyrhealthinc.com
wrote:

 Nope, it is NOT null. Check this out:

 scala hiveContext == null
 res2: Boolean = false


 And thanks for sending that link, but I had already looked at it. Any
 other ideas?

 I looked through some of the relevant Spark Hive code and I'm starting to
 think this may be a bug.

 -Cesar



 On Mon, Aug 18, 2014 at 12:00 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Looks like your hiveContext is null. Have a look at this documentation.
 https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

 Thanks
 Best Regards


 On Mon, Aug 18, 2014 at 12:09 PM, Cesar Arevalo 
 ce...@zephyrhealthinc.com wrote:

 Hello:

 I am trying to setup Spark to connect to a Hive table which is backed by
 HBase, but I am running into the following NullPointerException:

 scala val hiveCount = hiveContext.sql(select count(*) from
 dataset_records).collect().head.getLong(0)
 14/08/18 06:34:29 INFO ParseDriver: Parsing command: select count(*)
 from dataset_records
 14/08/18 06:34:29 INFO ParseDriver: Parse Completed
 14/08/18 06:34:29 INFO HiveMetaStore: 0: get_table : db=default
 tbl=dataset_records
 14/08/18 06:34:29 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_table
 : db=default tbl=dataset_records
 14/08/18 06:34:30 INFO MemoryStore: ensureFreeSpace(160296) called with
 curMem=0, maxMem=280248975
 14/08/18 06:34:30 INFO MemoryStore: Block broadcast_0 stored as values
 in memory (estimated size 156.5 KB, free 267.1 MB)
 14/08/18 06:34:30 INFO SparkContext: Starting job: collect at
 SparkPlan.scala:85
 14/08/18 06:34:31 WARN DAGScheduler: Creating new stage failed due to
 exception - job: 0
 java.lang.NullPointerException
 at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
  at
 org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
  at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
  at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)




 This is happening from the master on spark, I am running hbase version
 hbase-0.98.4-hadoop1 and hive version 0.13.1. And here is how I am running
 the spark shell:

 bin/spark-shell --driver-class-path
 

Re: Spark: why need a masterLock when sending heartbeat to master

2014-08-18 Thread Victor Sheng
Thanks, I got it !



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-why-need-a-masterLock-when-sending-heartbeat-to-master-tp12256p12297.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: s3:// sequence file startup time

2014-08-18 Thread Cheng Lian
Maybe irrelevant, but this closely resembles the S3 Parquet file issue we've
met before. It takes a dozen minutes to read the metadata because
ParquetInputFormat tries to call getFileStatus for all part-files
sequentially.

Just checked SequenceFileInputFormat, and found that a MapFile may share
similar issue.


On Mon, Aug 18, 2014 at 5:26 AM, Aaron Davidson ilike...@gmail.com wrote:

 The driver must initially compute the partitions and their preferred
 locations for each part of the file, which results in a serial
 getFileBlockLocations() on each part. However, I would expect this to take
 several seconds, not minutes, to perform on 1000 parts. Is your driver
 inside or outside of AWS? There is an order of magnitude difference in the
 latency of S3 requests if you're running outside of AWS.

 We have also experienced an excessive slowdown in the metadata lookups
 using Hadoop 2 versus Hadoop 1, likely due to the differing jets3t library
 versions. If you're using Hadoop 2, you might try downgrading to Hadoop
 1.2.1 and seeing if the startup time decreases.


 On Sat, Aug 16, 2014 at 6:46 PM, kmatzen kmat...@gmail.com wrote:

 I have some RDD's stored as s3://-backed sequence files sharded into 1000
 parts.  The startup time is pretty long (~10's of minutes).  It's
 communicating with S3, but I don't know what it's doing.  Is it just
 fetching the metadata from S3 for each part?  Is there a way to pipeline
 this with the computation?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/s3-sequence-file-startup-time-tp12242.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Re: application as a service

2014-08-18 Thread Zhanfeng Huo
That helps a lot.

Thanks.



Zhanfeng Huo
 
From: Davies Liu
Date: 2014-08-18 14:31
To: ryaminal
CC: u...@spark.incubator.apache.org
Subject: Re: application as a service
Another option is using Tachyon to cache the RDD, then the cache can
be shared by different applications.  See how to use Spark with
Tachyon: http://tachyon-project.org/Running-Spark-on-Tachyon.html
 
Davies
 
On Sun, Aug 17, 2014 at 4:48 PM, ryaminal tacmot...@gmail.com wrote:
 You can also look into using ooyala's job server at
 https://github.com/ooyala/spark-jobserver

 This already has a spray server built in that allows you to do what has
 already been explained above. Sounds like it should solve your problem.

 Enjoy!



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/application-as-a-service-tp12253p12267.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

 
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
 


Working with many RDDs in parallel?

2014-08-18 Thread David Tinker
Hi All.

I need to create a lot of RDDs starting from a set of roots and count the
rows in each. Something like this:

final JavaSparkContext sc = new JavaSparkContext(conf);
List<String> roots = ...
Map<String, Object> res = sc.parallelize(roots).mapToPair(new
PairFunction<String, String, Long>(){
    public Tuple2<String, Long> call(String root) throws Exception {
        ... create RDD based on root from sc somehow ...
        return new Tuple2<String, Long>(root, rdd.count());
    }
}).countByKey();

This fails with a message about JavaSparkContext not being serializable.

Is there a way to get at the content inside of the map function or should I
be doing something else entirely?

Thanks
David


Re: Working with many RDDs in parallel?

2014-08-18 Thread Sean Owen
You won't be able to use RDDs inside of RDD operation. I imagine your
immediate problem is that the code you've elided references 'sc' and
that gets referenced by the PairFunction and serialized, but it can't
be.

If you want to play it this way, parallelize across roots in Java.
That is, just use an ExecutorService to launch a bunch of operations on
RDDs in parallel. There's no reason you can't do that, although I
suppose there are upper limits as to what makes sense on your cluster;
1000 RDD count()s at once isn't a good idea, for example.
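A rough sketch of that first approach from PySpark, for what it's worth, since the scheduler accepts jobs from multiple driver threads there too (sc and roots are assumed to exist, and the path and per-root loading are placeholders):

from multiprocessing.pool import ThreadPool

def count_root(root):
    # build and count the RDD for one root; the path is only illustrative
    return root, sc.textFile("hdfs:///data/" + root).count()

pool = ThreadPool(4)                       # bounded, like an ExecutorService
counts = dict(pool.map(count_root, roots))
pool.close()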

It may be the case that you don't really need a bunch of RDDs at all,
but can operate on an RDD of pairs of Strings (roots) and
something-elses, all at once.


On Mon, Aug 18, 2014 at 2:31 PM, David Tinker david.tin...@gmail.com wrote:
 Hi All.

 I need to create a lot of RDDs starting from a set of roots and count the
 rows in each. Something like this:

 final JavaSparkContext sc = new JavaSparkContext(conf);
 List<String> roots = ...
 Map<String, Object> res = sc.parallelize(roots).mapToPair(new
 PairFunction<String, String, Long>(){
     public Tuple2<String, Long> call(String root) throws Exception {
         ... create RDD based on root from sc somehow ...
         return new Tuple2<String, Long>(root, rdd.count());
     }
 }).countByKey();

 This fails with a message about JavaSparkContext not being serializable.

 Is there a way to get at the content inside of the map function or should I
 be doing something else entirely?

 Thanks
 David

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark kryo serializable exception

2014-08-18 Thread adu
hi all,
In an RDD map, I invoke an object that is *serialized* by standard Java
serialization, and get this exception:



com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 13
at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
at com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Spark Streaming Data Sharing

2014-08-18 Thread Levi Bowman
Based on my understanding something like this doesn't seem to be possible out 
of the box, but I thought I would write it up anyway in case someone has any 
ideas.

We have conceptually one high volume input stream, each streaming job is either 
interested in a subset of the stream or the entire stream.  We would like to 
get to the point where we could be running a large number of streaming jobs 
concurrently across a cluster.  It does not seem that putting a buffer like 
Kafka in between the source and the streaming jobs would be a sustainable route 
as all jobs would be consuming the whole stream and the majority of them would 
only be interested in a small subset of the available data.  As we don't know 
exactly what data each job would be interested in up front it would be 
difficult to separate/partition Kafka topics up front.

What I think we want is a way to have one streaming job whose output is passed 
to n other streaming jobs.  Has anyone thought about implementing something like 
this?  I don't see a way to have the Receiver for one streaming job be the 
output of another streaming job.





This e-mail, including accompanying communications and attachments, is strictly 
confidential and only for the intended recipient. Any retention, use or 
disclosure not expressly authorised by Markit is prohibited. This email is 
subject to all waivers and other terms at the following link: 
http://www.markit.com/en/about/legal/email-disclaimer.page

Please visit http://www.markit.com/en/about/contact/contact-us.page? for 
contact information on our offices worldwide.


Re: Question regarding spark data partition and coalesce. Need info on my use case.

2014-08-18 Thread abhiguruvayya
Hello Mayur,

The 3 in new RangePartitioner(*3*, partitionedFile) is also a hard-coded
value for the number of partitions. Can you suggest a way to avoid that?
Also, besides the cluster size, the number of partitions depends on the input
data size. Correct me if I am wrong.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Question-regarding-spark-data-partition-and-coalesce-Need-info-on-my-use-case-tp12214p12311.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark kryo serializable exception

2014-08-18 Thread Sameer Tilak
Hi,

I was able to set this parameter in my application to resolve this issue:
set("spark.kryoserializer.buffer.mb", "256")
Please let me know if this helps.
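For reference, a sketch of the equivalent from PySpark with the Spark 1.x property names (256 MB is just the size that worked here):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.mb", "256"))   # enlarge the Kryo output buffer
sc = SparkContext(conf=conf)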

Date: Mon, 18 Aug 2014 21:50:02 +0800
From: dujinh...@hzduozhun.com
To: user@spark.apache.org
Subject: spark kryo serializable exception


  

  
  
hi all,

In an RDD map, I invoke an object that is *serialized* by standard Java
serialization, and get this exception:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 13
at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
at com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
  

Re: NullPointerException when connecting from Spark to a Hive table backed by HBase

2014-08-18 Thread Cesar Arevalo
I removed the JAR that you suggested but now I get another error when I try
to create the HiveContext. Here is the error:

scala> val hiveContext = new HiveContext(sc)
error: bad symbolic reference. A signature in HiveContext.class refers to term ql
in package org.apache.hadoop.hive which is not available.
It may be completely missing from the current classpath,
[omitted more of the stack trace for readability...]


Best,
-Cesar


On Mon, Aug 18, 2014 at 12:47 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Then definitely its a jar conflict. Can you try removing this jar from the
 class path /opt/spark-poc/lib_managed/jars/org.
 spark-project.hive/hive-exec/hive-exec-0.12.0.jar

 Thanks
 Best Regards


 On Mon, Aug 18, 2014 at 12:45 PM, Cesar Arevalo ce...@zephyrhealthinc.com
  wrote:

 Nope, it is NOT null. Check this out:

 scala hiveContext == null
 res2: Boolean = false


 And thanks for sending that link, but I had already looked at it. Any
 other ideas?

 I looked through some of the relevant Spark Hive code and I'm starting to
 think this may be a bug.

 -Cesar



 On Mon, Aug 18, 2014 at 12:00 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Looks like your hiveContext is null. Have a look at this documentation.
 https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

 Thanks
 Best Regards


 On Mon, Aug 18, 2014 at 12:09 PM, Cesar Arevalo 
 ce...@zephyrhealthinc.com wrote:

 Hello:

 I am trying to setup Spark to connect to a Hive table which is backed
 by HBase, but I am running into the following NullPointerException:

 scala val hiveCount = hiveContext.sql(select count(*) from
 dataset_records).collect().head.getLong(0)
 14/08/18 06:34:29 INFO ParseDriver: Parsing command: select count(*)
 from dataset_records
 14/08/18 06:34:29 INFO ParseDriver: Parse Completed
 14/08/18 06:34:29 INFO HiveMetaStore: 0: get_table : db=default
 tbl=dataset_records
 14/08/18 06:34:29 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_table
 : db=default tbl=dataset_records
 14/08/18 06:34:30 INFO MemoryStore: ensureFreeSpace(160296) called with
 curMem=0, maxMem=280248975
 14/08/18 06:34:30 INFO MemoryStore: Block broadcast_0 stored as values
 in memory (estimated size 156.5 KB, free 267.1 MB)
 14/08/18 06:34:30 INFO SparkContext: Starting job: collect at
 SparkPlan.scala:85
 14/08/18 06:34:31 WARN DAGScheduler: Creating new stage failed due to
 exception - job: 0
 java.lang.NullPointerException
 at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
  at
 org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
  at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
  at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)




 This is happening from the master on spark, I am running hbase version
 hbase-0.98.4-hadoop1 and hive version 0.13.1. And here is how I am running
 the spark shell:

 bin/spark-shell --driver-class-path
 

Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Peng Cheng
I'm curious to see that if you declare the broadcast wrapper as a var and
overwrite it in the driver program, the modification has a stable impact
on all transformations/actions that were defined BEFORE the overwrite but
executed lazily AFTER the overwrite:

   val a = sc.parallelize(1 to 10)

var broadcasted = sc.broadcast("broad")

val b = a.map(_ + broadcasted.value)
//  b.persist()
for (line <- b.collect()) {  print(line)  }

println("\n===")
broadcasted = sc.broadcast("cast")

for (line <- b.collect()) {  print(line)  }

the result is:

1broad2broad3broad4broad5broad6broad7broad8broad9broad10broad
===
1cast2cast3cast4cast5cast6cast7cast8cast9cast10cast

Of course, if you persist b before overwriting it will still get the
non-surprising result (both are 10broad... because they are persisted). This
can be useful sometimes but may cause confusion at other times (people can
no longer add persist at will just for backup because it may change the
result).

So far I've found no documentation supporting this feature. So can someone
confirm that it's a deliberately designed feature?

Yours Peng 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Merging complicated small matrices to one big matrix

2014-08-18 Thread Davies Liu
rdd.flatMap(lambda x: x) may solve your problem; it will
convert an RDD from

[[[1,2,3],[4,5,6]],[[7,8,9,],[10,11,12]]]

into:

[[1,2,3], [4,5,6], [7,8,9,], [10,11,12]]

On Mon, Aug 18, 2014 at 2:42 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 I have an rdd in pyspark which looks like follows:
 It has two sub matrices..(array,array)
 [
   array([[-13.00771575,   0.2740844 ,   0.9752694 ,   0.67465999,
  -1.45741537,   0.546775  ,   0.7900841 ,  -0.59473707,
  -1.11752044,   0.61564356],
[ -0.,  12.20115746,  -0.49016935,  -0.9236129 ,
  -1.1693633 ,  -0.39135626,   1.10752864,   0.16920118,
  -1.098806  ,   1.10045185],
[  0.,   0., -11.26425992,   0.56309152,
   0.44872832,   0.69722768,   0.84200281,   0.89537327,
   0.10460865,  -0.62938474],
[ -0.,  -0.,   0.,  13.1112119 ,
   0.39986223,  -1.22218936,   0.72315955,   0.12208597,
  -0.6258082 ,  -0.91077504],
[  0.,  -0.,   0.,   0.,
 -11.04483145,  -1.71948244,  -0.73239228,  -0.19651712,
  -0.97931725,  -0.43263423],
[  0.,   0.,   0.,  -0.,
   0., -12.1996715 ,  -0.05580816,   0.20517336,
   0.53584998,   1.3370874 ],
[  0.,  -0.,  -0.,   0.,
   0.,   0.,  12.32603631,   0.47498103,
  -0.65901705,  -0.85713277],
[  0.,   0.,   0.,  -0.,
   0.,  -0.,   0.,  11.90030251,
   1.73036795,   0.70588443],
[ -0.,  -0.,   0.,   0.,
  -0.,  -0.,   0.,  -0.,
  13.00493769,   1.37753403],
[  0.,  -0.,   0.,   0.,
   0.,  -0.,   0.,   0.,
  -0., -10.89006283]]),

 array([[-12.43375184,   1.07703619,  -0.47818221,   1.65919732,
   0.96307502,  -1.6322447 ,  -1.09409297,  -0.64849112,
  -1.09349557,  -0.68706834],
[  0., -11.93616969,   0.08784614,   1.76677411,
  -0.0585134 ,  -0.70979485,   0.29757848,   1.19562173,
  -1.54176475,   1.71500862],
[  0.,  -0., -12.42060272,   2.17060365,
  -1.3212244 ,   0.73742297,   0.50410937,  -0.35278129,
  -0.40513689,  -0.81222302],
[ -0.,   0.,   0., -11.93419851,
  -1.15614929,   1.04085489,   0.69986351,  -1.3615322 ,
   0.43467842,  -1.33041858],
[ -0.,  -0.,   0.,   0.,
  11.22907137,  -0.12925322,   0.46293906,  -2.01577912,
  -2.26566926,  -0.17750339],
[  0.,   0.,   0.,   0.,
  -0., -12.0705513 ,  -0.19432359,   0.41226088,
   0.79436699,  -0.61288711],
[  0.,  -0.,   0.,   0.,
  -0.,  -0.,  11.99770753,  -1.24277228,
   1.32240282,   1.5140609 ],
[ -0.,   0.,  -0.,  -0.,
   0.,  -0.,   0., -13.07008472,
   0.52031563,  -1.56247391],
[  0.,  -0.,   0.,   0.,
  -0.,  -0.,  -0.,  -0.,
  13.16585107,   0.57741265],
[  0.,   0.,  -0.,  -0.,
   0.,   0.,   0.,  -0.,
  -0., -13.53719704]])
 ]

 So, basically I have sub-matrices like [sub_matrix_1, sub_matrix_2]
 (the above has just two matrices).

 I want to combine them into one big matrix, column-wise:

 [ sub_matrix_1
   sub_matrix_2
 ]
 Any suggestions?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Does HiveContext support Parquet?

2014-08-18 Thread lyc
I followed your instructions to try to load data in parquet format through
hiveContext but failed. Do you happen to see what is incorrect in the
following steps?

The steps I am following is like:
1. download parquet-hive-bundle-1.5.0.jar
2. revise hive-site.xml including this:

<property>
  <name>hive.jar.directory</name>
  <value>/home/hduser/hive/lib/parquet-hive-bundle-1.5.0.jar</value>
  <description>
    This is the location hive in tez mode will look for to find a site wide
    installed hive instance. If not set, the directory under
    hive.user.install.directory corresponding to current user name will be used.
  </description>
</property>

3. copy hive-site.xml to all nodes.
4. start spark-shell, then try to create table:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
 hql("create table part (P_PARTKEY INT, P_NAME STRING, P_MFGR STRING,
P_BRAND STRING, P_TYPE STRING, P_SIZE INT, P_CONTAINER STRING, P_RETAILPRICE
DOUBLE, P_COMMENT STRING) STORED AS PARQUET")

Then I got this error:
14/08/18 19:09:00 ERROR Driver: FAILED: SemanticException Unrecognized file
format in STORED AS clause: PARQUET
org.apache.hadoop.hive.ql.parse.SemanticException: Unrecognized file format
in STORED AS clause: PARQUET
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.handleGenericFileFormat(BaseSemanticAnalyzer.java:569)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:8968)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8313)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:284)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:441)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
at
org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:186)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:247)
at
org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85)
at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
        at $line44.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:18)
        at $line44.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
        at $line44.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $line44.$read$$iwC$$iwC$$iwC.<init>(<console>:27)
        at $line44.$read$$iwC$$iwC.<init>(<console>:29)
        at $line44.$read$$iwC.<init>(<console>:31)
        at $line44.$read.<init>(<console>:33)
        at $line44.$read$.<init>(<console>:37)
        at $line44.$read$.<clinit>(<console>)
        at $line44.$eval$.<init>(<console>:7)
        at $line44.$eval$.<clinit>(<console>)
        at $line44.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
at
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

java.nio.channels.CancelledKeyException in Graphx Connected Components

2014-08-18 Thread Jeffrey Picard
Hey all,

I’m trying to run connected components in graphx on about 400GB of data on 50 
m3.xlarge nodes on emr. I keep getting java.nio.channels.CancelledKeyException 
when it gets to “mapPartitions at VertexRDD.scala:347”. I haven’t been able to 
find much about this online, and nothing that seems relevant to my situation. 
I’m using spark built from the master repo, because I need the custom storage 
level feature in graphx.

My configuration:

.set("spark.executor.memory", "13g")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
.set("spark.executor.extraJavaOptions", "-XX:-UseGCOverheadLimit")
.set("spark.akka.frameSize", "128")
.set("spark.storage.memoryFraction", "0.2")
.set("spark.shuffle.memoryFraction", "0.7")
.set("spark.akka.timeout", "600")

The code I’m running:

val graph = GraphLoader.edgeListFile(sc, "/foo/bar", minEdgePartitions = 2000, 
edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER, 
vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)

val cc = graph.connectedComponents().vertices

Stack trace:

14/08/18 18:50:07 INFO network.ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@501c1504
java.nio.channels.CancelledKeyException
at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
at java.nio.channels.SelectionKey.isAcceptable(SelectionKey.java:360)
at 
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:372)
at 
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/08/18 18:50:07 INFO cluster.SparkDeploySchedulerBackend: Executor 48 
disconnected, so removing it
14/08/18 18:50:07 ERROR network.SendingConnection: Exception while reading 
SendingConnection to ConnectionManagerId(ip-10-136-91-34.ec2.internal,51095)
java.nio.channels.ClosedChannelException
at 
sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
at 
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/08/18 18:50:07 INFO storage.BlockManagerInfo: Added rdd_16_161 in memory on 
ip-10-150-84-111.ec2.internal:58219 (size: 78.4 MB, free: 716.5 MB)
14/08/18 18:50:07 ERROR scheduler.TaskSchedulerImpl: Lost executor 48 on 
ip-10-136-91-34.ec2.internal: remote Akka client disassociated

Log from the worker node in spark/logs/:

14/08/18 18:40:52 INFO worker.ExecutorRunner: Launch command: 
/usr/lib/jvm/java-7-oracle/bin/java -cp 
/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/:/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/::/home/hadoop/spark/conf:/home/hadoop/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.3.jar:/home/hadoop/conf/
 -XX:MaxPermSize=128m -Dspark.akka.timeout=600 -Dspark.akka.frameSize=128 
-Dspark.driver.port=50456 -XX:-UseGCOverheadLimit -Xms13312M -Xmx13312M 
org.apache.spark.executor.CoarseGrainedExecutorBackend 
akka.tcp://spark@ip-10-171-49-153.ec2.internal:50456/user/CoarseGrainedScheduler
 48 ip-10-136-91-34.ec2.internal 4 
akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942/user/Worker 
app-20140818184051-
240.261: [GC [PSYoungGen: 132096K-18338K(153600K)] 132096K-18346K(503296K), 
0.0285710 secs] [Times: user=0.04 sys=0.02, real=0.03 secs]
14/08/18 18:50:07 INFO worker.Worker: Executor app-20140818184051-/48 
finished with state EXITED message Command exited with code 137 exitStatus 137
14/08/18 18:50:07 INFO worker.Worker: Asked to launch executor 
app-20140818184051-/50 for Spark CC
14/08/18 18:50:07 INFO actor.LocalActorRef: Message 
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from 
Actor[akka://sparkWorker/deadLetters] to 
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.136.91.34%3A47436-2#201455087]
 was not delivered. [1] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
14/08/18 18:50:07 ERROR remote.EndpointWriter: AssociationError 
[akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942] - 
[akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]: Error 
[Association failed with 
[akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]] [
akka.remote.EndpointAssociationException: Association 

Extracting unique elements of an ArrayBuffer

2014-08-18 Thread SK
Hi,

I have a piece of code in which the result of a  groupByKey operation is as
follows:

(2013-04, ArrayBuffer(s1, s2, s3, s1, s2, s4))

The first element is a String value representing a date and the ArrayBuffer
consists of (non-unique) strings. I want to extract the unique elements of
the ArrayBuffer. So I am expecting the result to be:

(2013-04, ArrayBuffer(s1, s2, s3, s4))

I tried the following:
  .groupByKey
  .map(g => (g._1, g._2.distinct))

But I get the following  runtime error: 
value distinct is not a member of Iterable[String]
[error]    .map(g => (g._1, g._2.distinct))

I also  tried g._2.distinct(), but got the same error. 


I looked at the Scala ArrayBuffer documentation and it supports distinct()
and count() operations.  I am using Spark 1.0.1 and scala 2.10.4.  I would
like to know how to extract the unique elements of the ArrayBuffer above.

thanks
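
In Spark 1.0.x, groupByKey returns the values as an Iterable[String] rather than
an ArrayBuffer, which is why distinct is not found. A minimal sketch of one way
around this, converting the Iterable to a concrete collection first (here "pairs"
stands for the keyed RDD being grouped):

val grouped = pairs.groupByKey()
val unique  = grouped.map { case (k, vs) => (k, vs.toSeq.distinct) }
// alternatively, if order does not matter: grouped.map { case (k, vs) => (k, vs.toSet) }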








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extracting-unique-elements-of-an-ArrayBuffer-tp12320.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming Data Sharing

2014-08-18 Thread Ruchir Jha
The Spark Job that has the main DStream, could have another DStream that is
listening for stream subscription requests. So when a subscription is
received, you could do a filter/forEach on the main DStream and respond to
that one request. So you're basically creating a stream server that is
capable of accepting filter requests.
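
A rough sketch of that idea, with hypothetical Record and Subscription types and
a println standing in for the real reply channel; note that in Spark Streaming 1.x
all of these filtered outputs have to be registered before the StreamingContext is
started:

import org.apache.spark.streaming.dstream.DStream

case class Record(key: String, value: String)                        // hypothetical record type
case class Subscription(id: String, predicate: Record => Boolean)    // hypothetical filter request

// Each accepted subscription becomes just another filtered view of the one main stream.
def serve(sub: Subscription, mainStream: DStream[Record]): Unit =
  mainStream.filter(sub.predicate)
            .foreachRDD(rdd => rdd.foreach(r => println(s"[${sub.id}] ${r.key}=${r.value}")))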


On Mon, Aug 18, 2014 at 10:30 AM, Levi Bowman levi.bow...@markit.com
wrote:

  Based on my understanding something like this doesn’t seem to be
 possible out of the box, but I thought I would write it up anyway in case
 someone has any ideas.



 We have conceptually one high volume input stream, each streaming job is
 either interested in a subset of the stream or the entire stream.  We would
 like to get to the point where we could be running a large number of
 streaming jobs concurrently across a cluster.  It does not seem that
 putting a buffer like Kafka in between the source and the streaming jobs
 would be a sustainable route as all jobs would be consuming the whole
 stream and the majority of them would only be interested in a small subset
 of the available data.  As we don’t know exactly what data each job would
 be interested in up front it would be difficult to separate/partition Kafka
 topics up front.



 What I think we want is a way to have one streaming job whose output is
 passed to n other streaming jobs.  Has anyone thought about implementing
 something like this?   I don’t see a way to have the Receiver for one
 streaming job be the output of another streaming job.








Re: Writing to RabbitMQ

2014-08-18 Thread jschindler
Well, it looks like I can use the .repartition(1) method to stuff everything
in one partition so that gets rid of the duplicate messages I send to
RabbitMQ but that seems like a bad idea perhaps.  Wouldn't that hurt
scalability?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12324.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark - reading hfds files every 5 minutes

2014-08-18 Thread salemi
Hi,

My data source stores the incoming data to HDFS every 10 seconds. The
naming convention is save-<timestamp>.csv (see below)

drwxr-xr-x  ali supergroup  0 B 0   0 B save-1408396065000.csv
drwxr-xr-x  ali supergroup  0 B 0   0 B save-140839607.csv
drwxr-xr-x  ali supergroup  0 B 0   0 B save-1408396075000.csv
drwxr-xr-x  ali supergroup  0 B 0   0 B save-140839608.csv

I would like to periodically (every 5 min) read the files and process them.
Is there a good example out there of how to implement this? How do I know what
part of the data I have already processed?

Thanks,
Ali
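
A minimal sketch of one common approach: use Spark Streaming's file stream with a
5-minute batch interval. textFileStream only picks up files that appear in the
monitored directory after the stream starts, which is what keeps track of what has
already been processed, assuming each file is written atomically (e.g. moved into
the directory once complete). The path below is made up:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf().setAppName("ProcessEvery5Min")
val ssc  = new StreamingContext(conf, Minutes(5))

// Each 5-minute batch contains only the files that appeared since the previous batch.
val lines = ssc.textFileStream("hdfs:///data/incoming")
lines.foreachRDD { rdd =>
  println(s"lines in this batch: ${rdd.count()}")   // replace with real processing
}

ssc.start()
ssc.awaitTermination()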



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-reading-hfds-files-every-5-minutes-tp12325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NullPointerException when connecting from Spark to a Hive table backed by HBase

2014-08-18 Thread Zhan Zhang
Looks like hbaseTableName is null, probably caused by incorrect configuration.


String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
setHTable(new HTable(HBaseConfiguration.create(jobConf), 
Bytes.toBytes(hbaseTableName)));

Here is the definition.

  public static final String HBASE_TABLE_NAME = "hbase.table.name";
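
For the Hive-on-HBase case, that property normally comes from the table DDL rather
than from user code: the HBase storage handler copies the table properties into the
job configuration. A sketch of the kind of DDL that sets it (the column family and
column mapping below are made up; only the table name is taken from the earlier
messages):

hiveContext.hql("""
  CREATE EXTERNAL TABLE dataset_records (key STRING, val STRING)
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:val")
  TBLPROPERTIES ("hbase.table.name" = "dataset_records")
""")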

Thanks.

Zhan Zhang


On Aug 17, 2014, at 11:39 PM, Cesar Arevalo ce...@zephyrhealthinc.com wrote:

 HadoopRDD




setCallSite for API backtraces not showing up in logs?

2014-08-18 Thread John Salvatier
What's the correct way to use setCallSite to get the change to show up in
the spark logs?

I have something like

class RichRDD(rdd: RDD[MyThing]) {
  def mySpecialOperation() {
    rdd.context.setCallSite("bubbles and candy!")
    rdd.map()
    val result = rdd.groupBy()
    rdd.context.clearCallSite()
    result
  }
}


But when I use .mySpecialOperation, "bubbles and candy!" doesn't seem to
show up anywhere in the logs. Is this not the right way to use
.setCallSite?


Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded

2014-08-18 Thread Ankur Dave
On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI iamyifa...@gmail.com wrote:

 I am testing our application(similar to personalised page rank using
 Pregel, and note that each vertex property will need pretty much more space
 to store after new iteration)

[...]

But when we ran it on a larger graph (e.g. LiveJournal), it always ends with
  the error GC overhead limit exceeded, even when the number of partitions is
  increased to 48 from 8.


If the graph (including vertex properties) is too large to fit in memory,
you might try allowing it to spill to disk. When constructing the graph,
you can set vertexStorageLevel and edgeStorageLevel to
StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish.
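
A minimal sketch of what that looks like when loading an edge list (these
storage-level parameters require a build that includes them, as discussed
elsewhere in this digest; the path is made up):

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

val graph = GraphLoader.edgeListFile(sc, "/path/to/edges",
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)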

Ankur http://www.ankurdave.com/


Re: Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Zhan Zhang
I think the behavior is by design. If b is not persisted, then on each call to 
b.collect(), broadcasted points to a new broadcast variable, which is serialized 
by the driver and fetched by the executors.

If you do persist, you don’t expect the RDD to change because of a new broadcast 
variable.
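
A sketch of the persisted variant being described, reusing the toy example from
the original message:

val a = sc.parallelize(1 to 10)
var broadcasted = sc.broadcast("broad")

val b = a.map(_ + broadcasted.value)
b.persist()
b.collect()      // materializes b with the "broad" value

broadcasted = sc.broadcast("cast")
b.collect()      // still "1broad"..."10broad" as long as the cached partitions are reused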

Thanks.

Zhan Zhang 


On Aug 18, 2014, at 11:26 AM, Peng Cheng pc...@uow.edu.au wrote:

 I'm curious to see that if you declare the broadcast wrapper as a var and
 overwrite it in the driver program, the modification has a stable impact
 on all transformations/actions defined BEFORE the overwrite but executed
 lazily AFTER the overwrite:
 
   val a = sc.parallelize(1 to 10)
 
 var broadcasted = sc.broadcast("broad")
  
 val b = a.map(_ + broadcasted.value)
 //  b.persist()
 for (line <- b.collect()) {  print(line)  }
  
 println("\n===")
 broadcasted = sc.broadcast("cast")
  
 for (line <- b.collect()) {  print(line)  }
 
 the result is:
 
 1broad2broad3broad4broad5broad6broad7broad8broad9broad10broad
 ===
 1cast2cast3cast4cast5cast6cast7cast8cast9cast10cast
 
 Of course, if you persist b before overwriting it will still get the
 non-surprising result (both are 10broad... because they are persisted). This
 can be useful sometimes but may cause confusion at other times (people can
 no longer add persist at will just for backup because it may change the
 result).
 
 So far I've found no documentation supporting this feature. So can someone
 confirm that it's a deliberately designed feature?
 
 Yours Peng 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to use Spark Streaming from an HTTP api?

2014-08-18 Thread bumble123
I want to send an HTTP request (specifically to OpenTSDB) to get data. I've
been looking at the StreamingContext api and don't seem to see any methods
that can connect to this. Has anyone tried connecting Spark Streaming to a
server via HTTP requests before? How have you done it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NullPointerException when connecting from Spark to a Hive table backed by HBase

2014-08-18 Thread cesararevalo
Thanks, Zhan for the follow up.

But, do you know how I am supposed to set that table name on the jobConf? I
don't have access to that object from my client driver?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-connecting-from-Spark-to-a-Hive-table-backed-by-HBase-tp12284p12331.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark-submit with HA YARN

2014-08-18 Thread Matt Narrell
Hello,

I have an HA enabled YARN cluster with two resource managers.  When submitting 
jobs via “spark-submit --master yarn-cluster”, it appears that the driver is 
looking explicitly for the “yarn.resourcemanager.address” property rather than 
round-robining through the resource managers via the 
“yarn.client.failover-proxy-provider” property set to 
“org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider”.

If I explicitly set the “yarn.resourcemanager.address” to the active resource 
manager, jobs will submit fine. 

Is there a way to get “spark-submit --master yarn-cluster” to respect the 
failover proxy?

Thanks in advance,
Matt
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Writing to RabbitMQ

2014-08-18 Thread Vida Ha
Hi John,

It seems like original problem you had was that you were initializing the
RabbitMQ connection on the driver, but then calling the code to write to
RabbitMQ on the workers (I'm guessing, but I don't know since I didn't see
your code).  That's definitely a problem because the connection can't be
serialized and passed onto the workers.

dstream.foreachRDD(rdd => {
   // create connection / channel to source
   rdd.foreach(
     // tries to write to RabbitMQ from the worker - there is a problem
     // since the connection cannot be passed to the workers.
     element => // write using channel
   )

})


You then changed the code to open the connection on the workers, and write
out the data once per worker.  This worked, but as you saw - you are
writing out the data multiple times.

dstream.foreachRDD(rdd => {
  rdd.foreachPartition(iterator => {
    // Create or get a singleton instance of a connection / channel
    iterator.foreach(element => // write using connection / channel

      // This works and writes, but writes once per partition.
    )
  })
})


If I understand correctly, I believe what you want is to open the
connection on the driver, and write to RabbitMQ from the driver as well.

dstream.foreachRDD(rdd => {
   // Any code you put here is executed on the driver.
   rdd.foreach(
     // Any code inside rdd.foreach is executed on the workers.
   )

   // This code will be executed back on the driver again.
   // You can open the connection and write stats here.
})
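
A hedged, concrete sketch of that last pattern, assuming the standard RabbitMQ
Java client (com.rabbitmq.client) is what's being used and that the per-interval
result is small; the host and queue name are made up:

import com.rabbitmq.client.ConnectionFactory

dstream.foreachRDD { rdd =>
  // Runs on the driver: reduce the batch to a small summary first.
  val count = rdd.count()

  val factory = new ConnectionFactory()
  factory.setHost("rabbitmq.example.com")
  val connection = factory.newConnection()
  val channel = connection.createChannel()
  channel.queueDeclare("stats", false, false, false, null)
  channel.basicPublish("", "stats", null, s"count=$count".getBytes("UTF-8"))
  channel.close()
  connection.close()
}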


Does this clear things up?  I hope it's very clear now when code in the
streaming example executes on the driver vs. the workers.

-Vida





On Mon, Aug 18, 2014 at 2:20 PM, jschindler john.schind...@utexas.edu
wrote:

 Well, it looks like I can use the .repartition(1) method to stuff
 everything
 in one partition so that gets rid of the duplicate messages I send to
 RabbitMQ but that seems like a bad idea perhaps.  Wouldn't that hurt
 scalability?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12324.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Writing to RabbitMQ

2014-08-18 Thread Vida Ha
Oh sorry, just to be more clear - writing from the driver program is only
safe if the amount of data you are trying to write is small enough to fit
in memory on the driver.  I looked at your code, and since you are
just writing a few things each time interval, this seems safe.


-Vida


On Mon, Aug 18, 2014 at 4:25 PM, Vida Ha v...@databricks.com wrote:

 Hi John,

 It seems like original problem you had was that you were initializing the
 RabbitMQ connection on the driver, but then calling the code to write to
 RabbitMQ on the workers (I'm guessing, but I don't know since I didn't see
 your code).  That's definitely a problem because the connection can't be
 serialized and passed onto the workers.

 dstream.foreachRDD(rdd => {
    // create connection / channel to source
    rdd.foreach(
      // tries to write to RabbitMQ from the worker - there is a problem
      // since the connection cannot be passed to the workers.
      element => // write using channel
    )

 })


 You then changed the code to open the connection on the workers, and write
 out the data once per worker.  This worked, but as you saw - you are
 writing out the data multiple times.

 dstream.foreachRDD(rdd => {
   rdd.foreachPartition(iterator => {
     // Create or get a singleton instance of a connection / channel
     iterator.foreach(element => // write using connection / channel

       // This works and writes, but writes once per partition.
     )
   })
 })


 If I understand correctly, I believe what you want is to open the
 connection on the driver, and write to RabbitMQ from the driver as well.

 dstream.foreachRDD(rdd => {
    // Any code you put here is executed on the driver.
    rdd.foreach(
      // Any code inside rdd.foreach is executed on the workers.
    )

    // This code will be executed back on the driver again.
    // You can open the connection and write stats here.
 })


 Does this clear things up?  I hope it's very clear now when code in the
 streaming example executes on the driver vs. the workers.

 -Vida





 On Mon, Aug 18, 2014 at 2:20 PM, jschindler john.schind...@utexas.edu
 wrote:

 Well, it looks like I can use the .repartition(1) method to stuff
 everything
 in one partition so that gets rid of the duplicate messages I send to
 RabbitMQ but that seems like a bad idea perhaps.  Wouldn't that hurt
 scalability?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12324.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Processing multiple files in parallel

2014-08-18 Thread SK

Hi,

I have a piece of code that reads all the (csv) files in a folder. For each
file, it parses each line, extracts the first 2 elements from each row of
the file, groups the tuple  by the key and finally outputs the  number of
unique values for each key.

val conf = new SparkConf().setAppName(App)
val sc = new SparkContext(conf)

val user_time = sc.union(sc.textFile("/directory/*"))   // union of all files in the directory
   .map(line => {
     val fields = line.split(",")
     (fields(1), fields(0))                              // extract first 2 elements
   })
   .groupByKey                                           // group by timestamp
   .map(g => (g._1, g._2.toSet.size))                    // get the number of unique ids per timestamp

I have a lot of files in the directory (several hundreds). The program takes
a long time. I am not sure if the union operation is preventing the files
from being processed in parallel. Is there a better way to parallelize the
above code ? For example, the first two operations (reading each file and
extracting the first 2 columns from each file) can be done in parallel, but
I am not sure if that is how Spark schedules the above code.

thanks
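
For what it's worth, a sketch of the same pipeline without the explicit union:
sc.textFile already accepts a glob and splits the matched files into partitions
that are read and mapped in parallel, so only the groupByKey forces a shuffle:

val user_time = sc.textFile("/directory/*")
  .map { line =>
    val fields = line.split(",")
    (fields(1), fields(0))
  }
  .groupByKey()
  .map { case (k, ids) => (k, ids.toSet.size) }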
  





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Processing-multiple-files-in-parallel-tp12336.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Data loss - Spark streaming and network receiver

2014-08-18 Thread Wei Liu
We are prototyping an application with Spark streaming and Kinesis. We use
kinesis to accept incoming txn data, and then process them using spark
streaming. So far we really liked both technologies, and we saw both
technologies are getting mature rapidly. We are almost settled to use these
two technologies, but we are a little scary by the paragraph in the
programming guide.

For network-based data sources like Kafka and Flume, the received input
data is replicated in memory between nodes of the cluster (default
replication factor is 2). So if a worker node fails, then the system can
recompute the lost from the the left over copy of the input data. However,
if the worker node where a network receiver was running fails, then a tiny
bit of data may be lost, that is, the data received by the system but not
yet replicated to other node(s). The receiver will be started on a
different node and it will continue to receive data.

Since our application cannot tolerate losing customer data, I am wondering
what is the best way for us to address this issue.
1) We are thinking writing application specific logic to address the data
loss. To us, the problem seems to be caused by that Kinesis receivers
advanced their checkpoint before we know for sure the data is replicated.
For example, we can do another checkpoint ourselves to remember the kinesis
sequence number for data that has been processed by spark streaming. When
Kinesis receiver is restarted due to worker failures, we restarted it from
the checkpoint we tracked. We also worry about our driver program (or the
whole cluster) dying because of a bug in the application; the above logic
would allow us to resume from our last checkpoint.

Is there any best practices out there for this issue? I suppose many folks
are using spark streaming with network receivers, any suggestion is
welcomed.
2) Write kinesis data to s3 first, then either use it as a backup or read
from s3 in spark streaming. This is the safest approach but with a
performance/latency penalty. On the other hand,  we may have to write data
to s3 anyway since Kinesis only stores up to 24 hours data just in case we
had a bad day in our server infrastructure.
3) Wait for this issue to be addressed in spark streaming. I found this
ticket https://issues.apache.org/jira/browse/SPARK-1647, but it is not
resolved yet.

Thanks,
Wei


Re: How to use Spark Streaming from an HTTP api?

2014-08-18 Thread Silvio Fiorito
You need to create a custom receiver that submits the HTTP requests then
deserializes the data and pushes it into the Streaming context.

See here for an example:
http://spark.apache.org/docs/latest/streaming-custom-receivers.html
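
A minimal sketch of such a receiver, polling a URL on a background thread and
pushing each response body into the stream (the URL, poll interval, and
plain-text handling are assumptions):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class HttpPollingReceiver(url: String, intervalMs: Long)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Poll on a separate thread so that onStart() returns immediately.
    new Thread("HTTP Polling Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val body = scala.io.Source.fromURL(url).mkString   // blocking HTTP GET
          store(body)                                        // hand the record to Spark
          Thread.sleep(intervalMs)
        }
      }
    }.start()
  }

  def onStop(): Unit = {}   // the polling loop exits once isStopped() returns true
}

// Usage, given an existing StreamingContext ssc:
// val responses = ssc.receiverStream(new HttpPollingReceiver("http://host:4242/api/query", 5000))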


On 8/18/14, 6:20 PM, bumble123 tc1...@att.com wrote:

I want to send an HTTP request (specifically to OpenTSDB) to get data.
I've
been looking at the StreamingContext api and don't seem to see any methods
that can connect to this. Has anyone tried connecting Spark Streaming to a
server via HTTP requests before? How have you done it?



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Strea
ming-from-an-HTTP-api-tp12330.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does HiveContext support Parquet?

2014-08-18 Thread Silvio Fiorito
First the JAR needs to be deployed using the --jars argument. Then in your
HQL code you need to use the DeprecatedParquetInputFormat and
DeprecatedParquetOutputFormat as described here
https://cwiki.apache.org/confluence/display/Hive/Parquet#Parquet-Hive0.10-0
.12

This is because SparkSQL is based on Hive 0.12.

That's what's worked for me.
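
A sketch of the kind of DDL that wiki page describes for Hive 0.10-0.12, applied
to the part table from the question (the parquet.hive classes come from the
parquet-hive bundle, which therefore has to be on the classpath):

hql("""
  CREATE TABLE part (
    P_PARTKEY INT, P_NAME STRING, P_MFGR STRING, P_BRAND STRING, P_TYPE STRING,
    P_SIZE INT, P_CONTAINER STRING, P_RETAILPRICE DOUBLE, P_COMMENT STRING)
  ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
  STORED AS
    INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
""")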

Thanks,
Silvio

On 8/18/14, 3:14 PM, lyc yanchen@huawei.com wrote:

I followed your instructions to try to load data in parquet format through
hiveContext but failed. Do you happen to see what is incorrect in the
following steps?

The steps I am following is like:
1. download parquet-hive-bundle-1.5.0.jar
2. revise hive-site.xml including this:

<property>
  <name>hive.jar.directory</name>
  <value>/home/hduser/hive/lib/parquet-hive-bundle-1.5.0.jar</value>
  <description>
    This is the location hive in tez mode will look for to find a site wide
    installed hive instance. If not set, the directory under
    hive.user.install.directory corresponding to current user name will be used.
  </description>
</property>

3. copy hive-site.xml to all nodes.
4. start spark-shell, then try to create table:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
 hql("create table part (P_PARTKEY INT, P_NAME STRING, P_MFGR STRING,
P_BRAND STRING, P_TYPE STRING, P_SIZE INT, P_CONTAINER STRING, P_RETAILPRICE
DOUBLE, P_COMMENT STRING) STORED AS PARQUET")

Then I got this error:
14/08/18 19:09:00 ERROR Driver: FAILED: SemanticException Unrecognized
file
format in STORED AS clause: PARQUET
org.apache.hadoop.hive.ql.parse.SemanticException: Unrecognized file
format
in STORED AS clause: PARQUET
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.handleGenericFileForm
at(BaseSemanticAnalyzer.java:569)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(Semant
icAnalyzer.java:8968)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticA
nalyzer.java:8313)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticA
nalyzer.java:284)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:441)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
at
org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:186)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(Hive
Context.scala:250)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.sca
la:247)
at
org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85)
at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
at 
$line44.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:18)
at $line44.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23)
at $line44.$read$$iwC$$iwC$$iwC$$iwC.init(console:25)
at $line44.$read$$iwC$$iwC$$iwC.init(console:27)
at $line44.$read$$iwC$$iwC.init(console:29)
at $line44.$read$$iwC.init(console:31)
at $line44.$read.init(console:33)
at $line44.$read$.init(console:37)
at $line44.$read$.clinit(console)
at $line44.$eval$.init(console:7)
at $line44.$eval$.clinit(console)
at $line44.$eval.$print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorIm
pl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:84
1)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
at
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoo
p.scala:936)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala
:884)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala
:884)
at

Re: Segmented fold count

2014-08-18 Thread fil
fil wrote
 - Python functions like groupCount; these get reflected from their Python
 AST and converted into a Spark DAG? Presumably if I try and do something
 non-convertible this transformation process will throw an error? In other
 words this runs in the JVM.

Further to this - it seems that Python does run on each node in the cluster,
meaning it runs outside the JVM. Presumably this means that writing this in
Scala would be far more performant.

Could I write groupCount() in Scala, and then use it from Pyspark? Care to
supply an example, I'm finding them hard to find :)


fil wrote
 - I had considered that partitions were batches of distributable work,
 and generally large. Presumably the above is OK with small groups (eg.
  average size < 10) - this won't kill performance?

I'm still a bit confused about the dual meaning of partition: work
segmentation, and key groups. Care to clarify anyone - when are partitions
used to describe chunks of data for different nodes in the cluster (ie.
large), and when are they used to describe groups of items in data (ie.
small)..




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278p12342.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Tobias Pfeiffer
Hi Wei,

On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com
wrote:

 Since our application cannot tolerate losing customer data, I am wondering
 what is the best way for us to address this issue.
 1) We are thinking writing application specific logic to address the data
 loss. To us, the problem seems to be caused by that Kinesis receivers
 advanced their checkpoint before we know for sure the data is replicated.
 For example, we can do another checkpoint ourselves to remember the kinesis
 sequence number for data that has been processed by spark streaming. When
 Kinesis receiver is restarted due to worker failures, we restarted it from
 the checkpoint we tracked.


This sounds pretty much to me like the way Kafka does it. So, I am not
saying that the stock KafkaReceiver does what you want (it may or may not),
but it should be possible to update the offset (corresponds to sequence
number) in Zookeeper only after data has been replicated successfully. I
guess replace Kinesis by Kafka is not in option for you, but you may
consider pulling Kinesis data into Kafka before processing with Spark?

Tobias


sqlContext.parquetFile(path) fails if path is a file but succeeds if a directory

2014-08-18 Thread Fengyun RAO
I'm using CDH 5.1 with spark 1.0.

When I try to run Spark SQL following the Programming Guide

val parquetFile = sqlContext.parquetFile(path)

If the path is a file, it throws an exception:

 Exception in thread main java.lang.IllegalArgumentException:
Expected hdfs://*/file.parquet for be a directory with Parquet
files/metadata
at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetRelation.scala:301)
at 
org.apache.spark.sql.parquet.ParquetRelation.parquetSchema(ParquetRelation.scala:62)
at 
org.apache.spark.sql.parquet.ParquetRelation.init(ParquetRelation.scala:69)
at org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:98)

However, if the path is the parent directory of the file, it succeeds.
Note: there is only one file in that directory.

I look into the source,

 /**
   * Try to read Parquet metadata at the given Path. We first see if
there is a summary file
   * in the parent directory. If so, this is used. Else we read the
actual footer at the given
   * location.
   * @param origPath The path at which we expect one (or more) Parquet files.
   * @return The `ParquetMetadata` containing among other things the schema.
   */
  def readMetaData(origPath: Path): ParquetMetadata

It doesn't require a directory, but it did throw an exception

 if (!fs.getFileStatus(path).isDir) {
   throw new IllegalArgumentException(
     s"Expected $path for be a directory with Parquet files/metadata")
 }


It seems odd to me. Can anybody explain why, and how to read a file, not a
directory?


Re: Segmented fold count

2014-08-18 Thread Davies Liu
On Mon, Aug 18, 2014 at 7:41 PM, fil f...@pobox.com wrote:
 fil wrote
 - Python functions like groupCount; these get reflected from their Python
 AST and converted into a Spark DAG? Presumably if I try and do something
 non-convertible this transformation process will throw an error? In other
 words this runs in the JVM.

 Further to this - it seems that Python does run on each node in the cluster,
 meaning it runs outside the JVM. Presumably this means that writing this in
 Scala would be far more performant.

 Could I write groupCount() in Scala, and then use it from Pyspark? Care to
 supply an example, I'm finding them hard to find :)

It's doable, but not so convenient. If you really care about the performance
difference, you should write your program in Scala.


 fil wrote
 - I had considered that partitions were batches of distributable work,
 and generally large. Presumably the above is OK with small groups (eg.
  average size < 10) - this won't kill performance?

 I'm still a bit confused about the dual meaning of partition: work
 segmentation, and key groups. Care to clarify anyone - when are partitions
 used to describe chunks of data for different nodes in the cluster (ie.
 large), and when are they used to describe groups of items in data (ie.
 small)..

A partition is a chunk of data in an RDD; the computation on a partition
is a task, which is sent to a node in the cluster.
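
For intuition, a tiny Scala example of that (the thread is PySpark, but the
partitioning model is the same):

val rdd = sc.parallelize(1 to 10, 3)   // ask for 3 partitions explicitly
rdd.glom().collect()
// roughly: Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
// Each inner array is one partition; computing one of them is one task,
// and tasks are what get shipped to the nodes of the cluster.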


 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278p12342.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark - Identifying and skipping processed data in hdfs

2014-08-18 Thread salemi
Hi, 

My data source stores the incoming data to HDFS every 10 seconds. The
naming convention is save-<timestamp>.csv (see below) 

drwxr-xr-x  ali supergroup  0 B 0   0 B save-1408396065000.csv 
drwxr-xr-x  ali supergroup  0 B 0   0 B save-140839607.csv 
drwxr-xr-x  ali supergroup  0 B 0   0 B save-1408396075000.csv 
drwxr-xr-x  ali supergroup  0 B 0   0 B save-140839608.csv 

I would like to periodically (every 5 min) read the files and process them.
Is there a good example out there of how to implement this? How do I know what
part of the data I have already processed? 

Thanks, 
Ali




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-Identifying-and-skipping-processed-data-in-hdfs-tp12347.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Peng Cheng
Yeah, thanks a lot. I know that for people who understand lazy execution this seems
straightforward, but for those who don't it may become a liability.

I've only tested its stability on a small example (which seems stable);
hopefully that's not just a coincidence. Can a committer confirm this?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12348.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Data loss - Spark streaming and network receiver

2014-08-18 Thread Shao, Saisai
I think Spark Streaming currently lacks a data-acknowledging mechanism for when 
data is stored and replicated in the BlockManager, so data can potentially be lost 
even after it is pulled from Kafka. Say the data is stored only in the 
BlockGenerator, not yet in the BlockManager, while in the meantime Kafka itself 
commits the consumer offset, and at that point the node fails: from Kafka’s point 
of view this data has been fed into Spark Streaming, but it has not actually been 
processed, so it may never be processed again unless you read the whole partition 
again.

To solve this potential data loss problem, Spark Streaming needs to offer a 
data-acknowledging mechanism, so a custom Receiver can use the acknowledgement 
to do checkpointing or recovery, as Storm does.

Besides, driver failure is another story that needs to be carefully considered. So 
currently it is hard to guarantee no data loss in Spark Streaming; it still needs 
improvement at some points ☺.

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Tuesday, August 19, 2014 10:47 AM
To: Wei Liu
Cc: user
Subject: Re: Data loss - Spark streaming and network receiver

Hi Wei,

On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu 
wei@stellarloyalty.commailto:wei@stellarloyalty.com wrote:
Since our application cannot tolerate losing customer data, I am wondering what 
is the best way for us to address this issue.
1) We are thinking writing application specific logic to address the data loss. 
To us, the problem seems to be caused by that Kinesis receivers advanced their 
checkpoint before we know for sure the data is replicated. For example, we can 
do another checkpoint ourselves to remember the kinesis sequence number for 
data that has been processed by spark streaming. When Kinesis receiver is 
restarted due to worker failures, we restarted it from the checkpoint we 
tracked.

This sounds pretty much to me like the way Kafka does it. So, I am not saying 
that the stock KafkaReceiver does what you want (it may or may not), but it 
should be possible to update the offset (corresponds to sequence number) in 
Zookeeper only after data has been replicated successfully. I guess replace 
Kinesis by Kafka is not in option for you, but you may consider pulling 
Kinesis data into Kafka before processing with Spark?

Tobias



Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Dibyendu Bhattacharya
Dear All,

Recently I have written a Spark Kafka consumer to solve this problem. We have
also seen issues with KafkaUtils, which uses the high-level Kafka consumer, so
the consumer code has no handle on offset management.

The code below solves this problem; it is being tested in our
Spark cluster and is working fine as of now.

https://github.com/dibbhatt/kafka-spark-consumer

This is Low Level Kafka Consumer using Kafka Simple Consumer API.

Please have a look at it and let me know your opinion. It has been
written to eliminate data loss by committing the offset only after the data is
written to the BlockManager. Also, the existing high-level KafkaUtils does not
have any feature to control data flow, and it gives an Out Of Memory error if
there is too much backlog in Kafka. This consumer solves that problem as well.
The code has been adapted from an earlier Storm Kafka consumer and has a lot of
other features, like recovery from Kafka node failures, ZK failures, recovery
from offset errors, etc.

Regards,
Dibyendu


On Tue, Aug 19, 2014 at 9:49 AM, Shao, Saisai saisai.s...@intel.com wrote:

  I think Currently Spark Streaming lack a data acknowledging mechanism
 when data is stored and replicated in BlockManager, so potentially data
 will be lost even pulled into Kafka, say if data is stored just in
 BlockGenerator not BM, while in the meantime Kafka itself commit the
 consumer offset, also at this point node is failed, from Kafka’s point this
 part of data is feed into Spark Streaming but actually this data is not yet
 processed, so potentially this part of data will never be processed again,
 unless you read the whole partition again.



 To solve this potential data loss problem, Spark Streaming needs to offer
 a data acknowledging mechanism, so custom Receiver can use this
 acknowledgement to do checkpoint or recovery, like Storm.



 Besides, driver failure is another story need to be carefully considered.
 So currently it is hard to make sure no data loss in Spark Streaming, still
 need to improve at some points J.



 Thanks

 Jerry



 *From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
 *Sent:* Tuesday, August 19, 2014 10:47 AM
 *To:* Wei Liu
 *Cc:* user
 *Subject:* Re: Data loss - Spark streaming and network receiver



 Hi Wei,



 On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com
 wrote:

 Since our application cannot tolerate losing customer data, I am wondering
 what is the best way for us to address this issue.

 1) We are thinking writing application specific logic to address the data
 loss. To us, the problem seems to be caused by that Kinesis receivers
 advanced their checkpoint before we know for sure the data is replicated.
 For example, we can do another checkpoint ourselves to remember the kinesis
 sequence number for data that has been processed by spark streaming. When
 Kinesis receiver is restarted due to worker failures, we restarted it from
 the checkpoint we tracked.



 This sounds pretty much to me like the way Kafka does it. So, I am not
 saying that the stock KafkaReceiver does what you want (it may or may not),
 but it should be possible to update the offset (corresponds to sequence
 number) in Zookeeper only after data has been replicated successfully. I
 guess replace Kinesis by Kafka is not in option for you, but you may
 consider pulling Kinesis data into Kafka before processing with Spark?



 Tobias





Re: Does anyone have a stand alone spark instance running on Windows

2014-08-18 Thread Steve Lewis
OK I tried your build -
First you need to put sbt in C:\sbt
Then you get
Microsoft Windows [Version 6.2.9200]
(c) 2012 Microsoft Corporation. All rights reserved.

e:\>which java
/cygdrive/c/Program Files/Java/jdk1.6.0_25/bin/java

e:\>java -version
java version "1.6.0_25"
Java(TM) SE Runtime Environment (build 1.6.0_25-b06)


e:\spark>sbt_opt.bat

e:\spark>set SCRIPT_DIR=C:\sbt\

e:\spark>java -Xms512m -Xmx2g -Xss1M -XX:+CMSClassUnloadingEnabled
-XX:MaxPermSize=128m -jar C:\sbt\sbt-launch.jar
[ERROR] Terminal initialization failed; falling back to unsupported
java.lang.IncompatibleClassChangeError: Found class jline.Terminal, but
interface was expected
at jline.TerminalFactory.create(TerminalFactory.java:101)
at jline.TerminalFactory.get(TerminalFactory.java:159)
at sbt.ConsoleLogger$.ansiSupported(ConsoleLogger.scala:86)
at sbt.ConsoleLogger$.init(ConsoleLogger.scala:80)
at sbt.ConsoleLogger$.clinit(ConsoleLogger.scala)
at sbt.GlobalLogging$.initial(GlobalLogging.scala:40)
at sbt.StandardMain$.initialGlobalLogging(Main.scala:64)
at sbt.StandardMain$.initialState(Main.scala:73)
at sbt.xMain.run(Main.scala:29)
at xsbt.boot.Launch$.run(Launch.scala:55)
at xsbt.boot.Launch$$anonfun$explicit$1.apply(Launch.scala:45)
at xsbt.boot.Launch$.launch(Launch.scala:69)
at xsbt.boot.Launch$.apply(Launch.scala:16)
at xsbt.boot.Boot$.runImpl(Boot.scala:31)
at xsbt.boot.Boot$.main(Boot.scala:20)
at xsbt.boot.Boot.main(Boot.scala)

java.lang.IncompatibleClassChangeError: JLine incompatibility detected.
 Check that the sbt launcher is version 0.13.x or later.
at sbt.ConsoleLogger$.ansiSupported(ConsoleLogger.scala:97)
at sbt.ConsoleLogger$.init(ConsoleLogger.scala:80)
at sbt.ConsoleLogger$.clinit(ConsoleLogger.scala)
at sbt.GlobalLogging$.initial(GlobalLogging.scala:40)
at sbt.StandardMain$.initialGlobalLogging(Main.scala:64)
at sbt.StandardMain$.initialState(Main.scala:73)
at sbt.xMain.run(Main.scala:29)
at xsbt.boot.Launch$.run(Launch.scala:55)
at xsbt.boot.Launch$$anonfun$explicit$1.apply(Launch.scala:45)
at xsbt.boot.Launch$.launch(Launch.scala:69)
at xsbt.boot.Launch$.apply(Launch.scala:16)
at xsbt.boot.Boot$.runImpl(Boot.scala:31)
at xsbt.boot.Boot$.main(Boot.scala:20)
at xsbt.boot.Boot.main(Boot.scala)
Error during sbt execution: java.lang.IncompatibleClassChangeError: JLine
incompatibility detected.  Check that the sbt launcher is version 0.13.x or
later.

I believe my version of sbt is 0.13.

Finally, even if I could build Spark, I still don't see how to launch a server.
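
For what it's worth, here is a sketch of starting the standalone daemons by
hand, assuming a Spark 1.x layout with the .cmd launcher scripts in bin\ of a
built distribution and the default port 7077 (untested here; the host name
below is a placeholder):

bin\spark-class.cmd org.apache.spark.deploy.master.Master
bin\spark-class.cmd org.apache.spark.deploy.worker.Worker spark://YOUR_HOSTNAME:7077

Run each in its own console. The master prints its spark://host:7077 URL and
serves a web UI on port 8080; the worker must be given that exact URL, and a
driver such as JavaWordCount would use the same URL as its master.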


On Sat, Aug 16, 2014 at 7:33 PM, Manu Suryavansh suryavanshi.m...@gmail.com
 wrote:

 Hi,

 I have built spark-1.0.0 on Windows using Java 7/8 and I have been able to
 run several examples - here are my notes -
 http://ml-nlp-ir.blogspot.com/2014/04/building-spark-on-windows-and-cloudera.html
 on how to build from source and run examples in spark shell.


 Regards,
 Manu


 On Sat, Aug 16, 2014 at 12:14 PM, Steve Lewis lordjoe2...@gmail.com
 wrote:

 I want to look at porting a Hadoop problem to Spark - eventually I want
 to run on a Hadoop 2.0 cluster, but while I am learning and porting I want
 to run small problems on my Windows box.
 I installed Scala and sbt.
 I downloaded Spark, and in the spark directory I can run
 mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package
 which succeeds.
 I tried
 sbt/sbt assembly
 which fails with errors

 In the documentation
 https://spark.apache.org/docs/latest/spark-standalone.html it says

 *Note:* The launch scripts do not currently support Windows. To run a
 Spark cluster on Windows, start the master and workers by hand.
 with no indication of how to do this.

 I can build and run samples (say JavaWordCount)  to the point where they
 fail because a master cannot be found (none is running)

 I want to know how to get a spark master and a slave or two running on my
 windows box so I can look at the samples and start playing with Spark

 Does anyone have a windows instance running??
  Please DON'T SAY I SHOULD RUN LINUX! If it is supposed to work on
 Windows, someone should have tested it and be willing to state how.






 --
 Manu Suryavansh




-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com


Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Wei Liu
Thank you all for responding to my question. I am pleasantly surprised by
how many prompt responses I got. It shows the strength of the Spark
community.

Kafka is still an option for us, I will check out the link provided by
Dibyendu.

Meanwhile, if someone out there has already figured this out with Kinesis,
please keep your suggestions coming. Thanks.

Thanks,
Wei


On Mon, Aug 18, 2014 at 9:31 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Dear All,

 Recently I have written a Spark Kafka consumer to solve this problem. We
 too have seen issues with KafkaUtils, which uses the high-level Kafka
 consumer, where the consumer code has no handle on offset management.

 The code below solves this problem; it is being tested in our Spark
 cluster and is working fine as of now.

 https://github.com/dibbhatt/kafka-spark-consumer

 This is Low Level Kafka Consumer using Kafka Simple Consumer API.

 Please have a look at it and let me know your opinion. It has been
 written to eliminate data loss by committing the offset only after the data
 is written to the BlockManager. Also, the existing high-level KafkaUtils
 does not have any feature to control data flow, and it gives an Out Of
 Memory error if there is too much backlog in Kafka. This consumer solves
 that problem as well. The code has been adapted from the earlier Storm
 Kafka consumer code, and it has a lot of other features like recovery from
 Kafka node failures, ZK failures, recovery from offset errors, etc.

 Regards,
 Dibyendu


 On Tue, Aug 19, 2014 at 9:49 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think Spark Streaming currently lacks a data acknowledgment mechanism
 for when data is stored and replicated in the BlockManager, so data can
 potentially be lost even after it has been pulled from Kafka. Say the data
 is stored just in the BlockGenerator, not the BlockManager, while in the
 meantime Kafka itself commits the consumer offset, and at that point the
 node fails: from Kafka's point of view this part of the data has been fed
 into Spark Streaming, but it has not actually been processed yet, so it may
 never be processed again unless you read the whole partition again.



 To solve this potential data loss problem, Spark Streaming needs to offer
 a data acknowledgment mechanism, so that a custom Receiver can use this
 acknowledgment to do checkpointing or recovery, like Storm does.



 Besides, driver failure is another story that needs to be carefully considered.
 So it is currently hard to guarantee no data loss in Spark Streaming; some
 points still need to be improved :-).



 Thanks

 Jerry



 *From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
 *Sent:* Tuesday, August 19, 2014 10:47 AM
 *To:* Wei Liu
 *Cc:* user
 *Subject:* Re: Data loss - Spark streaming and network receiver



 Hi Wei,



 On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com
 wrote:

 Since our application cannot tolerate losing customer data, I am
 wondering what is the best way for us to address this issue.

 1) We are thinking of writing application-specific logic to address the data
 loss. To us, the problem seems to be caused by the fact that Kinesis receivers
 advance their checkpoint before we know for sure the data is replicated.
 For example, we can do another checkpoint ourselves to remember the Kinesis
 sequence number for data that has been processed by Spark Streaming. When
 a Kinesis receiver is restarted due to worker failures, we restart it from
 the checkpoint we tracked.



 This sounds pretty much to me like the way Kafka does it. So, I am not
 saying that the stock KafkaReceiver does what you want (it may or may not),
 but it should be possible to update the offset (which corresponds to the
 sequence number) in Zookeeper only after the data has been replicated
 successfully. I guess replacing Kinesis with Kafka is not an option for you,
 but you may consider pulling Kinesis data into Kafka before processing it
 with Spark?



 Tobias







Re: Working with many RDDs in parallel?

2014-08-18 Thread David Tinker
Hmm, I thought as much. I am using Cassandra with the Spark connector. What
I really need is an RDD created from a query against Cassandra of the form
"where partition_key = :id", where :id is taken from a list. Some grouping
of the ids would be a way to partition this.


On Mon, Aug 18, 2014 at 3:42 PM, Sean Owen so...@cloudera.com wrote:

 You won't be able to use RDDs inside of an RDD operation. I imagine your
 immediate problem is that the code you've elided references 'sc', which
 gets referenced by the PairFunction and serialized, but it can't be.

 If you want to play it this way, parallelize across roots in Java.
 That is, just use an ExecutorService to launch a bunch of operations on
 RDDs in parallel. There's no reason you can't do that, although I
 suppose there are upper limits as to what makes sense on your cluster;
 1000 RDD count()s at once isn't a good idea, for example.

 It may be the case that you don't really need a bunch of RDDs at all,
 but can operate on an RDD of pairs of Strings (roots) and
 something-elses, all at once.
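
A minimal sketch of the ExecutorService approach, assuming roughly the 1.x
Java API; buildRddForRoot is a hypothetical placeholder for the per-root RDD
construction that was elided in the original post:

import java.util.*;
import java.util.concurrent.*;
import org.apache.spark.api.java.*;

// Sketch: run the per-root count()s from the driver in parallel with a
// bounded thread pool, instead of trying to create RDDs inside an RDD op.
public class ParallelRootCounts {

    // hypothetical placeholder for "create RDD based on root from sc somehow"
    static JavaRDD<String> buildRddForRoot(JavaSparkContext sc, String root) {
        return sc.parallelize(Arrays.asList(root));
    }

    static Map<String, Long> countPerRoot(final JavaSparkContext sc, List<String> roots)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8); // bounds the number of concurrent jobs
        Map<String, Future<Long>> futures = new LinkedHashMap<String, Future<Long>>();
        for (final String root : roots) {
            futures.put(root, pool.submit(new Callable<Long>() {
                public Long call() {
                    return buildRddForRoot(sc, root).count(); // each count() is its own Spark job
                }
            }));
        }
        Map<String, Long> counts = new LinkedHashMap<String, Long>();
        for (Map.Entry<String, Future<Long>> e : futures.entrySet()) {
            counts.put(e.getKey(), e.getValue().get());
        }
        pool.shutdown();
        return counts;
    }
}

Submitting jobs from several driver threads against one SparkContext is
supported; the fixed pool size is what keeps the number of simultaneous jobs
sensible.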


 On Mon, Aug 18, 2014 at 2:31 PM, David Tinker david.tin...@gmail.com
 wrote:
  Hi All.
 
  I need to create a lot of RDDs starting from a set of roots and count the
  rows in each. Something like this:
 
  final JavaSparkContext sc = new JavaSparkContext(conf);
  List<String> roots = ...
  Map<String, Object> res = sc.parallelize(roots).mapToPair(new
  PairFunction<String, String, Long>(){
  public Tuple2<String, Long> call(String root) throws Exception {
  ... create RDD based on root from sc somehow ...
  return new Tuple2<String, Long>(root, rdd.count())
  }
  }).countByKey()
 
  This fails with a message about JavaSparkContext not being serializable.
 
  Is there a way to get at the content inside of the map function or should I
  be doing something else entirely?
 
  Thanks
  David




-- 
http://qdb.io/ Persistent Message Queues With Replay and #RabbitMQ
Integration