Re: Segmented fold count
>>> import itertools
>>> l = [1,1,1,2,2,3,4,4,5,1]
>>> gs = itertools.groupby(l)
>>> map(lambda (n, it): (n, sum(1 for _ in it)), gs)
[(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)]

def groupCount(l):
    gs = itertools.groupby(l)
    return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()

On Sun, Aug 17, 2014 at 10:34 PM, fil f...@pobox.com wrote:
> Can anyone assist with a scan of the following kind (Python preferred,
> but whatever..)? I'm looking for a kind of segmented fold count.
>
> Input:  [1,1,1,2,2,3,4,4,5,1]
> Output: [(1,3), (2,2), (3,1), (4,2), (5,1), (1,1)]
>
> or preferably two output columns:
> id:    [1,2,3,4,5,1]
> count: [3,2,1,2,1,1]
>
> I can use a groupby/count, except for the fact that I just want to scan -
> not re-sort. Ideally this would be as low-level as possible and perform
> in a simple single scan. It also needs to retain the original sort order.
> Thoughts?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Segmented fold count
What happens when a run of numbers is spread across a partition boundary? I think you might end up with two adjacent groups of the same value in that situation.

On Mon, Aug 18, 2014 at 2:05 AM, Davies Liu dav...@databricks.com wrote:
> def groupCount(l):
>     gs = itertools.groupby(l)
>     return map(lambda (n, it): (n, sum(1 for _ in it)), gs)
>
> If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()
Re: Segmented fold count
On Sun, Aug 17, 2014 at 11:07 PM, Andrew Ash and...@andrewash.com wrote:
> What happens when a run of numbers is spread across a partition boundary?
> I think you might end up with two adjacent groups of the same value in
> that situation.

Yes, you'd need another scan to combine those contiguous groups with the same value.
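That second pass can be sketched in plain Python, no cluster required. `combine` is a hypothetical helper (not from the thread) that you would apply to the collected per-partition results of `RDD.mapPartitions(group_count)`:

```python
import itertools

def group_count(l):
    # one scan per partition: (value, run length) pairs in original order
    return [(n, sum(1 for _ in it)) for n, it in itertools.groupby(l)]

def combine(chunks):
    # Within a partition, groupby already merged runs, so adjacent equal
    # values can only appear where a run was split at a partition boundary;
    # merge those by adding their counts.
    merged = []
    for n, c in itertools.chain.from_iterable(chunks):
        if merged and merged[-1][0] == n:
            merged[-1] = (n, merged[-1][1] + c)
        else:
            merged.append((n, c))
    return merged

# two partitions splitting the run of 4s: [..., 4] | [4, ...]
parts = [[1, 1, 1, 2, 2, 3, 4], [4, 5, 1]]
print(combine(group_count(p) for p in parts))
# [(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)]
```

Since the per-partition results are tiny (one pair per run), doing this merge on the driver after collect() is usually cheap.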
Re: application as a service
Another option is to use Tachyon to cache the RDD; the cache can then be shared by different applications. See how to use Spark with Tachyon: http://tachyon-project.org/Running-Spark-on-Tachyon.html

Davies

On Sun, Aug 17, 2014 at 4:48 PM, ryaminal tacmot...@gmail.com wrote:
> You can also look into using Ooyala's job server at
> https://github.com/ooyala/spark-jobserver
>
> This already has a Spray server built in that allows you to do what has
> already been explained above. Sounds like it should solve your problem.
>
> Enjoy!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/application-as-a-service-tp12253p12267.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: OutOfMemory Error
Hi Ghousia,

You can try the following:

1. Increase the heap size:
   https://spark.apache.org/docs/0.9.0/configuration.html
2. Increase the number of partitions:
   http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine
3. Try persisting the RDD with DISK_ONLY:
   http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

Thanks
Best Regards

On Mon, Aug 18, 2014 at 10:40 AM, Ghousia Taj ghousia.ath...@gmail.com wrote:
> Hi,
>
> I am trying to implement machine learning algorithms on Spark, on a
> 3-node cluster with 5GB of memory per node. Whenever I work with a
> slightly larger number of records, I end up with an OutOfMemory error.
> The problem is that even when the record count is only slightly higher,
> the intermediate result of a transformation is huge, and this causes the
> OutOfMemory error. To work around it, we are partitioning the data so
> that each partition holds only a few records.
>
> Is there any better way to fix this issue - something like spilling the
> intermediate data to local disk?
>
> Thanks,
> Ghousia.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-Error-tp12275.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
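Suggestions 1 and 2 can be set in spark-defaults.conf. This is a sketch only: the property names are from the Spark 0.9/1.x configuration docs, and the values are purely illustrative, not recommendations.

```properties
# spark-defaults.conf - illustrative values, tune for your cluster
spark.executor.memory       4g     # 1. bigger heap per executor
spark.default.parallelism   200    # 2. more (smaller) partitions by default
```

Suggestion 3 is done in code, e.g. `rdd.persist(StorageLevel.DISK_ONLY)`, which keeps cached partitions on local disk instead of in memory.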
NullPointerException when connecting from Spark to a Hive table backed by HBase
Hello:

I am trying to set up Spark to connect to a Hive table which is backed by HBase, but I am running into the following NullPointerException:

scala> val hiveCount = hiveContext.sql("select count(*) from dataset_records").collect().head.getLong(0)
14/08/18 06:34:29 INFO ParseDriver: Parsing command: select count(*) from dataset_records
14/08/18 06:34:29 INFO ParseDriver: Parse Completed
14/08/18 06:34:29 INFO HiveMetaStore: 0: get_table : db=default tbl=dataset_records
14/08/18 06:34:29 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_table : db=default tbl=dataset_records
14/08/18 06:34:30 INFO MemoryStore: ensureFreeSpace(160296) called with curMem=0, maxMem=280248975
14/08/18 06:34:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 156.5 KB, free 267.1 MB)
14/08/18 06:34:30 INFO SparkContext: Starting job: collect at SparkPlan.scala:85
14/08/18 06:34:31 WARN DAGScheduler: Creating new stage failed due to exception - job: 0
java.lang.NullPointerException
    at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
    at org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)

This is happening from the master on Spark. I am running HBase version hbase-0.98.4-hadoop1 and Hive version 0.13.1.
And here is how I am running the Spark shell:

bin/spark-shell --driver-class-path /opt/hive/latest/lib/hive-hbase-handler-0.13.1.jar:/opt/hive/latest/lib/zookeeper-3.4.5.jar:/opt/spark-poc/lib_managed/jars/com.google.guava/guava/guava-14.0.1.jar:/opt/hbase/latest/lib/hbase-common-0.98.4-hadoop1.jar:/opt/hbase/latest/lib/hbase-server-0.98.4-hadoop1.jar:/opt/hbase/latest/lib/hbase-client-0.98.4-hadoop1.jar:/opt/hbase/latest/lib/hbase-protocol-0.98.4-hadoop1.jar:/opt/hbase/latest/lib/htrace-core-2.04.jar:/opt/hbase/latest/lib/netty-3.6.6.Final.jar:/opt/hbase/latest/lib/hbase-hadoop-compat-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-client/hbase-client-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-common/hbase-common-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-server/hbase-server-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-prefix-tree/hbase-prefix-tree-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/jars/org.apache.hbase/hbase-protocol/hbase-protocol-0.98.4-hadoop1.jar:/opt/spark-poc/lib_managed/bundles/com.google.protobuf/protobuf-java/protobuf-java-2.5.0.jar:/opt/spark-poc/lib_managed/jars/org.cloudera.htrace/htrace-core/htrace-core-2.04.jar:/opt/spark/sql/hive/target/spark-hive_2.10-1.1.0-SNAPSHOT.jar:/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-common/hive-common-0.12.0.jar:/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-exec/hive-exec-0.12.0.jar:/opt/spark-poc/lib_managed/jars/org.apache.thrift/libthrift/libthrift-0.9.0.jar:/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-shims/hive-shims-0.12.0.jar:/opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-metastore/hive-metastore-0.12.0.jar:/opt/spark/sql/catalyst/target/spark-catalyst_2.10-1.1.0-SNAPSHOT.jar:/opt/spark-poc/lib_managed/jars/org.antlr/antlr-runtime/antlr-runtime-3.4.jar:/opt/spark-poc/lib_managed/jars/org.apache.thrift/libfb303/libfb303-0.9.0.jar:/opt/spark-poc/lib_managed/jars/javax.jdo/jdo-api/jdo-api-3.0.1.jar:/opt/spark-poc/lib_managed/jars/org.datanucleus/datanucleus-api-jdo/datanucleus-api-jdo-3.2.1.jar:/opt/spark-poc/lib_managed/jars/org.datanucleus/datanucleus-core/datanucleus-core-3.2.2.jar:/opt/spark-poc/lib_managed/jars/org.datanucleus/datanucleus-rdbms/datanucleus-rdbms-3.2.1.jar:/opt/spark-poc/lib_managed/jars/org.apache.derby/derby/derby-10.4.2.0.jar:/opt/spark-poc/sbt/ivy/cache/org.apache.hive/hive-hbase-handler/jars/hive-hbase-handler-0.13.1.jar:/opt/spark-poc/lib_managed/jars/com.typesafe/scalalogging-slf4j_2.10/scalalogging-slf4j_2.10-1.0.1.jar:/opt/spark-poc/lib_managed/bundles/com.jolbox/bonecp/bonecp-0.7.1.RELEASE.jar:/opt/spark-poc/sbt/ivy/cache/com.datastax.cassandra/cassandra-driver-core/bundles/cassandra-driver-core-2.0.4.jar:/opt/spark-poc/lib_managed/jars/org.json/json/json-20090211.jar

Can anybody help me?

Best,
--
Cesar Arevalo
Software Engineer ❘ Zephyr Health
450 Mission Street, Suite #201 ❘ San Francisco, CA 94105
m: +1 415-571-7687 ❘ s: arevalocesar | t: @zephyrhealth https://twitter.com/zephyrhealth
o: +1 415-529-7649 ❘ f: +1
a noob question for how to implement setup and cleanup in Spark map
Hi All,

I'm new to Spark and Scala - I just recently started using the language and love it - but there is a small coding problem when I want to convert my existing MapReduce code from Java to Spark.

In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper and override the setup(), map() and cleanup() methods. But in Spark there is no method called setup(), so I moved the setup() code into map(), and it performs badly. The reason is that setup() creates the database connection once, run() executes the SQL queries, and then cleanup() closes the connection; with the setup code inside map(), that no longer happens just once.

Could someone tell me how to do this in Spark?

Best regards,
Henry Hung

The privileged confidential information contained in this email is intended for use only by the addressees as indicated by the original sender of this email. If you are not the addressee indicated in this email or are not responsible for delivery of the email to such a person, please kindly reply to the sender indicating this fact and delete all copies of it from your computer and network server immediately. Your cooperation is highly appreciated. It is advised that any unauthorized use of confidential information of Winbond is strictly prohibited; and any information in this email irrelevant to the official business of Winbond shall be deemed as neither given nor endorsed by Winbond.
Re: OutOfMemory Error
Thanks for the answer, Akhil. We are currently getting rid of this issue by increasing the number of partitions, and we are persisting RDDs with DISK_ONLY. But the issue remains for heavy computations within an RDD. It would be better if we had the option of spilling intermediate transformation results to local disk (only when memory consumption is high). Is any such option available in Spark? If increasing the number of partitions is the only way, then one might still end up with OutOfMemory errors when working with algorithms whose intermediate results are huge.

On Mon, Aug 18, 2014 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
> Hi Ghousia,
>
> You can try the following:
> 1. Increase the heap size
> 2. Increase the number of partitions
> 3. Try persisting the RDD with DISK_ONLY
>
> Thanks
> Best Regards
Re: NullPointerException when connecting from Spark to a Hive table backed by HBase
Looks like your hiveContext is null. Have a look at this documentation:
https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

Thanks
Best Regards

On Mon, Aug 18, 2014 at 12:09 PM, Cesar Arevalo ce...@zephyrhealthinc.com wrote:
> Hello:
>
> I am trying to set up Spark to connect to a Hive table which is backed by
> HBase, but I am running into the following NullPointerException:
>
> java.lang.NullPointerException
>     at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
>     at org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
>     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
>     ...
>
> This is happening from the master on Spark. I am running HBase version
> hbase-0.98.4-hadoop1 and Hive version 0.13.1. And here is how I am
> running the spark shell: bin/spark-shell --driver-class-path
RE: a noob question for how to implement setup and cleanup in Spark map
Hi All,

Please ignore my question. I found a way to implement it via the old archive mails:
http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E

Best regards,
Henry

From: MA33 YTHung1
Sent: Monday, August 18, 2014 2:42 PM
To: user@spark.apache.org
Subject: a noob question for how to implement setup and cleanup in Spark map

The privileged confidential information contained in this email is intended for use only by the addressees as indicated by the original sender of this email. If you are not the addressee indicated in this email or are not responsible for delivery of the email to such a person, please kindly reply to the sender indicating this fact and delete all copies of it from your computer and network server immediately. Your cooperation is highly appreciated. It is advised that any unauthorized use of confidential information of Winbond is strictly prohibited; and any information in this email irrelevant to the official business of Winbond shall be deemed as neither given nor endorsed by Winbond.
Re: OutOfMemory Error
I believe spark.shuffle.memoryFraction is the one you are looking for:

spark.shuffle.memoryFraction: Fraction of Java heap to use for aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills are often, consider increasing this value at the expense of spark.storage.memoryFraction.

You can give it a try.

Thanks
Best Regards

On Mon, Aug 18, 2014 at 12:21 PM, Ghousia ghousia.ath...@gmail.com wrote:
> Thanks for the answer Akhil. We are right now getting rid of this issue by
> increasing the number of partitions. And we are persisting RDDs to
> DISK_ONLY. But the issue is with heavy computations within an RDD. It
> would be better if we have the option of spilling the intermediate
> transformation results to local disk (only in case memory consumption is
> high). Do we have any such option available with Spark?
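For reference, the properties mentioned above might be set like this in spark-defaults.conf. This is a sketch with Spark 1.x-era property names; the values are illustrative only, not recommendations:

```properties
# spark-defaults.conf - Spark 1.x shuffle/storage memory knobs
spark.shuffle.spill             true   # let shuffle maps spill to disk
spark.shuffle.memoryFraction    0.4    # raise if spills happen too often
spark.storage.memoryFraction    0.5    # lower to compensate for the above
```

The trade-off is explicit: heap given to shuffle aggregation maps is taken away from the storage pool used for cached RDDs.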
Re: a noob question for how to implement setup and cleanup in Spark map
You can create an RDD and then do a map or mapPartitions on it, where at the top you create the database connection and so on, then do the operations, and at the end close the connection.

Thanks
Best Regards

On Mon, Aug 18, 2014 at 12:34 PM, Henry Hung ythu...@winbond.com wrote:
> Hi All,
>
> Please ignore my question, I found a way to implement it via old archive
> mails:
> http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E
>
> Best regards,
> Henry
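That pattern can be sketched in plain Python. DummyConnection is a hypothetical stand-in for a real database client (assumed open/query/close API); on a cluster you would pass process_partition to RDD.mapPartitions, but locally it runs on any iterable:

```python
class DummyConnection:
    """Stand-in for a real DB client (assumption: open/query/close API)."""
    def query(self, row):
        return (row, row * 2)          # pretend SQL lookup
    def close(self):
        pass

def process_partition(rows):
    # "setup": once per partition, not once per record
    conn = DummyConnection()
    try:
        for row in rows:
            yield conn.query(row)      # "map": one query per record
    finally:
        conn.close()                   # "cleanup": runs when the
                                       # partition iterator is exhausted

# rdd.mapPartitions(process_partition) on a cluster; locally:
print(list(process_partition([1, 2, 3])))
# [(1, 2), (2, 4), (3, 6)]
```

Because the body is a generator, the connection is opened once per partition and the `finally` block closes it after the last record is consumed, which is the setup()/map()/cleanup() shape from the original question.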
Re: a noob question for how to implement setup and cleanup in Spark map
I think this was a more comprehensive answer recently. Tobias is right that it is not quite that simple:
http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E

On Mon, Aug 18, 2014 at 8:04 AM, Henry Hung ythu...@winbond.com wrote:
> Hi All,
>
> Please ignore my question, I found a way to implement it via old archive
> mails:
> http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E
>
> Best regards,
> Henry
Re: NullPointerException when connecting from Spark to a Hive table backed by HBase
Nope, it is NOT null. Check this out:

scala> hiveContext == null
res2: Boolean = false

And thanks for sending that link, but I had already looked at it. Any other ideas? I looked through some of the relevant Spark Hive code and I'm starting to think this may be a bug.

-Cesar

On Mon, Aug 18, 2014 at 12:00 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
> Looks like your hiveContext is null. Have a look at this documentation.
> https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
>
> Thanks
> Best Regards
RE: a noob question for how to implement setup and cleanup in Spark map
I slightly modified the code to use while(partitions.hasNext) { } instead of partitions.map(func). I suppose this can eliminate the uncertainty from lazy execution.

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Monday, August 18, 2014 3:10 PM
To: MA33 YTHung1
Cc: user@spark.apache.org
Subject: Re: a noob question for how to implement setup and cleanup in Spark map

I think this was answered more comprehensively recently. Tobias is right that it is not quite that simple: http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E

On Mon, Aug 18, 2014 at 8:04 AM, Henry Hung ythu...@winbond.com wrote:

Hi All, Please ignore my question, I found a way to implement it via old archive mails: http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E

Best regards, Henry

From: MA33 YTHung1
Sent: Monday, August 18, 2014 2:42 PM
To: user@spark.apache.org
Subject: a noob question for how to implement setup and cleanup in Spark map

Hi All,

I'm new to Spark and Scala; I just recently started using the language and love it, but there is a small coding problem when I want to convert my existing map reduce code from Java to Spark.

In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper and override the setup(), map() and cleanup() methods. But in Spark there is no method called setup(), so I wrote the setup() code into map(), and it performs badly. The reason is that I create a database connection in setup() once, run() executes the SQL queries, and then cleanup() closes the connection. Could someone tell me how to do this in Spark?

Best regards, Henry Hung

The privileged confidential information contained in this email is intended for use only by the addressees as indicated by the original sender of this email. If you are not the addressee indicated in this email or are not responsible for delivery of the email to such a person, please kindly reply to the sender indicating this fact and delete all copies of it from your computer and network server immediately. Your cooperation is highly appreciated. It is advised that any unauthorized use of confidential information of Winbond is strictly prohibited; and any information in this email irrelevant to the official business of Winbond shall be deemed as neither given nor endorsed by Winbond.
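[Editor's note] The per-partition setup/cleanup pattern the thread converges on (do setup once per partition, process the partition's iterator, then clean up) can be sketched in plain Python. This is only an illustration: FakeConnection is a hypothetical stand-in for a real database client, and with a real RDD the function would be passed to rdd.mapPartitions.

```python
class FakeConnection:
    """Hypothetical stand-in for a real DB connection (illustration only)."""
    def __init__(self):
        self.is_open = True

    def query(self, x):
        # Pretend to run a SQL query per record.
        return x * 2

    def close(self):
        self.is_open = False

def process_partition(records):
    conn = FakeConnection()      # setup(): runs once per partition
    try:
        for r in records:        # map(): runs once per record
            yield conn.query(r)
    finally:
        conn.close()             # cleanup(): runs once per partition

# With a real RDD this would be rdd.mapPartitions(process_partition);
# here we drive it with a plain iterator to show the behaviour.
result = list(process_partition(iter([1, 2, 3])))
```

Because the function receives the whole partition as an iterator, the connection is created and closed once per partition rather than once per record, which is what made the map()-based port perform badly.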
Re: Segmented fold count
Thanks for the reply!

def groupCount(l):
    gs = itertools.groupby(l)
    return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()

Yes, I am interested in RDD - not pure Python :) I am new to Spark; can you explain:

- Python functions like groupCount: these get reflected from their Python AST and converted into a Spark DAG? Presumably if I try to do something non-convertible this transformation process will throw an error? In other words, this runs in the JVM.
- I had considered that partitions were batches of distributable work, and generally large. Presumably the above is OK with small groups (e.g. average size 10) - this won't kill performance?

On Mon, Aug 18, 2014 at 4:20 PM, Davies Liu dav...@databricks.com wrote:

On Sun, Aug 17, 2014 at 11:07 PM, Andrew Ash and...@andrewash.com wrote:

What happens when a run of numbers is spread across a partition boundary? I think you might end up with two adjacent groups of the same value in that situation.

Yes, you need another scan to combine these continuous groups with the same value.

Yep - this will happen frequently. So by this you mean scanning the resulting mapPartitions() results? Presumably I could eliminate adjacent duplicates - or specifically look for duplicates at the end/start of different batches (what is the Spark term for this?) from different nodes in the cluster. What's the Spark'iest way to do this efficiently? :)

Regards, Fil.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278p12295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
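[Editor's note] The follow-up scan Davies mentions — merging the last group of one partition with the first group of the next when the values match — can be sketched in plain Python. This runs on the driver over the collected per-partition results; it is an illustration of the idea, not the only way to do it in Spark.

```python
import itertools

def group_count(l):
    # Same segmented fold count as in the thread, per partition.
    return [(n, sum(1 for _ in it)) for n, it in itertools.groupby(l)]

def merge_boundaries(parts):
    # parts: per-partition group-count lists, in partition order.
    # Merge the last group of one partition into the first group of the
    # next when the values match (the boundary case Andrew raised).
    merged = []
    for part in parts:
        for value, count in part:
            if merged and merged[-1][0] == value:
                merged[-1] = (value, merged[-1][1] + count)
            else:
                merged.append((value, count))
    return merged

# Simulate two partitions that split a run of 2s at the boundary:
p1 = group_count([1, 1, 1, 2, 2])
p2 = group_count([2, 3, 4, 4, 5, 1])
merged = merge_boundaries([p1, p2])
```

Only adjacent groups are ever merged, so the original scan order is preserved and the pass over the collected results is linear in the number of groups, not the number of rows.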
Re: NullPointerException when connecting from Spark to a Hive table backed by HBase
Then definitely it's a jar conflict. Can you try removing this jar from the class path: /opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-exec/hive-exec-0.12.0.jar

Thanks
Best Regards

On Mon, Aug 18, 2014 at 12:45 PM, Cesar Arevalo ce...@zephyrhealthinc.com wrote:

Nope, it is NOT null. Check this out:

scala> hiveContext == null
res2: Boolean = false

And thanks for sending that link, but I had already looked at it. Any other ideas? I looked through some of the relevant Spark Hive code and I'm starting to think this may be a bug.

-Cesar

On Mon, Aug 18, 2014 at 12:00 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Looks like your hiveContext is null. Have a look at this documentation: https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

Thanks
Best Regards

On Mon, Aug 18, 2014 at 12:09 PM, Cesar Arevalo ce...@zephyrhealthinc.com wrote:

Hello: I am trying to set up Spark to connect to a Hive table which is backed by HBase, but I am running into the following NullPointerException:

scala> val hiveCount = hiveContext.sql("select count(*) from dataset_records").collect().head.getLong(0)
14/08/18 06:34:29 INFO ParseDriver: Parsing command: select count(*) from dataset_records
14/08/18 06:34:29 INFO ParseDriver: Parse Completed
14/08/18 06:34:29 INFO HiveMetaStore: 0: get_table : db=default tbl=dataset_records
14/08/18 06:34:29 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_table : db=default tbl=dataset_records
14/08/18 06:34:30 INFO MemoryStore: ensureFreeSpace(160296) called with curMem=0, maxMem=280248975
14/08/18 06:34:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 156.5 KB, free 267.1 MB)
14/08/18 06:34:30 INFO SparkContext: Starting job: collect at SparkPlan.scala:85
14/08/18 06:34:31 WARN DAGScheduler: Creating new stage failed due to exception - job: 0
java.lang.NullPointerException
    at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
    at org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)

This is happening from the master on Spark. I am running HBase version hbase-0.98.4-hadoop1 and Hive version 0.13.1. And here is how I am running the spark shell: bin/spark-shell --driver-class-path
Re: Spark: why need a masterLock when sending heartbeat to master
Thanks, I got it ! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-why-need-a-masterLock-when-sending-heartbeat-to-master-tp12256p12297.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: s3:// sequence file startup time
Maybe irrelevant, but this resembles a lot the S3 Parquet file issue we've met before. It takes a dozen minutes to read the metadata because the ParquetInputFormat tries to call getFileStatus for all part-files sequentially. Just checked SequenceFileInputFormat, and found that a MapFile may share a similar issue.

On Mon, Aug 18, 2014 at 5:26 AM, Aaron Davidson ilike...@gmail.com wrote:

The driver must initially compute the partitions and their preferred locations for each part of the file, which results in a serial getFileBlockLocations() on each part. However, I would expect this to take several seconds, not minutes, to perform on 1000 parts.

Is your driver inside or outside of AWS? There is an order of magnitude difference in the latency of S3 requests if you're running outside of AWS.

We have also experienced an excessive slowdown in the metadata lookups using Hadoop 2 versus Hadoop 1, likely due to the differing jets3t library versions. If you're using Hadoop 2, you might try downgrading to Hadoop 1.2.1 and seeing if the startup time decreases.

On Sat, Aug 16, 2014 at 6:46 PM, kmatzen kmat...@gmail.com wrote:

I have some RDDs stored as s3://-backed sequence files sharded into 1000 parts. The startup time is pretty long (~10's of minutes). It's communicating with S3, but I don't know what it's doing. Is it just fetching the metadata from S3 for each part? Is there a way to pipeline this with the computation?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/s3-sequence-file-startup-time-tp12242.html
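[Editor's note] The thread's diagnosis is that the slowness comes from one serial metadata request per part-file. The core idea — overlapping high-latency per-part lookups instead of issuing them one at a time — can be illustrated in plain Python with a thread pool. fetch_status here is a hypothetical stand-in for the real S3 getFileBlockLocations() call; Spark's input-format code of that era did not expose a hook to do this directly.

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_status(part):
    # Hypothetical stand-in for a per-part S3 metadata request
    # (each real request pays S3 round-trip latency).
    return (part, len(part))

parts = ["part-%05d" % i for i in range(1000)]

# Issue up to 32 lookups concurrently instead of 1000 serial ones;
# pool.map preserves the input order of the parts.
with ThreadPoolExecutor(max_workers=32) as pool:
    statuses = list(pool.map(fetch_status, parts))
```

For latency-bound I/O like S3 metadata calls, the wall-clock time drops roughly by the pool width, which is why serial lookups dominate startup for 1000 parts.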
Re: Re: application as a service
That helps a lot. Thanks.

Zhanfeng Huo

From: Davies Liu
Date: 2014-08-18 14:31
To: ryaminal
CC: u...@spark.incubator.apache.org
Subject: Re: application as a service

Another option is using Tachyon to cache the RDD; then the cache can be shared by different applications. See how to use Spark with Tachyon: http://tachyon-project.org/Running-Spark-on-Tachyon.html

Davies

On Sun, Aug 17, 2014 at 4:48 PM, ryaminal tacmot...@gmail.com wrote:

You can also look into using Ooyala's job server at https://github.com/ooyala/spark-jobserver

This already has a Spray server built in that allows you to do what has already been explained above. Sounds like it should solve your problem. Enjoy!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/application-as-a-service-tp12253p12267.html
Working with many RDDs in parallel?
Hi All. I need to create a lot of RDDs starting from a set of roots and count the rows in each. Something like this:

final JavaSparkContext sc = new JavaSparkContext(conf);
List<String> roots = ...
Map<String, Object> res = sc.parallelize(roots).mapToPair(new PairFunction<String, String, Long>(){
    public Tuple2<String, Long> call(String root) throws Exception {
        ... create RDD based on root from sc somehow ...
        return new Tuple2<String, Long>(root, rdd.count());
    }
}).countByKey();

This fails with a message about JavaSparkContext not being serializable. Is there a way to get at the content inside of the map function or should I be doing something else entirely?

Thanks
David
Re: Working with many RDDs in parallel?
You won't be able to use RDDs inside of an RDD operation. I imagine your immediate problem is that the code you've elided references 'sc', and that gets referenced by the PairFunction and serialized, but it can't be.

If you want to play it this way, parallelize across roots in Java. That is, just use an ExecutorService to launch a bunch of operations on RDDs in parallel. There's no reason you can't do that, although I suppose there are upper limits as to what makes sense on your cluster: 1000 RDD count()s at once isn't a good idea, for example.

It may be the case that you don't really need a bunch of RDDs at all, but can operate on an RDD of pairs of Strings (roots) and something-elses, all at once.

On Mon, Aug 18, 2014 at 2:31 PM, David Tinker david.tin...@gmail.com wrote:

Hi All. I need to create a lot of RDDs starting from a set of roots and count the rows in each. Something like this:

final JavaSparkContext sc = new JavaSparkContext(conf);
List<String> roots = ...
Map<String, Object> res = sc.parallelize(roots).mapToPair(new PairFunction<String, String, Long>(){
    public Tuple2<String, Long> call(String root) throws Exception {
        ... create RDD based on root from sc somehow ...
        return new Tuple2<String, Long>(root, rdd.count());
    }
}).countByKey();

This fails with a message about JavaSparkContext not being serializable. Is there a way to get at the content inside of the map function or should I be doing something else entirely?

Thanks
David
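[Editor's note] Sean's suggestion — keep the per-root jobs on the driver side and run them in parallel with an ExecutorService — looks like this in plain Python, with concurrent.futures standing in for the executor. count_for_root is a hypothetical placeholder; with a real SparkContext it would build the RDD for one root and call count() on it.

```python
from concurrent.futures import ThreadPoolExecutor

def count_for_root(root):
    # Hypothetical stand-in for: build an RDD from `root` via the
    # driver's SparkContext, then return rdd.count(). The key point is
    # that this runs on the driver, not inside another RDD operation.
    return len(root)

roots = ["alpha", "beta", "gamma"]

# Launch a bounded number of jobs concurrently from the driver.
with ThreadPoolExecutor(max_workers=3) as pool:
    counts = dict(zip(roots, pool.map(count_for_root, roots)))
```

Keeping the pool small matters for the reason Sean gives: each submitted function is a whole Spark job, and the cluster can only usefully run so many at once.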
spark kryo serilizable exception
hi all,

In an RDD map, I invoke an object that is Serialized by the Java standard, and get this exception:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 13
    at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
    at com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Spark Streaming Data Sharing
Based on my understanding something like this doesn't seem to be possible out of the box, but I thought I would write it up anyway in case someone has any ideas.

We have conceptually one high volume input stream, and each streaming job is either interested in a subset of the stream or the entire stream. We would like to get to the point where we could be running a large number of streaming jobs concurrently across a cluster.

It does not seem that putting a buffer like Kafka in between the source and the streaming jobs would be a sustainable route, as all jobs would be consuming the whole stream and the majority of them would only be interested in a small subset of the available data. As we don't know exactly what data each job would be interested in up front, it would be difficult to separate/partition Kafka topics up front.

What I think we want is a way to have one streaming job whose output is passed to n other streaming jobs. Has anyone thought about implementing something like this? I don't see a way to have the Receiver for one streaming job be the output of another streaming job.

This e-mail, including accompanying communications and attachments, is strictly confidential and only for the intended recipient. Any retention, use or disclosure not expressly authorised by Markit is prohibited. This email is subject to all waivers and other terms at the following link: http://www.markit.com/en/about/legal/email-disclaimer.page Please visit http://www.markit.com/en/about/contact/contact-us.page? for contact information on our offices worldwide.
Re: Question regarding spark data partition and coalesce. Need info on my use case.
Hello Mayur,

The "3" in new RangePartitioner(3, partitionedFile); is also a hard-coded value for the number of partitions. Did you find a way I can avoid that? And besides the cluster size, the number of partitions also depends on the input data size. Correct me if I am wrong.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Question-regarding-spark-data-partition-and-coalesce-Need-info-on-my-use-case-tp12214p12311.html
RE: spark kryo serilizable exception
Hi, I was able to set this parameter in my application to resolve this issue:

set("spark.kryoserializer.buffer.mb", "256")

Please let me know if this helps.

Date: Mon, 18 Aug 2014 21:50:02 +0800
From: dujinh...@hzduozhun.com
To: user@spark.apache.org
Subject: spark kryo serilizable exception

hi all,

In an RDD map, I invoke an object that is Serialized by the Java standard, and get this exception:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 13
    at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
    at com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Re: NullPointerException when connecting from Spark to a Hive table backed by HBase
I removed the JAR that you suggested, but now I get another error when I try to create the HiveContext. Here is the error:

scala> val hiveContext = new HiveContext(sc)
error: bad symbolic reference. A signature in HiveContext.class refers to term ql in package org.apache.hadoop.hive which is not available. It may be completely missing from the current classpath, ...omitted more stacktrace for readability...

Best,
-Cesar

On Mon, Aug 18, 2014 at 12:47 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Then definitely it's a jar conflict. Can you try removing this jar from the class path: /opt/spark-poc/lib_managed/jars/org.spark-project.hive/hive-exec/hive-exec-0.12.0.jar

Thanks
Best Regards

On Mon, Aug 18, 2014 at 12:45 PM, Cesar Arevalo ce...@zephyrhealthinc.com wrote:

Nope, it is NOT null. Check this out:

scala> hiveContext == null
res2: Boolean = false

And thanks for sending that link, but I had already looked at it. Any other ideas? I looked through some of the relevant Spark Hive code and I'm starting to think this may be a bug.

-Cesar

On Mon, Aug 18, 2014 at 12:00 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Looks like your hiveContext is null. Have a look at this documentation: https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

Thanks
Best Regards

On Mon, Aug 18, 2014 at 12:09 PM, Cesar Arevalo ce...@zephyrhealthinc.com wrote:

Hello: I am trying to set up Spark to connect to a Hive table which is backed by HBase, but I am running into the following NullPointerException:

scala> val hiveCount = hiveContext.sql("select count(*) from dataset_records").collect().head.getLong(0)
14/08/18 06:34:29 INFO ParseDriver: Parsing command: select count(*) from dataset_records
14/08/18 06:34:29 INFO ParseDriver: Parse Completed
14/08/18 06:34:29 INFO HiveMetaStore: 0: get_table : db=default tbl=dataset_records
14/08/18 06:34:29 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_table : db=default tbl=dataset_records
14/08/18 06:34:30 INFO MemoryStore: ensureFreeSpace(160296) called with curMem=0, maxMem=280248975
14/08/18 06:34:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 156.5 KB, free 267.1 MB)
14/08/18 06:34:30 INFO SparkContext: Starting job: collect at SparkPlan.scala:85
14/08/18 06:34:31 WARN DAGScheduler: Creating new stage failed due to exception - job: 0
java.lang.NullPointerException
    at org.apache.hadoop.hbase.util.Bytes.toBytes(Bytes.java:502)
    at org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat.getSplits(HiveHBaseTableInputFormat.java:418)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)

This is happening from the master on Spark. I am running HBase version hbase-0.98.4-hadoop1 and Hive version 0.13.1. And here is how I am running the spark shell: bin/spark-shell --driver-class-path
Bug or feature? Overwrite broadcasted variables.
I'm curious to see that if you declare a broadcast wrapper as a var and overwrite it in the driver program, the modification can have a stable impact on all transformations/actions defined BEFORE the overwrite but executed lazily AFTER the overwrite:

val a = sc.parallelize(1 to 10)
var broadcasted = sc.broadcast("broad")
val b = a.map(_ + broadcasted.value)
// b.persist()
for (line <- b.collect()) { print(line) }
println("\n===")
broadcasted = sc.broadcast("cast")
for (line <- b.collect()) { print(line) }

the result is:

1broad2broad3broad4broad5broad6broad7broad8broad9broad10broad
===
1cast2cast3cast4cast5cast6cast7cast8cast9cast10cast

Of course, if you persist b before overwriting, you will still get the non-surprising result (both are 10broad... because they are persisted). This can be useful sometimes but may cause confusion at other times (people can no longer add persist at will just for backup, because it may change the result). So far I've found no documentation supporting this feature. So can someone confirm that it's a deliberately designed feature?

Yours
Peng

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315.html
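[Editor's note] The surprise described above is a consequence of lazy evaluation: an un-persisted RDD is recomputed on every action, so it reads whatever the rebound var points to at that moment. The same effect can be reproduced in plain Python, where a function standing in for the un-persisted RDD re-reads a rebound variable each time it is "collected". This is an illustration of the mechanism, not Spark API.

```python
suffix = "broad"

def make_lazy():
    # Like an un-persisted RDD: nothing is computed until "collect" is
    # called, and each call re-reads the current value of `suffix`.
    return [str(i) + suffix for i in range(1, 4)]

cached = make_lazy()   # like b.persist() + collect(): evaluated now
first = make_lazy()    # "collect" before the overwrite
suffix = "cast"        # rebinding, like re-assigning the broadcast var
second = make_lazy()   # same "transformation", evaluated after rebinding
```

`first` and `cached` see the old value while `second` sees the new one, which mirrors why adding persist() in the Spark snippet changes the observed result.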
Re: Merging complicated small matrices to one big matrix
rdd.flatMap(lambda x:x) maybe could solve your problem, it will convert an RDD from [[[1,2,3],[4,5,6]],[[7,8,9,],[10,11,12]]] into: [[1,2,3], [4,5,6], [7,8,9,], [10,11,12]] On Mon, Aug 18, 2014 at 2:42 AM, Chengi Liu chengi.liu...@gmail.com wrote: I have an rdd in pyspark which looks like follows: It has two sub matrices..(array,array) [ array([[-13.00771575, 0.2740844 , 0.9752694 , 0.67465999, -1.45741537, 0.546775 , 0.7900841 , -0.59473707, -1.11752044, 0.61564356], [ -0., 12.20115746, -0.49016935, -0.9236129 , -1.1693633 , -0.39135626, 1.10752864, 0.16920118, -1.098806 , 1.10045185], [ 0., 0., -11.26425992, 0.56309152, 0.44872832, 0.69722768, 0.84200281, 0.89537327, 0.10460865, -0.62938474], [ -0., -0., 0., 13.1112119 , 0.39986223, -1.22218936, 0.72315955, 0.12208597, -0.6258082 , -0.91077504], [ 0., -0., 0., 0., -11.04483145, -1.71948244, -0.73239228, -0.19651712, -0.97931725, -0.43263423], [ 0., 0., 0., -0., 0., -12.1996715 , -0.05580816, 0.20517336, 0.53584998, 1.3370874 ], [ 0., -0., -0., 0., 0., 0., 12.32603631, 0.47498103, -0.65901705, -0.85713277], [ 0., 0., 0., -0., 0., -0., 0., 11.90030251, 1.73036795, 0.70588443], [ -0., -0., 0., 0., -0., -0., 0., -0., 13.00493769, 1.37753403], [ 0., -0., 0., 0., 0., -0., 0., 0., -0., -10.89006283]]), array([[-12.43375184, 1.07703619, -0.47818221, 1.65919732, 0.96307502, -1.6322447 , -1.09409297, -0.64849112, -1.09349557, -0.68706834], [ 0., -11.93616969, 0.08784614, 1.76677411, -0.0585134 , -0.70979485, 0.29757848, 1.19562173, -1.54176475, 1.71500862], [ 0., -0., -12.42060272, 2.17060365, -1.3212244 , 0.73742297, 0.50410937, -0.35278129, -0.40513689, -0.81222302], [ -0., 0., 0., -11.93419851, -1.15614929, 1.04085489, 0.69986351, -1.3615322 , 0.43467842, -1.33041858], [ -0., -0., 0., 0., 11.22907137, -0.12925322, 0.46293906, -2.01577912, -2.26566926, -0.17750339], [ 0., 0., 0., 0., -0., -12.0705513 , -0.19432359, 0.41226088, 0.79436699, -0.61288711], [ 0., -0., 0., 0., -0., -0., 11.99770753, -1.24277228, 1.32240282, 
1.5140609 ], [ -0., 0., -0., -0., 0., -0., 0., -13.07008472, 0.52031563, -1.56247391], [ 0., -0., 0., 0., -0., -0., -0., -0., 13.16585107, 0.57741265], [ 0., 0., -0., -0., 0., 0., 0., -0., -0., -13.53719704]]) ] So, basically I have sub matrices like [sub_matrix_1, sub_matrix_2 ] (the above has just two matrices.. I want to combine in one big matrix column wise [ sub_matrix_1 sub_matrix_2 ] Any suggestions? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
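[Editor's note] What rdd.flatMap(lambda x: x) does to this nested structure can be shown on plain Python lists: each element of the RDD (a sub-matrix, i.e. a list of rows) is unrolled so that all rows end up in one flat list of rows, preserving order.

```python
# Two "sub-matrices", each a list of rows (small stand-ins for the
# 10x10 arrays in the message).
nested = [[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]]

# Equivalent of rdd.flatMap(lambda x: x) on the collected data:
flat = [row for sub in nested for row in sub]
```

This stacks the sub-matrices vertically (one big list of rows); if the goal is a column-wise/horizontal concatenation instead, the rows of corresponding positions would need to be joined, e.g. with numpy's hstack on the collected arrays.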
RE: Does HiveContext support Parquet?
I followed your instructions to try to load data in Parquet format through hiveContext, but failed. Do you happen to know what is incorrect in the following steps?

The steps I am following are:

1. Download parquet-hive-bundle-1.5.0.jar

2. Revise hive-site.xml to include this:

<property>
  <name>hive.jar.directory</name>
  <value>/home/hduser/hive/lib/parquet-hive-bundle-1.5.0.jar</value>
  <description>
    This is the location hive in tez mode will look for to find a site wide installed hive instance. If not set, the directory under hive.user.install.directory corresponding to current user name will be used.
  </description>
</property>

3. Copy hive-site.xml to all nodes.

4. Start spark-shell, then try to create the table:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
hql("create table part (P_PARTKEY INT, P_NAME STRING, P_MFGR STRING, P_BRAND STRING, P_TYPE STRING, P_SIZE INT, P_CONTAINER STRING, P_RETAILPRICE DOUBLE, P_COMMENT STRING) STORED AS PARQUET")

Then I got this error:

14/08/18 19:09:00 ERROR Driver: FAILED: SemanticException Unrecognized file format in STORED AS clause: PARQUET
org.apache.hadoop.hive.ql.parse.SemanticException: Unrecognized file format in STORED AS clause: PARQUET
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.handleGenericFileFormat(BaseSemanticAnalyzer.java:569)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:8968)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8313)
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:284)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:441)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342)
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
    at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:186)
    at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:247)
    at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85)
    at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
    at $line44.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:18)
    at $line44.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
    at $line44.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
    at $line44.$read$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $line44.$read$$iwC$$iwC.<init>(<console>:29)
    at $line44.$read$$iwC.<init>(<console>:31)
    at $line44.$read.<init>(<console>:33)
    at $line44.$read$.<init>(<console>:37)
    at $line44.$read$.<clinit>(<console>)
    at $line44.$eval$.<init>(<console>:7)
    at $line44.$eval$.<clinit>(<console>)
    at $line44.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at
java.nio.channels.CancelledKeyException in Graphx Connected Components
Hey all,

I'm trying to run connected components in GraphX on about 400GB of data on 50 m3.xlarge nodes on EMR. I keep getting java.nio.channels.CancelledKeyException when it gets to "mapPartitions at VertexRDD.scala:347". I haven't been able to find much about this online, and nothing that seems relevant to my situation. I'm using Spark built from the master repo, because I need the custom storage level feature in GraphX.

My configuration:

.set("spark.executor.memory", "13g")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
.set("spark.executor.extraJavaOptions", "-XX:-UseGCOverheadLimit")
.set("spark.akka.frameSize", "128")
.set("spark.storage.memoryFraction", "0.2")
.set("spark.shuffle.memoryFraction", "0.7")
.set("spark.akka.timeout", "600")

The code I'm running:

val graph = GraphLoader.edgeListFile(sc, "/foo/bar",
  minEdgePartitions = 2000,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
val cc = graph.connectedComponents().vertices

Stack trace:

14/08/18 18:50:07 INFO network.ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@501c1504
java.nio.channels.CancelledKeyException
    at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
    at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
    at java.nio.channels.SelectionKey.isAcceptable(SelectionKey.java:360)
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:372)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/08/18 18:50:07 INFO cluster.SparkDeploySchedulerBackend: Executor 48 disconnected, so removing it
14/08/18 18:50:07 ERROR network.SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-136-91-34.ec2.internal,51095)
java.nio.channels.ClosedChannelException
    at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
    at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
    at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/08/18 18:50:07 INFO storage.BlockManagerInfo: Added rdd_16_161 in memory on ip-10-150-84-111.ec2.internal:58219 (size: 78.4 MB, free: 716.5 MB)
14/08/18 18:50:07 ERROR scheduler.TaskSchedulerImpl: Lost executor 48 on ip-10-136-91-34.ec2.internal: remote Akka client disassociated

Log from the worker node in spark/logs/:

14/08/18 18:40:52 INFO worker.ExecutorRunner: Launch command: /usr/lib/jvm/java-7-oracle/bin/java -cp /home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/:/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/::/home/hadoop/spark/conf:/home/hadoop/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.3.jar:/home/hadoop/conf/ -XX:MaxPermSize=128m -Dspark.akka.timeout=600 -Dspark.akka.frameSize=128 -Dspark.driver.port=50456 -XX:-UseGCOverheadLimit -Xms13312M -Xmx13312M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@ip-10-171-49-153.ec2.internal:50456/user/CoarseGrainedScheduler 48 ip-10-136-91-34.ec2.internal 4 akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942/user/Worker app-20140818184051-
240.261: [GC [PSYoungGen: 132096K->18338K(153600K)] 132096K->18346K(503296K), 0.0285710 secs] [Times: user=0.04 sys=0.02, real=0.03 secs]
14/08/18 18:50:07 INFO worker.Worker: Executor app-20140818184051-/48 finished with state EXITED message Command exited with code 137 exitStatus 137
14/08/18 18:50:07 INFO worker.Worker: Asked to launch executor app-20140818184051-/50 for Spark CC
14/08/18 18:50:07 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.136.91.34%3A47436-2#201455087] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/08/18 18:50:07 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942] -> [akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]] [akka.remote.EndpointAssociationException: Association
Extracting unique elements of an ArrayBuffer
Hi, I have a piece of code in which the result of a groupByKey operation is as follows: (2013-04, ArrayBuffer(s1, s2, s3, s1, s2, s4)) The first element is a String value representing a date and the ArrayBuffer consists of (non-unique) strings. I want to extract the unique elements of the ArrayBuffer. So I am expecting the result to be: (2013-04, ArrayBuffer(s1, s2, s3, s4)) I tried the following:

.groupByKey
.map(g => (g._1, g._2.distinct))

But I get the following runtime error:

value distinct is not a member of Iterable[String]
[error] .map(g => (g._1, g._2.distinct))

I also tried g._2.distinct(), but got the same error. I looked at the Scala ArrayBuffer documentation and it supports distinct() and count() operations. I am using Spark 1.0.1 and Scala 2.10.4. I would like to know how to extract the unique elements of the ArrayBuffer above. thanks
Re: Spark Streaming Data Sharing
The Spark job that has the main DStream could have another DStream that is listening for stream subscription requests. So when a subscription is received, you could do a filter/forEach on the main DStream and respond to that one request. So you're basically creating a stream server that is capable of accepting filter requests. On Mon, Aug 18, 2014 at 10:30 AM, Levi Bowman levi.bow...@markit.com wrote: Based on my understanding something like this doesn't seem to be possible out of the box, but I thought I would write it up anyway in case someone has any ideas. We have conceptually one high-volume input stream; each streaming job is either interested in a subset of the stream or the entire stream. We would like to get to the point where we could be running a large number of streaming jobs concurrently across a cluster. It does not seem that putting a buffer like Kafka between the source and the streaming jobs would be a sustainable route, as all jobs would be consuming the whole stream and the majority of them would only be interested in a small subset of the available data. As we don't know exactly what data each job would be interested in up front, it would be difficult to separate/partition Kafka topics up front. What I think we want is a way to have one streaming job whose output is passed to n other streaming jobs. Has anyone thought about implementing something like this? I don't see a way to have the Receiver for one streaming job be the output of another streaming job.
Re: Writing to RabbitMQ
Well, it looks like I can use the .repartition(1) method to stuff everything in one partition so that gets rid of the duplicate messages I send to RabbitMQ but that seems like a bad idea perhaps. Wouldn't that hurt scalability?
spark - reading hdfs files every 5 minutes
Hi, My data source stores the incoming data every 10 seconds to hdfs. The naming convention is save-<timestamp>.csv (see below):

drwxr-xr-x ali supergroup 0 B 0 0 B save-1408396065000.csv
drwxr-xr-x ali supergroup 0 B 0 0 B save-140839607.csv
drwxr-xr-x ali supergroup 0 B 0 0 B save-1408396075000.csv
drwxr-xr-x ali supergroup 0 B 0 0 B save-140839608.csv

I would like to periodically (every 5 min) read the files and process them. Is there a good example out there of how to implement this? How do I know what part of the data I have already processed? Thanks, Ali
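Since the file names embed the write timestamp, one workable approach to the "what have I already processed?" question is a high-water mark: on each 5-minute pass, list the directory, process only files whose timestamp is above the mark, then advance and persist the mark. A minimal sketch in plain Python — the save-<timestamp>.csv pattern is taken from the listing above, but the function name and the idea of persisting the mark yourself are illustrative assumptions, not an HDFS or Spark API:

```python
import re

SAVE_RE = re.compile(r"save-(\d+)\.csv$")

def new_files(listing, last_ts):
    """Return the files newer than last_ts (sorted by timestamp) and the new mark."""
    fresh = sorted(
        (int(m.group(1)), name)
        for name in listing
        if (m := SAVE_RE.search(name)) and int(m.group(1)) > last_ts
    )
    mark = fresh[-1][0] if fresh else last_ts
    return [name for _, name in fresh], mark
```

Each run, feed it the current directory listing plus the mark saved from the previous run; whatever it returns has not been processed yet, provided the writer never produces a file with an older timestamp than one already seen.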
Re: NullPointerException when connecting from Spark to a Hive table backed by HBase
Looks like hbaseTableName is null, probably caused by incorrect configuration.

String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));

Here is the definition:

public static final String HBASE_TABLE_NAME = "hbase.table.name";

Thanks. Zhan Zhang On Aug 17, 2014, at 11:39 PM, Cesar Arevalo ce...@zephyrhealthinc.com wrote: HadoopRDD
setCallSite for API backtraces not showing up in logs?
What's the correct way to use setCallSite to get the change to show up in the Spark logs? I have something like:

class RichRDD(rdd: RDD[MyThing]) {
  def mySpecialOperation() = {
    rdd.context.setCallSite("bubbles and candy!")
    rdd.map()
    val result = rdd.groupBy()
    rdd.context.clearCallSite()
    result
  }
}

But when I use .mySpecialOperation, "bubbles and candy!" doesn't seem to show up anywhere in the logs. Is this not the right way to use .setCallSite?
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI iamyifa...@gmail.com wrote: I am testing our application (similar to personalised PageRank using Pregel; note that each vertex property needs much more space to store after each new iteration) [...] But when we ran it on a larger graph (e.g. LiveJournal), it always ends with the error GC overhead limit exceeded, even when the number of partitions is increased to 48 from 8. If the graph (including vertex properties) is too large to fit in memory, you might try allowing it to spill to disk. When constructing the graph, you can set vertexStorageLevel and edgeStorageLevel to StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish. Ankur http://www.ankurdave.com/
Re: Bug or feature? Overwrite broadcasted variables.
I think the behavior is by design. Because if b is not persisted, then on each call to b.collect(), broadcasted points to a new broadcast variable, serialized by the driver and fetched by the executors. If you do persist, you don't expect the RDD to get changed due to a new broadcast variable. Thanks. Zhan Zhang On Aug 18, 2014, at 11:26 AM, Peng Cheng pc...@uow.edu.au wrote: I'm curious to see that if you declare a broadcasted wrapper as a var, and overwrite it in the driver program, the modification can have a stable impact on all transformations/actions defined BEFORE the overwrite but executed lazily AFTER the overwrite:

val a = sc.parallelize(1 to 10)
var broadcasted = sc.broadcast("broad")
val b = a.map(_ + broadcasted.value)
// b.persist()
for (line <- b.collect()) { print(line) }
println("\n===")
broadcasted = sc.broadcast("cast")
for (line <- b.collect()) { print(line) }

the result is:

1broad2broad3broad4broad5broad6broad7broad8broad9broad10broad
===
1cast2cast3cast4cast5cast6cast7cast8cast9cast10cast

Of course, if you persist b before overwriting, you will still get the non-surprising result (both are 10broad... because they are persisted). This can be useful sometimes but may cause confusion at other times (people can no longer add persist at will just for backup because it may change the result). So far I've found no documentation supporting this feature. So can someone confirm that it's a feature craftily designed? Yours Peng
How to use Spark Streaming from an HTTP api?
I want to send an HTTP request (specifically to OpenTSDB) to get data. I've been looking at the StreamingContext api and don't seem to see any methods that can connect to this. Has anyone tried connecting Spark Streaming to a server via HTTP requests before? How have you done it?
Re: NullPointerException when connecting from Spark to a Hive table backed by HBase
Thanks, Zhan, for the follow-up. But do you know how I am supposed to set that table name on the jobConf? I don't have access to that object from my client driver.
spark-submit with HA YARN
Hello, I have an HA-enabled YARN cluster with two resource managers. When submitting jobs via "spark-submit --master yarn-cluster", it appears that the driver is looking explicitly for the "yarn.resourcemanager.address" property rather than round-robining through the resource managers via the "yarn.client.failover-proxy-provider" property set to "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider". If I explicitly set "yarn.resourcemanager.address" to the active resource manager, jobs will submit fine. Is there a way to make "spark-submit --master yarn-cluster" respect the failover proxy? Thanks in advance, Matt
Re: Writing to RabbitMQ
Hi John, It seems like the original problem you had was that you were initializing the RabbitMQ connection on the driver, but then calling the code to write to RabbitMQ on the workers (I'm guessing, but I don't know since I didn't see your code). That's definitely a problem because the connection can't be serialized and passed onto the workers.

dstream.foreachRDD(rdd => {
  // create connection / channel to source
  rdd.foreach(element =>
    // tries to write to RabbitMQ from the worker using the channel -
    // there is a problem since the connection cannot be passed to the workers.
  )
})

You then changed the code to open the connection on the workers, and write out the data once per worker. This worked, but as you saw - you are writing out the data multiple times.

dstream.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    // Create or get a singleton instance of a connection / channel
    iter.foreach(element =>
      // write using connection / channel
      // This works and writes, but writes once per partition.
    )
  })
})

If I understand correctly, I believe what you want is to open the connection on the driver, and write to RabbitMQ from the driver as well.

dstream.foreachRDD(rdd => {
  // Any code you put here is executed on the driver.
  rdd.foreach(
    // Any code inside rdd.foreach is executed on the workers.
  )
  // This code will be executed back on the driver again.
  // You can open the connection and write stats here.
})

Does this clear things up? I hope it's very clear now when code in the streaming example executes on the driver vs. the workers. -Vida On Mon, Aug 18, 2014 at 2:20 PM, jschindler john.schind...@utexas.edu wrote: Well, it looks like I can use the .repartition(1) method to stuff everything in one partition so that gets rid of the duplicate messages I send to RabbitMQ but that seems like a bad idea perhaps. Wouldn't that hurt scalability?
Re: Writing to RabbitMQ
Oh sorry, just to be more clear - writing from the driver program is only safe if the amount of data you are trying to write is small enough to fit in memory on the driver program. I looked at your code, and since you are just writing a few things each time interval, this seems safe. -Vida
Processing multiple files in parallel
Hi, I have a piece of code that reads all the (csv) files in a folder. For each file, it parses each line, extracts the first 2 elements from each row of the file, groups the tuple by the key and finally outputs the number of unique values for each key.

val conf = new SparkConf().setAppName("App")
val sc = new SparkContext(conf)
val user_time = sc.union(sc.textFile("/directory/*")) // union of all files in the directory
  .map(line => {
    val fields = line.split(",")
    (fields(1), fields(0)) // extract first 2 elements
  })
  .groupByKey // group by timestamp
  .map(g => (g._1, g._2.toSet.size)) // get the number of unique ids per timestamp

I have a lot of files in the directory (several hundred). The program takes a long time. I am not sure if the union operation is preventing the files from being processed in parallel. Is there a better way to parallelize the above code? For example, the first two operations (reading each file and extracting the first 2 columns from each file) can be done in parallel, but I am not sure if that is how Spark schedules the above code. thanks
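One note on the groupByKey step in the code above: it ships every (timestamp, id) pair to the reducers before deduplicating. An aggregateByKey-style rework builds the per-key sets incrementally and merges them across partitions instead. The two merge functions involved can be sketched in plain Python — the function names here are illustrative, not Spark API, and unique_counts is just the single-machine equivalent of the distributed fold:

```python
def add_to_set(acc, value):
    """seqOp: fold one id into the per-timestamp set within one partition."""
    acc.add(value)
    return acc

def merge_sets(a, b):
    """combOp: merge the per-partition sets built for the same timestamp."""
    return a | b

def unique_counts(rows):
    """Local equivalent of aggregateByKey(set, seqOp, combOp) followed by len()."""
    acc = {}
    for key, value in rows:
        add_to_set(acc.setdefault(key, set()), value)
    return {k: len(v) for k, v in acc.items()}
```

In PySpark the corresponding call would be along the lines of `.aggregateByKey(set(), add_to_set, merge_sets).mapValues(len)` (sketch, not tested here); the win over groupByKey is that only sets, not full value lists, cross the shuffle.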
Data loss - Spark streaming and network receiver
We are prototyping an application with Spark Streaming and Kinesis. We use Kinesis to accept incoming txn data, and then process it using Spark Streaming. So far we have really liked both technologies, and we see both maturing rapidly. We are almost settled on using these two technologies, but we are a little scared by this paragraph in the programming guide: For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost data from the left over copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data. Since our application cannot tolerate losing customer data, I am wondering what is the best way for us to address this issue. 1) We are thinking of writing application-specific logic to address the data loss. To us, the problem seems to be caused by the fact that Kinesis receivers advance their checkpoint before we know for sure the data is replicated. For example, we can do another checkpoint ourselves to remember the Kinesis sequence number for data that has been processed by Spark Streaming. When the Kinesis receiver is restarted due to worker failures, we restart it from the checkpoint we tracked. We also worry about our driver program (or the whole cluster) dying because of a bug in the application; the above logic would allow us to resume from our last checkpoint. Are there any best practices out there for this issue? I suppose many folks are using Spark Streaming with network receivers; any suggestion is welcomed. 2) Write Kinesis data to s3 first, then either use it as a backup or read from s3 in Spark Streaming.
This is the safest approach but with a performance/latency penalty. On the other hand, we may have to write data to s3 anyway since Kinesis only stores up to 24 hours data just in case we had a bad day in our server infrastructure. 3) Wait for this issue to be addressed in spark streaming. I found this ticket https://issues.apache.org/jira/browse/SPARK-1647, but it is not resolved yet. Thanks, Wei
Re: How to use Spark Streaming from an HTTP api?
You need to create a custom receiver that submits the HTTP requests then deserializes the data and pushes it into the Streaming context. See here for an example: http://spark.apache.org/docs/latest/streaming-custom-receivers.html On 8/18/14, 6:20 PM, bumble123 tc1...@att.com wrote: I want to send an HTTP request (specifically to OpenTSDB) to get data. I've been looking at the StreamingContext api and don't seem to see any methods that can connect to this. Has anyone tried connecting Spark Streaming to a server via HTTP requests before? How have you done it?
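Since there is no built-in HTTP source, the heart of such a custom receiver is just a polling loop: fetch from the endpoint, push each record into Spark, sleep, repeat. The loop can be sketched in plain Python — here `fetch` stands in for the real HTTP GET against the OpenTSDB endpoint and `out.put` stands in for the receiver's store() call, so both are assumptions rather than Spark API:

```python
import queue
import threading

def poll_into_queue(fetch, out, interval, stop):
    """Poll `fetch` until `stop` is set, pushing every record into `out`.

    fetch: zero-arg callable returning a list of records (the HTTP call).
    out:   queue-like sink (stands in for the receiver's store()).
    """
    while not stop.is_set():
        for record in fetch():
            out.put(record)
        stop.wait(interval)  # sleep, but wake immediately on shutdown

if __name__ == "__main__":
    q = queue.Queue()
    stop = threading.Event()
    batches = iter([[1, 2], [3]])

    def fake_fetch():
        # return the next canned batch; signal shutdown when exhausted
        try:
            return next(batches)
        except StopIteration:
            stop.set()
            return []

    t = threading.Thread(target=poll_into_queue, args=(fake_fetch, q, 0.01, stop))
    t.start()
    t.join()
    print([q.get() for _ in range(q.qsize())])  # [1, 2, 3]
```

In a real receiver the loop body would run on the receiver's worker thread and the stop event would correspond to the receiver being shut down; the docs page linked above shows where that thread is started.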
Re: Does HiveContext support Parquet?
First the JAR needs to be deployed using the --jars argument. Then in your HQL code you need to use the DeprecatedParquetInputFormat and DeprecatedParquetOutputFormat as described here: https://cwiki.apache.org/confluence/display/Hive/Parquet#Parquet-Hive0.10-0.12 This is because SparkSQL is based on Hive 0.12. That's what's worked for me. Thanks, Silvio On 8/18/14, 3:14 PM, lyc yanchen@huawei.com wrote: I followed your instructions to try to load data in parquet format through hiveContext but failed. Do you happen to see what is incorrect in the following steps? The steps I am following are: 1. download parquet-hive-bundle-1.5.0.jar 2. revise hive-site.xml to include this:

<property>
  <name>hive.jar.directory</name>
  <value>/home/hduser/hive/lib/parquet-hive-bundle-1.5.0.jar</value>
  <description>
    This is the location hive in tez mode will look for to find a site wide installed hive instance. If not set, the directory under hive.user.install.directory corresponding to current user name will be used.
  </description>
</property>

3. copy hive-site.xml to all nodes. 4.
start spark-shell, then try to create the table:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
hql("create table part (P_PARTKEY INT, P_NAME STRING, P_MFGR STRING, P_BRAND STRING, P_TYPE STRING, P_SIZE INT, P_CONTAINER STRING, P_RETAILPRICE DOUBLE, P_COMMENT STRING) STORED AS PARQUET")

Then I got this error:

14/08/18 19:09:00 ERROR Driver: FAILED: SemanticException Unrecognized file format in STORED AS clause: PARQUET
org.apache.hadoop.hive.ql.parse.SemanticException: Unrecognized file format in STORED AS clause: PARQUET
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.handleGenericFileFormat(BaseSemanticAnalyzer.java:569)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:8968)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8313)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:284)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:441)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:186)
at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:247)
at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85)
at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
at $line44.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:18)
at $line44.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
at $line44.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
at $line44.$read$$iwC$$iwC$$iwC.<init>(<console>:27)
at $line44.$read$$iwC$$iwC.<init>(<console>:29)
at $line44.$read$$iwC.<init>(<console>:31)
at $line44.$read.<init>(<console>:33)
at $line44.$read$.<init>(<console>:37)
at $line44.$read$.<clinit>(<console>)
at $line44.$eval$.<init>(<console>:7)
at $line44.$eval$.<clinit>(<console>)
at $line44.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
Re: Segmented fold count
fil wrote - Python functions like groupCount; these get reflected from their Python AST and converted into a Spark DAG? Presumably if I try and do something non-convertible this transformation process will throw an error? In other words this runs in the JVM. Further to this - it seems that Python does run on each node in the cluster, meaning it runs outside the JVM. Presumably this means that writing this in Scala would be far more performant. Could I write groupCount() in Scala, and then use it from Pyspark? Care to supply an example, I'm finding them hard to find :) fil wrote - I had considered that partitions were batches of distributable work, and generally large. Presumably the above is OK with small groups (eg. average size 10) - this won't kill performance? I'm still a bit confused about the dual meaning of partition: work segmentation, and key groups. Care to clarify anyone - when are partitions used to describe chunks of data for different nodes in the cluster (ie. large), and when are they used to describe groups of items in data (ie. small)..
Re: Data loss - Spark streaming and network receiver
Hi Wei, On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com wrote: Since our application cannot tolerate losing customer data, I am wondering what is the best way for us to address this issue. 1) We are thinking writing application specific logic to address the data loss. To us, the problem seems to be caused by that Kinesis receivers advanced their checkpoint before we know for sure the data is replicated. For example, we can do another checkpoint ourselves to remember the kinesis sequence number for data that has been processed by spark streaming. When Kinesis receiver is restarted due to worker failures, we restarted it from the checkpoint we tracked. This sounds pretty much to me like the way Kafka does it. So, I am not saying that the stock KafkaReceiver does what you want (it may or may not), but it should be possible to update the offset (which corresponds to the sequence number) in ZooKeeper only after data has been replicated successfully. I guess replacing Kinesis with Kafka is not an option for you, but you may consider pulling Kinesis data into Kafka before processing it with Spark? Tobias
sqlContext.parquetFile(path) fails if path is a file but succeeds if a directory
I'm using CDH 5.1 with Spark 1.0. When I try to run Spark SQL following the Programming Guide:

val parquetFile = sqlContext.parquetFile(path)

If the path is a file, it throws an exception:

Exception in thread main java.lang.IllegalArgumentException: Expected hdfs://*/file.parquet for be a directory with Parquet files/metadata
at org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetRelation.scala:301)
at org.apache.spark.sql.parquet.ParquetRelation.parquetSchema(ParquetRelation.scala:62)
at org.apache.spark.sql.parquet.ParquetRelation.<init>(ParquetRelation.scala:69)
at org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:98)

However, if the path is the parent directory of the file, it succeeds. Note: there is only one file in that directory. I looked into the source:

/**
 * Try to read Parquet metadata at the given Path. We first see if there is a summary file
 * in the parent directory. If so, this is used. Else we read the actual footer at the given
 * location.
 * @param origPath The path at which we expect one (or more) Parquet files.
 * @return The `ParquetMetadata` containing among other things the schema.
 */
def readMetaData(origPath: Path): ParquetMetadata

It doesn't require a directory, but it does throw an exception:

if (!fs.getFileStatus(path).isDir) {
  throw new IllegalArgumentException(
    s"Expected $path for be a directory with Parquet files/metadata")
}

It seems odd to me; can anybody explain why, and how to read a file, not a directory?
Re: Segmented fold count
On Mon, Aug 18, 2014 at 7:41 PM, fil f...@pobox.com wrote: fil wrote - Python functions like groupCount; these get reflected from their Python AST and converted into a Spark DAG? Presumably if I try and do something non-convertible this transformation process will throw an error? In other words this runs in the JVM. Further to this - it seems that Python does run on each node in the cluster, meaning it runs outside the JVM. Presumably this means that writing this in Scala would be far more performant. Could I write groupCount() in Scala, and then use it from Pyspark? Care to supply an example, I'm finding them hard to find :) It's doable, but not so convenient. If you really care about the performance difference, you should write your program in Scala. fil wrote - I had considered that partitions were batches of distributable work, and generally large. Presumably the above is OK with small groups (eg. average size 10) - this won't kill performance? I'm still a bit confused about the dual meaning of partition: work segmentation, and key groups. Care to clarify anyone - when are partitions used to describe chunks of data for different nodes in the cluster (ie. large), and when are they used to describe groups of items in data (ie. small).. A partition means a chunk of data in an RDD; the computation on a partition is a task, which is sent to a node in the cluster.
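On the partition question, note that mapPartitions(groupCount) yields one list of (value, count) pairs per partition, so a run of equal values that crosses a partition boundary comes back as two adjacent pairs with the same key - the boundary concern raised earlier in this thread. After collect() the driver can stitch the per-partition results together. A sketch in plain Python (merge_boundaries is an illustrative name, not a Spark API):

```python
import itertools

def group_count(part):
    # per-partition segmented count, as in the earlier reply
    return [(k, sum(1 for _ in grp)) for k, grp in itertools.groupby(part)]

def merge_boundaries(partition_results):
    """Concatenate per-partition group counts, merging equal keys at the seams."""
    merged = []
    for part in partition_results:
        for key, count in part:
            if merged and merged[-1][0] == key:
                # run continued across a partition boundary: add the counts
                merged[-1] = (key, merged[-1][1] + count)
            else:
                merged.append((key, count))
    return merged
```

For the thread's input [1,1,1,2,2,3,4,4,5,1] split as [1,1,1,2] / [2,3,4,4] / [5,1], the per-partition counts merge back to [(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)], matching the expected output while preserving the original order.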
spark - Identifying and skipping processed data in hdfs
Hi,

My data source stores the incoming data to HDFS every 10 seconds, with the naming convention save-<timestamp>.csv (see below):

    drwxr-xr-x ali supergroup 0 B 0 0 B save-1408396065000.csv
    drwxr-xr-x ali supergroup 0 B 0 0 B save-140839607.csv
    drwxr-xr-x ali supergroup 0 B 0 0 B save-1408396075000.csv
    drwxr-xr-x ali supergroup 0 B 0 0 B save-140839608.csv

I would like to periodically (every 5 min) read the files and process them. Is there a good example out there of how to implement this? How do I know which part of the data I have already processed?

Thanks,
Ali

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-Identifying-and-skipping-processed-data-in-hdfs-tp12347.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
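One option worth looking at is Spark Streaming's file stream source (e.g. StreamingContext.textFileStream in Scala/Java), which monitors a directory and only processes files that appear after the stream starts. If you would rather drive it yourself from a 5-minute batch job, the bookkeeping can be as simple as persisting the set of filenames already handled between runs; a minimal sketch with hypothetical filenames:

```python
def new_files(listing, processed):
    # Files not yet handled, oldest first: the save-<timestamp>.csv
    # convention makes lexicographic order equal to timestamp order
    # for same-width timestamps.
    return sorted(f for f in listing if f not in processed)

processed = {"save-1408396065000.csv"}   # loaded from durable storage
listing = ["save-1408396075000.csv", "save-1408396065000.csv"]

batch = new_files(listing, processed)
print(batch)   # ['save-1408396075000.csv']
# ... process `batch` with Spark, then:
processed.update(batch)                  # persist again afterwards
```

The important detail is to persist `processed` (or equivalently the last-seen timestamp) only after the batch has been fully processed, so a crash mid-batch reprocesses rather than skips.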
Re: Bug or feature? Overwrite broadcasted variables.
Yeah, thanks a lot. I know that for people who understand lazy execution this seems straightforward, but for those who don't it may become a liability. I've only tested its stability on a small example (which seems stable); hopefully that isn't just luck. Can a committer confirm this?

Yours
Peng

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12348.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Data loss - Spark streaming and network receiver
I think Spark Streaming currently lacks a data-acknowledging mechanism for when data is stored and replicated in the BlockManager, so data can potentially be lost even after being pulled from Kafka. Say the data is stored only in the BlockGenerator, not yet in the BlockManager, while in the meantime Kafka itself has committed the consumer offset, and at this point the node fails: from Kafka's point of view this data has been fed into Spark Streaming, but it has not actually been processed, so it will never be processed again unless you re-read the whole partition.

To solve this potential data-loss problem, Spark Streaming needs to offer a data-acknowledging mechanism, so a custom Receiver can use the acknowledgement to do checkpointing and recovery, like Storm. Besides, driver failure is another story that needs to be carefully considered. So currently it is hard to guarantee no data loss in Spark Streaming; it still needs improvement in some areas ☺.

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Tuesday, August 19, 2014 10:47 AM
To: Wei Liu
Cc: user
Subject: Re: Data loss - Spark streaming and network receiver

Hi Wei,

On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com wrote:

Since our application cannot tolerate losing customer data, I am wondering what is the best way for us to address this issue. 1) We are thinking of writing application-specific logic to address the data loss. To us, the problem seems to be caused by the Kinesis receivers advancing their checkpoint before we know for sure the data is replicated. For example, we could do another checkpoint ourselves to remember the Kinesis sequence number for data that has been processed by Spark Streaming. When the Kinesis receiver is restarted due to worker failures, we would restart it from the checkpoint we tracked.

This sounds pretty much to me like the way Kafka does it.
So, I am not saying that the stock KafkaReceiver does what you want (it may or may not), but it should be possible to update the offset (which corresponds to the sequence number) in ZooKeeper only after the data has been replicated successfully. I guess replacing Kinesis with Kafka is not an option for you, but you might consider pulling the Kinesis data into Kafka before processing it with Spark?

Tobias
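The ordering Jerry and Tobias are describing - store (and replicate) first, acknowledge the offset second - can be sketched independently of any real Kafka or Kinesis API. This is not what the stock KafkaReceiver does; it is just an illustration of the at-least-once contract, with `store` and `commit` as hypothetical stand-ins:

```python
def receive_block(records, store, commit):
    """Process a batch of (offset, payload) pairs.

    `store` stands in for storing-and-replicating a block (e.g. in the
    BlockManager); `commit` stands in for committing the consumer offset
    (e.g. to ZooKeeper).  Committing only after every store() has
    returned means a crash mid-batch replays the records on restart
    (at-least-once) instead of silently dropping them.
    """
    last = None
    for offset, payload in records:
        store(payload)
        last = offset
    if last is not None:
        commit(last + 1)   # next offset to read after recovery

stored, committed = [], []
receive_block([(0, "a"), (1, "b")], stored.append, committed.append)
print(stored, committed)   # ['a', 'b'] [2]
```

Reversing the two calls is exactly the failure mode described above: the offset is committed, the node dies before replication, and the records are gone.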
Re: Data loss - Spark streaming and network receiver
Dear All,

Recently I have written a Spark Kafka consumer to solve this problem. We have also seen issues with KafkaUtils, which uses the high-level Kafka consumer API and gives the consumer code no handle on offset management. The code below solves this problem; it is being tested in our Spark cluster and is working fine as of now.

https://github.com/dibbhatt/kafka-spark-consumer

This is a low-level Kafka consumer using the Kafka SimpleConsumer API. Please have a look at it and let me know your opinion. It was written to eliminate data loss by committing the offset only after the data is written to the BlockManager. Also, the existing high-level KafkaUtils has no feature to control data flow, and gives an OutOfMemory error if there is too much backlog in Kafka; this consumer solves that problem as well. The code was adapted from an earlier Storm Kafka consumer and has a lot of other features, like recovery from Kafka node failures, ZK failures, recovery from offset errors, etc.

Regards,
Dibyendu

On Tue, Aug 19, 2014 at 9:49 AM, Shao, Saisai saisai.s...@intel.com wrote:

I think Spark Streaming currently lacks a data-acknowledging mechanism for when data is stored and replicated in the BlockManager, so data can potentially be lost even after being pulled from Kafka. Say the data is stored only in the BlockGenerator, not yet in the BlockManager, while in the meantime Kafka itself has committed the consumer offset, and at this point the node fails: from Kafka's point of view this data has been fed into Spark Streaming, but it has not actually been processed, so it will never be processed again unless you re-read the whole partition. To solve this potential data-loss problem, Spark Streaming needs to offer a data-acknowledging mechanism, so a custom Receiver can use the acknowledgement to do checkpointing and recovery, like Storm. Besides, driver failure is another story that needs to be carefully considered.
Re: Does anyone have a stand alone spark instance running on Windows
OK, I tried your build. First you need to put sbt in C:\sbt. Then you get:

    Microsoft Windows [Version 6.2.9200]
    (c) 2012 Microsoft Corporation. All rights reserved.

    e:\>which java
    /cygdrive/c/Program Files/Java/jdk1.6.0_25/bin/java

    e:\>java -version
    java version "1.6.0_25"
    Java(TM) SE Runtime Environment (build 1.6.0_25-b06)

    e:\spark>sbt_opt.bat
    e:\spark>set SCRIPT_DIR=C:\sbt\
    e:\spark>java -Xms512m -Xmx2g -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=128m -jar C:\sbt\sbt-launch.jar
    [ERROR] Terminal initialization failed; falling back to unsupported
    java.lang.IncompatibleClassChangeError: Found class jline.Terminal, but interface was expected
        at jline.TerminalFactory.create(TerminalFactory.java:101)
        at jline.TerminalFactory.get(TerminalFactory.java:159)
        at sbt.ConsoleLogger$.ansiSupported(ConsoleLogger.scala:86)
        at sbt.ConsoleLogger$.<init>(ConsoleLogger.scala:80)
        at sbt.ConsoleLogger$.<clinit>(ConsoleLogger.scala)
        at sbt.GlobalLogging$.initial(GlobalLogging.scala:40)
        at sbt.StandardMain$.initialGlobalLogging(Main.scala:64)
        at sbt.StandardMain$.initialState(Main.scala:73)
        at sbt.xMain.run(Main.scala:29)
        at xsbt.boot.Launch$.run(Launch.scala:55)
        at xsbt.boot.Launch$$anonfun$explicit$1.apply(Launch.scala:45)
        at xsbt.boot.Launch$.launch(Launch.scala:69)
        at xsbt.boot.Launch$.apply(Launch.scala:16)
        at xsbt.boot.Boot$.runImpl(Boot.scala:31)
        at xsbt.boot.Boot$.main(Boot.scala:20)
        at xsbt.boot.Boot.main(Boot.scala)
    java.lang.IncompatibleClassChangeError: JLine incompatibility detected. Check that the sbt launcher is version 0.13.x or later.
        at sbt.ConsoleLogger$.ansiSupported(ConsoleLogger.scala:97)
        at sbt.ConsoleLogger$.<init>(ConsoleLogger.scala:80)
        at sbt.ConsoleLogger$.<clinit>(ConsoleLogger.scala)
        at sbt.GlobalLogging$.initial(GlobalLogging.scala:40)
        at sbt.StandardMain$.initialGlobalLogging(Main.scala:64)
        at sbt.StandardMain$.initialState(Main.scala:73)
        at sbt.xMain.run(Main.scala:29)
        at xsbt.boot.Launch$.run(Launch.scala:55)
        at xsbt.boot.Launch$$anonfun$explicit$1.apply(Launch.scala:45)
        at xsbt.boot.Launch$.launch(Launch.scala:69)
        at xsbt.boot.Launch$.apply(Launch.scala:16)
        at xsbt.boot.Boot$.runImpl(Boot.scala:31)
        at xsbt.boot.Boot$.main(Boot.scala:20)
        at xsbt.boot.Boot.main(Boot.scala)
    Error during sbt execution: java.lang.IncompatibleClassChangeError: JLine incompatibility detected. Check that the sbt launcher is version 0.13.x or later.

I believe my version of sbt is 0.13. Finally, even if I could build Spark, I still don't see how to launch a server.

On Sat, Aug 16, 2014 at 7:33 PM, Manu Suryavansh suryavanshi.m...@gmail.com wrote:

Hi, I have built spark-1.0.0 on Windows using Java 7/8 and I have been able to run several examples. Here are my notes - http://ml-nlp-ir.blogspot.com/2014/04/building-spark-on-windows-and-cloudera.html - on how to build from source and run examples in the Spark shell.

Regards, Manu

On Sat, Aug 16, 2014 at 12:14 PM, Steve Lewis lordjoe2...@gmail.com wrote:

I want to look at porting a Hadoop problem to Spark. Eventually I want to run on a Hadoop 2.0 cluster, but while I am learning and porting I want to run small problems on my Windows box. I installed Scala and sbt. I downloaded Spark, and in the Spark directory I can say

    mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package

which succeeds. I tried

    sbt/sbt assembly

which fails with errors. The documentation at https://spark.apache.org/docs/latest/spark-standalone.html says:

    Note: The launch scripts do not currently support Windows. To run a Spark cluster on Windows, start the master and workers by hand.
with no indication of how to do this. I can build and run samples (say JavaWordCount) up to the point where they fail because a master cannot be found (none is running). I want to know how to get a Spark master and a slave or two running on my Windows box so I can look at the samples and start playing with Spark. Does anyone have a Windows instance running?? Please DON'T SAY I SHOULD RUN LINUX! If it is supposed to work on Windows, someone should have tested it and be willing to state how.

--
Manu Suryavansh

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
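For what it's worth, "start the master and workers by hand" should amount to something like the following. This is a sketch, assuming a Spark 1.x build with the bundled .cmd launcher scripts; the deploy class names are the real ones, but flags, host names, and the default port 7077 may differ by version and setup:

```shell
REM From the Spark directory, in one console: start a master.
REM Its web UI (http://localhost:8080 by default) shows the spark:// URL.
bin\spark-class.cmd org.apache.spark.deploy.master.Master

REM In a second console: start a worker, pointing it at the master URL.
bin\spark-class.cmd org.apache.spark.deploy.worker.Worker spark://localhost:7077

REM Then run the shell or examples against the standalone master:
bin\spark-shell.cmd --master spark://localhost:7077
```

Each command blocks its console, which is exactly what the Linux launch scripts hide by daemonizing; on Windows you simply keep one console per daemon.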
Re: Data loss - Spark streaming and network receiver
Thank you all for responding to my question. I am pleasantly surprised by how many prompt responses I got; it shows the strength of the Spark community.

Kafka is still an option for us; I will check out the link provided by Dibyendu. Meanwhile, if someone out there has already figured this out with Kinesis, please keep the suggestions coming. Thanks.

Thanks,
Wei

On Mon, Aug 18, 2014 at 9:31 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Dear All, Recently I have written a Spark Kafka consumer to solve this problem. We have also seen issues with KafkaUtils, which uses the high-level Kafka consumer API and gives the consumer code no handle on offset management. The code below solves this problem; it is being tested in our Spark cluster and is working fine as of now. https://github.com/dibbhatt/kafka-spark-consumer This is a low-level Kafka consumer using the Kafka SimpleConsumer API. Please have a look at it and let me know your opinion. It was written to eliminate data loss by committing the offset only after the data is written to the BlockManager. Also, the existing high-level KafkaUtils has no feature to control data flow, and gives an OutOfMemory error if there is too much backlog in Kafka; this consumer solves that problem as well. The code was adapted from an earlier Storm Kafka consumer and has a lot of other features, like recovery from Kafka node failures, ZK failures, recovery from offset errors, etc.
Regards,
Dibyendu
Re: Working with many RDDs in parallel?
Hmm, I thought as much. I am using Cassandra with the Spark connector. What I really need is an RDD created from a query against Cassandra of the form "where partition_key = :id", where :id is taken from a list. Some grouping of the ids would be a way to partition this.

On Mon, Aug 18, 2014 at 3:42 PM, Sean Owen so...@cloudera.com wrote:

You won't be able to use RDDs inside of an RDD operation. I imagine your immediate problem is that the code you've elided references 'sc', and that gets referenced by the PairFunction and serialized, but it can't be.

If you want to play it this way, parallelize across roots in Java. That is, just use an ExecutorService to launch a bunch of operations on RDDs in parallel. There's no reason you can't do that, although I suppose there are upper limits as to what makes sense on your cluster; 1000 RDD count()s at once isn't a good idea, for example. It may be the case that you don't really need a bunch of RDDs at all, but can operate on an RDD of pairs of Strings (roots) and something-elses, all at once.

On Mon, Aug 18, 2014 at 2:31 PM, David Tinker david.tin...@gmail.com wrote:

Hi All. I need to create a lot of RDDs starting from a set of roots and count the rows in each. Something like this:

    final JavaSparkContext sc = new JavaSparkContext(conf);
    List<String> roots = ...
    Map<String, Object> res = sc.parallelize(roots).mapToPair(new PairFunction<String, String, Long>() {
        public Tuple2<String, Long> call(String root) throws Exception {
            ... create RDD based on root from sc somehow ...
            return new Tuple2<String, Long>(root, rdd.count());
        }
    }).countByKey();

This fails with a message about JavaSparkContext not being serializable. Is there a way to get at the context inside of the map function, or should I be doing something else entirely?

Thanks
David

--
http://qdb.io/ Persistent Message Queues With Replay and #RabbitMQ Integration
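Sean's ExecutorService suggestion, translated to Python for illustration: run the per-root jobs from plain driver-side threads, and never touch the context inside a transformation. Here `count_for_root` and its data are hypothetical stand-ins for building an RDD from a Cassandra query and calling count() on it:

```python
from concurrent.futures import ThreadPoolExecutor

def count_for_root(root):
    # Stand-in for: build an RDD for this root (e.g. a Cassandra query
    # on partition_key = root) on the DRIVER and count it.  In the real
    # version the SparkContext is only ever used here, in driver-side
    # threads, never inside a map/mapToPair function.
    data = {"a": [1, 2, 3], "b": [4, 5]}   # hypothetical per-root data
    return len(data[root])

roots = ["a", "b"]
# A handful of concurrent jobs is reasonable; 1000 at once is not.
with ThreadPoolExecutor(max_workers=4) as pool:
    counts = dict(zip(roots, pool.map(count_for_root, roots)))
print(counts)   # {'a': 3, 'b': 2}
```

Spark contexts are thread-safe for job submission, which is what makes this driver-side fan-out work; the serialization error in the original code came from capturing `sc` inside the PairFunction instead.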