Create DataFrame from textFile with unknown columns
Assuming there is a text file with an unknown number of columns, how would one create a DataFrame? I have followed the example in the Spark docs where one first creates an RDD of Rows, but it seems that you have to know the exact number of columns in the file and can't just do this:

    val rowRDD = sc.textFile("path/file").map(_.split(" |\\,")).map(org.apache.spark.sql.Row(_))

The above will work if I do ...Row(_(0), _(1), ...), but the number of columns is unknown.

Also, assuming that one has an RDD[Row], why is .toDF() not defined on this RDD type? Is calling the .createDataFrame(...) method the only way to create a DF out of an RDD[Row]?

Thanks
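A minimal sketch of one possible approach (assuming Spark 1.3 with an existing SQLContext named sqlContext; the column names c0, c1, ... and the file path are invented for illustration). Row.fromSeq builds a Row of any width, so the column count need not be known up front:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    // Split each line into an Array[String]; the width is only known at runtime.
    val rdd = sc.textFile("path/file").map(_.split(" |\\,"))

    // Row.fromSeq accepts a Seq of any length.
    val rowRDD = rdd.map(fields => Row.fromSeq(fields))

    // Derive the schema from the first line, naming columns c0, c1, ...
    val numCols = rdd.first().length
    val schema = StructType((0 until numCols).map(i => StructField(s"c$i", StringType, nullable = true)))

    val df = sqlContext.createDataFrame(rowRDD, schema)

On the second question: createDataFrame(rdd, schema) is indeed the intended entry point for RDD[Row]; .toDF() is only available (via implicits) on RDDs of case classes or tuples, where the schema can be inferred.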
Re: NoSuchMethodException KafkaUtils.
Customized spark-streaming-kafka_2.10-1.1.0.jar: included a new method in the KafkaUtils class to handle the byte array format. That helped.

Thanks,
Yamini
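For reference, a stock alternative that may avoid customizing the jar: KafkaUtils.createStream is parameterized on key/value types and decoders, so byte arrays can be consumed with Kafka's DefaultDecoder. A sketch with hypothetical ZooKeeper, group, and topic settings, assuming ssc is an existing StreamingContext:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils
    import kafka.serializer.DefaultDecoder

    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",  // assumption: local ZooKeeper
      "group.id" -> "my-consumer-group")

    // DefaultDecoder passes the raw Array[Byte] through untouched.
    val byteStream = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)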
Re: newAPIHadoopRDD: multiple scan results from HBase
bq. HBase scan operation like scan StartROW and EndROW in RDD?

I don't think RDD supports the concept of a start row and end row. In HBase, please take a look at the following methods of Scan:

    public Scan setStartRow(byte [] startRow)
    public Scan setStopRow(byte [] stopRow)

Cheers

On Sun, Apr 5, 2015 at 2:35 PM, Jeetendra Gangele gangele...@gmail.com wrote:

I have a 2GB HBase table where the data is stored in the form of key and value (only one column per key), and the keys are unique. What I am thinking is to load the complete HBase table into an RDD and then do operations like scans in the RDD rather than in HBase. Can I do an HBase scan operation, like scanning from StartROW to EndROW, in an RDD? The first step in my job will be to load the complete data into an RDD.
Re: Need help with ALS Recommendation code
Could you try `sbt package` or `sbt compile` and see whether there are errors? It seems that you haven't reached the ALS code yet. -Xiangrui

On Sat, Apr 4, 2015 at 5:06 AM, Phani Yadavilli -X (pyadavil) pyada...@cisco.com wrote:

Hi,

I am trying to run the following command in the Movie Recommendation example provided by the ampcamp tutorial.

Command: sbt package run /movielens/medium

Exception:

    sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread run-main-0
    java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)
    [trace] Stack trace suppressed: run last compile:run for the full output.
    [error] (compile:run) Nonzero exit code: 1
    [error] Total time: 0 s, completed Apr 4, 2015 12:00:18 PM

I am unable to identify the error code. Can someone help me with this?

Regards
Phani Kumar
Re: newAPIHadoopRDD: multiple scan results from HBase
I am already using STARTROW and ENDROW in HBase from newAPIHadoopRDD. Can I do something similar with an RDD? Let's say I use a filter on the RDD to get only those records which match the same criteria specified by STARTROW and STOPROW. Will it be much faster than querying HBase?

On 6 April 2015 at 03:15, Ted Yu yuzhih...@gmail.com wrote:

bq. HBase scan operation like scan StartROW and EndROW in RDD?

I don't think RDD supports the concept of a start row and end row. In HBase, please take a look at the following methods of Scan:

    public Scan setStartRow(byte [] startRow)
    public Scan setStopRow(byte [] stopRow)

Cheers
Re: About Waiting batches on the spark streaming UI
Thanks Tathagata for the explanation!

bit1...@163.com

From: Tathagata Das
Date: 2015-04-04 01:28
To: Ted Yu
CC: bit1129; user
Subject: Re: About Waiting batches on the spark streaming UI

Maybe that should be marked as waiting as well. We plan to update the UI soon, so will keep that in mind.

On Apr 3, 2015 10:12 AM, Ted Yu yuzhih...@gmail.com wrote:

Maybe add another stat for batches waiting in the job queue?

Cheers

On Fri, Apr 3, 2015 at 10:01 AM, Tathagata Das t...@databricks.com wrote:

Very good question! This is because the current code is written such that the UI considers a batch as waiting only when it has actually started being processed. That is, batches waiting in the job queue are not considered in the calculation. It is arguable that it may be more intuitive to count those as waiting as well.

On Apr 3, 2015 12:59 AM, bit1...@163.com bit1...@163.com wrote:

I copied the following from the Spark Streaming UI. I don't know why the Waiting batches value is 1; my understanding is that it should be 72. Following is my reasoning:

1. Total time is 1 minute 35 seconds = 95 seconds.
2. Batch interval is 1 second, so 95 batches are generated in 95 seconds.
3. Processed batches are 23 (correct, because my processing code does nothing but sleep 4 seconds).
4. Then the waiting batches should be 95 - 23 = 72.

    Started at: Fri Apr 03 15:17:47 CST 2015
    Time since start: 1 minute 35 seconds
    Network receivers: 1
    Batch interval: 1 second
    Processed batches: 23
    Waiting batches: 1
    Received records: 0
    Processed records: 0

bit1...@163.com
Re: newAPIHadoopRDD: multiple scan results from HBase
You do need to apply the patch, since 0.96 doesn't have this feature.

For JavaSparkContext.newAPIHadoopRDD, can you check region server metrics to see where the overhead might be (compared to creating a scan and firing the query using the native client)?

Thanks

On Sun, Apr 5, 2015 at 2:00 PM, Jeetendra Gangele gangele...@gmail.com wrote:

That's true. I checked the MultiRowRangeFilter and it serves my need. Do I need to apply the patch for this, since I am using HBase version 0.96? Also, I have noticed that JavaSparkContext.newAPIHadoopRDD is slow compared to creating a scan and firing the query directly. Is there any reason?
Re: newAPIHadoopRDD: multiple scan results from HBase
Sure, I will check.

On 6 April 2015 at 02:45, Ted Yu yuzhih...@gmail.com wrote:

You do need to apply the patch, since 0.96 doesn't have this feature. For JavaSparkContext.newAPIHadoopRDD, can you check region server metrics to see where the overhead might be (compared to creating a scan and firing the query using the native client)?
RE: Need help with ALS Recommendation code
Hi Xiangrui,

Thank you for the response. I tried both `sbt package` and `sbt compile`; both commands give me a success result.

sbt compile:

    [info] Set current project to machine-learning (in build file:/opt/mapr/spark/spark-1.2.1/SparkTraining/machine-learning/)
    [info] Updating {file:/opt/mapr/spark/spark-1.2.1/SparkTraining/machine-learning/}machine-learning...
    [info] Resolving org.fusesource.jansi#jansi;1.4 ...
    [info] Done updating.
    [success] Total time: 1 s, completed Apr 6, 2015 5:14:43 AM

sbt package:

    [info] Set current project to machine-learning (in build file:/opt/mapr/spark/spark-1.2.1/SparkTraining/machine-learning/)
    [success] Total time: 1 s, completed Apr 6, 2015 5:15:04 AM

How do I proceed from here?

Regards
Phani Kumar

-----Original Message-----
From: Xiangrui Meng [mailto:men...@gmail.com]
Sent: Monday, April 06, 2015 9:50 AM
To: Phani Yadavilli -X (pyadavil)
Cc: user@spark.apache.org
Subject: Re: Need help with ALS Recommendation code

Could you try `sbt package` or `sbt compile` and see whether there are errors? It seems that you haven't reached the ALS code yet. -Xiangrui
Re: conversion from java collection type to JavaRDD&lt;Object&gt;
The runtime attempts to serialize everything required by records, and also any lambdas/closures you use. Small, simple types are less likely to run into this problem.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, Apr 5, 2015 at 4:21 PM, Jeetendra Gangele gangele...@gmail.com wrote:

You are right. I have a class called VendorRecord which is not serializable; objects of this class also have many sub-objects (maybe 30 or more). Do I need to recursively serialize all of them?

On 4 April 2015 at 18:14, Dean Wampler deanwamp...@gmail.com wrote:

Without the rest of your code, it's hard to know what might be unserializable.

On Sat, Apr 4, 2015 at 7:56 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Hi, I have tried with parallelize, but I got the below exception:

    java.io.NotSerializableException: pacific.dr.VendorRecord

Here is my code:

    List<VendorRecord> vendorRecords = blockingKeys.getMatchingRecordsWithscan(matchKeysOutput);
    JavaRDD<VendorRecord> lines = sc.parallelize(vendorRecords);

On 2 April 2015 at 21:11, Dean Wampler deanwamp...@gmail.com wrote:

Use JavaSparkContext.parallelize.
http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#parallelize(java.util.List)

On Thu, Apr 2, 2015 at 11:33 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Hi All,

Is there a way to make a JavaRDD<Object> from an existing java collection of type List<Object>? I know this can be done using Scala, but I am looking at how to do this using Java.

Regards
Jeetendra
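On the "recursively serialize" question: Java serialization is transitive, so every type reachable from VendorRecord must itself be serializable, but there is no hand-serialization to do per class; marking the types Serializable (or registering them with Kryo) is usually enough. A minimal sketch with hypothetical class names standing in for the real ones:

    // Every type reachable from the record must be serializable too.
    class SubRecord(val field: String) extends java.io.Serializable
    class VendorRecord(val id: String, val sub: SubRecord) extends java.io.Serializable

    // Alternatively, switch to Kryo and register the classes
    // (assumption: Kryo can handle these types without custom serializers).
    val conf = new org.apache.spark.SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[VendorRecord], classOf[SubRecord]))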
Add row IDs column to data frame
What would be the most efficient, neat method to add a column with row IDs to a DataFrame? I can think of something like the below, but it fails with errors (at line 3), and anyway it doesn't look like the best route possible:

    var dataDF = sc.textFile("path/file").toDF()
    val rowDF = sc.parallelize(1 to dataDF.count().toInt).toDF("ID")
    dataDF = dataDF.withColumn("ID", rowDF("ID"))

Thanks
Re: Add row IDs column to data frame
Sorry, it should be toDF("text", "id").

On Sun, Apr 5, 2015 at 9:21 PM, Xiangrui Meng men...@gmail.com wrote:

Try:

    sc.textFile("path/file").zipWithIndex().toDF("id", "text")

-Xiangrui
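Putting the correction together, a minimal end-to-end sketch (assuming Spark 1.3; the implicits import is what brings .toDF() into scope on RDDs of tuples):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // zipWithIndex pairs each element with its index, so the line text is the first column.
    val df = sc.textFile("path/file").zipWithIndex().toDF("text", "id")
    df.show()

Note that zipWithIndex triggers a Spark job to compute partition sizes, but in exchange the resulting IDs are consecutive across the whole dataset.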
Re: Add row IDs column to data frame
Try:

    sc.textFile("path/file").zipWithIndex().toDF("id", "text")

-Xiangrui

On Sun, Apr 5, 2015 at 7:50 PM, olegshirokikh o...@solver.com wrote:

What would be the most efficient, neat method to add a column with row IDs to a DataFrame? I can think of something like the below, but it fails with errors (at line 3), and anyway it doesn't look like the best route possible:

    var dataDF = sc.textFile("path/file").toDF()
    val rowDF = sc.parallelize(1 to dataDF.count().toInt).toDF("ID")
    dataDF = dataDF.withColumn("ID", rowDF("ID"))

Thanks
Re: Spark Streaming program questions
The DAG can't change. You can create many DStreams, but they have to belong to one StreamingContext. You can try these things to see.

On Sun, Apr 5, 2015 at 2:13 AM, nickos168 nickos...@yahoo.com.invalid wrote:

I have two questions:

1) In a Spark Streaming program, after the various DStream transformations have been set up, the ssc.start() method is called to start the computation. Can the underlying DAG change (i.e. add another map or maybe a join) after ssc.start() has been called (and maybe messages have already been received/processed for some batches)?

2) In a Spark Streaming program (one process), can I have multiple DStream transformations, each series belonging to its own StreamingContext (in the same thread or in different threads)?

For example:

    val lines_A = ssc_A.socketTextStream(..)
    val words_A = lines_A.flatMap(_.split(" "))
    val wordCounts_A = words_A.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts_A.print()

    val lines_B = ssc_B.socketTextStream(..)
    val words_B = lines_B.flatMap(_.split(" "))
    val wordCounts_B = words_B.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts_B.print()

    ssc_A.start()
    ssc_B.start()

I think the answer is NO to both questions, but I am wondering what is the reason for this behavior.

Thanks,
Nickos
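A sketch of the supported pattern: several independent DStream pipelines attached to the same StreamingContext and started together (hostnames and ports are hypothetical):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(new SparkConf().setAppName("TwoStreams"), Seconds(1))

    // Two separate input streams, both owned by the one context.
    val countsA = ssc.socketTextStream("hostA", 9999)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val countsB = ssc.socketTextStream("hostB", 9998)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    countsA.print()
    countsB.print()

    // One start() fixes the whole DAG; no new transformations may be added afterwards.
    ssc.start()
    ssc.awaitTermination()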
Spark streaming with Kafka - couldn't find KafkaUtils
Hi All,

I configured a Kafka cluster on a single node, and I have a streaming application which reads data from a Kafka topic using KafkaUtils. When I execute the code in local mode from the IDE, the application runs fine. But when I submit the same to the Spark cluster in standalone mode, I end up with the following exception:

    java.lang.ClassNotFoundException: org/apache/spark/streaming/kafka/KafkaUtils

I am using the spark-1.2.1 version. When I checked the source files of streaming, the source files related to Kafka are missing. Are these not included in the spark-1.3.0 and spark-1.2.1 versions? Do I have to manually include them?

Regards,
Padma Ch
Re: Spark streaming with Kafka - couldn't find KafkaUtils
How are you submitting the application? Use a standard build tool like maven or sbt to build your project; it will download all the dependency jars. When you submit your application with spark-submit, use the --jars option to add those jars which are causing the ClassNotFoundException. If you are running as a standalone application without using spark-submit, then while creating the SparkContext, use sc.addJar() to add those dependency jars.

For Kafka streaming with sbt, these are the jars that are required:

    sc.addJar("/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
    sc.addJar("/root/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
    sc.addJar("/root/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")
    sc.addJar("/root/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")

Thanks
Best Regards
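Alternatively, the dependency can be declared in the build itself. A sketch of the sbt approach (version numbers assumed to match the 1.2.1 cluster in question; an assembly plugin would then produce an uber jar for spark-submit):

    // build.sbt
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
      "org.apache.spark" %% "spark-streaming" % "1.2.1" % "provided",
      // The Kafka integration is a separate artifact and must be bundled with the app.
      "org.apache.spark" %% "spark-streaming-kafka" % "1.2.1")

spark-streaming-kafka ships as a separate artifact precisely so that Spark core does not drag in the Kafka client, which is why the class is missing from a plain cluster install.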
Re: Pseudo Spark Streaming?
Hallo,

Just because you receive the log files hourly does not mean that you have to use Spark Streaming. Spark Streaming is often used if you receive new events each minute or second, potentially at an irregular frequency. Of course, your analysis window can be larger. I think your use case justifies standard Spark or MR. If you are not restricted to them, you may check a key/value or column store, such as Redis or Apache Cassandra. They also have some nice mechanisms for storage/performance-optimal counting of unique users (HyperLogLog) etc. In any case, you can join the data with historical data.

Best regards
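On the unique-user point: Spark itself ships a HyperLogLog-based estimator, so approximate distinct counts need no external store. A minimal sketch, assuming the user ID is the first whitespace-delimited field of each log line (an invented layout):

    // countApproxDistinct uses HyperLogLog under the hood; 0.01 is the target relative accuracy.
    val userIds = sc.textFile("access-logs/*").map(line => line.split(" ")(0))
    val approxUniqueUsers = userIds.countApproxDistinct(relativeSD = 0.01)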
Re: input size too large | Performance issues with Spark
Reading Sandy's blog, there seems to be one typo:

bq. Similarly, the heap size can be controlled with the --executor-cores flag or the spark.executor.memory property.

'--executor-memory' should be the right flag.

BTW,

bq. It defaults to max(384, .07 * spark.executor.memory)

The default memory overhead has been increased to 10 percent in the master branch. See SPARK-6085. Though the change is not in 1.3.

Cheers

On Thu, Apr 2, 2015 at 12:55 PM, Christian Perez christ...@svds.com wrote:

To Akhil's point, see Tuning Data structures. Avoid standard collection HashMap. With fewer machines, try running 4 or 5 cores per executor and only 3-4 executors (1 per node): http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/. That ought to reduce the shuffle performance hit (someone else confirm?).

#7: see spark.sql.shuffle.partitions (default: 200).

On Sun, Mar 29, 2015 at 7:57 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Go through this once, if you haven't read it already: https://spark.apache.org/docs/latest/tuning.html

Thanks
Best Regards

On Sat, Mar 28, 2015 at 7:33 PM, nsareen nsar...@gmail.com wrote:

Hi All,

I'm facing performance issues with my Spark implementation, and was briefly investigating the WebUI logs. I noticed that my RDD size is 55GB, the Shuffle Write is 10 GB, and the Input Size is 200GB. The application is a web application which does predictive analytics, so we keep most of our data in memory. This observation was for only 30 minutes of usage of the application by a single user.

We anticipate at least 10-15 users of the application sending requests in parallel, which makes me a bit nervous.

One constraint we have is that we do not have too many nodes in the cluster; we may end up with 3-4 machines at best, but they can be scaled up vertically, each having 24 cores / 512 GB RAM etc., which can allow us to make a virtual 10-15 node cluster. Even then, the input size and shuffle write are too high for my liking. Any suggestions in this regard will be greatly appreciated, as there aren't many resources on the net for handling performance issues such as these.

Some pointers on my application's data structure design:

1) The RDD is a JavaPairRDD, with the key a custom POJO containing 3-4 HashMaps and the value containing 1 HashMap.
2) Data is loaded via JDBCRDD during application startup, which also tends to take a lot of time, since we massage the data once it is fetched from the DB and then save it as a JavaPairRDD.
3) Most of the data is structured, but we are still using JavaPairRDD; we have not explored the option of Spark SQL, though.
4) We have only one SparkContext, which caters to all the requests coming into the application from various users.
5) During a single user session, the user can send 3-4 parallel stages consisting of Map / GroupBy / Join / Reduce etc.
6) We have to change the RDD structure using different types of groupBy operations, since the user can drill down and drill up through the data (aggregation at a higher/lower level). This is where we make use of groupBys, but there is a cost associated with them.
7) We have observed that the initial RDDs we create have 40-odd partitions, but after some stage executions, like groupBys, the partitions increase to 200 or so. This was odd, and we haven't figured out why this happens.

In summary, we want to use Spark to give us the capability to process our in-memory data structure very fast, as well as scale to a larger volume when required in the future.
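For reference, the executor sizing discussed above maps to a handful of configuration properties. A sketch with illustrative values only (not a recommendation for the 24-core / 512 GB nodes described; --executor-memory and --executor-cores are the equivalent spark-submit flags):

    val conf = new org.apache.spark.SparkConf()
      .setAppName("MyApp")                                  // hypothetical app name
      .set("spark.executor.memory", "20g")                  // executor heap: the flag Ted corrects above
      .set("spark.executor.cores", "5")                     // concurrent tasks per executor
      .set("spark.yarn.executor.memoryOverhead", "2048")    // off-heap overhead, relevant on YARN
      .set("spark.sql.shuffle.partitions", "16")            // the post-groupBy 200-partition default from point #7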
Pseudo Spark Streaming?
Hi,

I have a requirement for which I plan to use Spark Streaming. I am supposed to calculate the access count for certain webpages. I receive the webpage access information through log files. By access count I mean how many times the page was accessed *till now*.

I have the log files for the past 2 years, and every day we keep receiving almost 6 GB of access logs (on an hourly basis). Since we receive these logs on an hourly basis, I feel that I should use Spark Streaming. But the problem is that the access counts have to be cumulative, i.e. even the older accesses (past 2 years) for a webpage should also be counted in the final value. How can I achieve this through streaming, since streaming picks up only new files? I don't want to use a DB to store the access counts, since it would considerably slow down the processing.

Thanks,
Baahu
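If streaming is used, one possible shape for the cumulative requirement is stateful streaming: keep running totals with updateStateByKey over newly arriving files. A minimal sketch, where parsePageUrl is an assumed helper and the HDFS paths are hypothetical (checkpointing is mandatory for stateful operations):

    import org.apache.spark.streaming.{Minutes, StreamingContext}

    val ssc = new StreamingContext(sc, Minutes(60))
    ssc.checkpoint("hdfs:///checkpoints/access-counts")

    // updateStateByKey folds each batch's counts into the running total per page.
    val updateTotal = (newCounts: Seq[Int], total: Option[Int]) =>
      Some(newCounts.sum + total.getOrElse(0))

    val counts = ssc.textFileStream("hdfs:///logs/incoming")  // picks up newly arriving files
      .map(line => (parsePageUrl(line), 1))
      .updateStateByKey(updateTotal)

    counts.print()

The two years of history would still have to be folded in once, e.g. by seeding the totals from a plain batch job, which is part of why the reply above suggests standard Spark/MR may fit better.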
Re: 4 seconds to count 13M lines. Does it make sense?
Are you pre-caching them in memory?

On Apr 4, 2015 3:29 AM, SamyaMaiti samya.maiti2...@gmail.com wrote:

Reduce *spark.sql.shuffle.partitions* from the default of 200 to the total number of cores.
Re: Spark + Kinesis
Hi all,

Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python, where I followed the example at http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python

I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help.

Thanks,
Vadim

    ---
    Time: 142825409 ms
    ---
    15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 142825409 ms.0 from job set of time 142825409 ms
    15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 142825409 ms (execution: 0.090 s)
    15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
    15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
    15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
    15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
    15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
    15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
    15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
    15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
    15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
    15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
    15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 142825409 ms
    15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(142825407 ms)

On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Hi all,

More good news! I was able to utilize mergeStrategy to assemble my Kinesis consumer into an uber jar. Here's what I added to build.sbt:

    mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
      {
        case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
        case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
        case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
        case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
        case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
        case x => old(x)
      }
    }

Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following:

    15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea on what might be going on?

Thanks,
Vadim

Here's my consumer code (adapted from the WordCount example):

    private object MyConsumer extends Logging {
      def main(args: Array[String]) {
        /* Check that all required args were passed in. */
        if (args.length < 2) {
          System.err.println(
            """
              |Usage: KinesisWordCount <stream-name> <endpoint-url>
              |    <stream-name> is the name of the Kinesis stream
              |    <endpoint-url> is the endpoint of the Kinesis service
              |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
            """.stripMargin)
          System.exit(1)
        }

        /* Populate the appropriate variables from the given args */
        val Array(streamName, endpointUrl) = args

        /* Determine the number of shards from the stream */
        val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
        kinesisClient.setEndpoint(endpointUrl)
        val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
        System.out.println("Num shards: " + numShards)

        /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
        val numStreams = numShards

        /* Set up the SparkConfig and StreamingContext */
        /* Spark Streaming batch interval */
        val batchInterval = Milliseconds(2000)
        val sparkConfig = new SparkConf().setAppName("MyConsumer")
        val ssc = new StreamingContext(sparkConfig, batchInterval)

        /* Kinesis checkpoint interval. Same as batchInterval for this example. */
        val kinesisCheckpointInterval = batchInterval

        /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
        val kinesisStreams = (0 until numStreams).map { i =>
          KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
            InitialPositionInStream.LATEST,
Diff between foreach and foreachAsync
Hi, can somebody explain to me the difference between the foreach and foreachAsync RDD actions? Which one will give the maximum throughput? Does foreach run in a parallel way?
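A sketch of the distinction, assuming foreachAsync is what was meant: both run the function on the elements of every partition in parallel across the cluster; the difference is only at the driver, where foreach blocks until the job finishes while foreachAsync returns a FutureAction immediately (process is an assumed user function):

    import scala.concurrent.Await
    import scala.concurrent.duration._

    val rdd = sc.parallelize(1 to 1000000)

    // Blocking: the driver waits here until all partitions are processed.
    rdd.foreach(x => process(x))

    // Non-blocking: returns a FutureAction; the driver can do other work meanwhile.
    val f = rdd.foreachAsync(x => process(x))
    Await.result(f, 10.minutes)  // wait for completion when the result is needed

Throughput of the job itself is the same either way; foreachAsync only helps if the driver has other jobs to submit concurrently.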
Sending RDD object over the network
For a class project, I am trying to have 2 Spark applications communicate with each other by passing an RDD object created in one application to the other. The first application is developed in Scala; it creates an RDD and sends it to the 2nd application over the network as follows:

    val logFile = "../../spark-1.3.0/README.md" // Should be some file on your system
    val conf = new SparkConf()
    conf.setAppName("Simple Application").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val nums = sc.parallelize(1 to 100, 2).toJavaRDD()
    val s = new Socket("127.0.0.1", 8000)
    val objectOutput = new ObjectOutputStream(s.getOutputStream())
    objectOutput.writeObject(nums)
    s.close()

The second Spark application is a Java application, which tries to receive the RDD object and then perform some operations on it. At the moment, I am trying to see if I have properly obtained the object:

    ServerSocket listener = null;
    Socket client;
    try {
        listener = new ServerSocket(8000);
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("Listening");
    try {
        client = listener.accept();
        ObjectInputStream objectInput = new ObjectInputStream(client.getInputStream());
        Object object = (JavaRDD) objectInput.readObject();
        JavaRDD tmp = (JavaRDD) object;
        if (tmp != null) {
            System.out.println(tmp.getStorageLevel().toString());
            List<Partition> p = tmp.partitions();
        } else {
            System.out.println("variable is null");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

The output I get is:

    StorageLevel(false, false, false, false, 1)
    java.lang.NullPointerException
        at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
        at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:56)
        at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
        at SimpleApp.main(SimpleApp.java:35)

So System.out.println(tmp.getStorageLevel().toString()) prints out properly, but List<Partition> p = tmp.partitions() throws the NullPointerException. I can't seem to figure out why this is happening.

In a nutshell, I am basically trying to create an RDD object in one Spark application and then send the object to another application. After receiving the object, I try to make sure I received it properly by accessing its methods. Invoking the partitions() method in the original Spark application does not throw any errors either.

I would greatly appreciate any suggestion on how I can solve my problem, or an alternative solution for what I am trying to accomplish.
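One observation, offered as a sketch rather than a definitive diagnosis: an RDD is a lazy handle tied to the SparkContext that created it, not a container of data, so deserializing one in a different JVM leaves its internals (such as the driver-side data backing a ParallelCollectionRDD) unresolved, which is consistent with partitions() failing. A pattern that does work is to ship the materialized data and rebuild an RDD on the receiving side with its own context; a sketch extending the snippets above (receiverSc stands for the receiving application's hypothetical SparkContext):

    // Sender: materialize the data and send the data itself, not the RDD handle.
    val data = sc.parallelize(1 to 100, 2).collect()  // Array[Int]: plain serializable data
    objectOutput.writeObject(data)

    // Receiver: rebuild an RDD from the received data with its own SparkContext.
    val received = objectInput.readObject().asInstanceOf[Array[Int]]
    val rebuilt = receiverSc.parallelize(received)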
Re: newAPIHadoopRDD: multiple scan results from HBase
Looks like MultiRowRangeFilter would serve your need. See HBASE-11144.

HBase 1.1 will be released in May. You can also backport it to the HBase release you're using.

On Sat, Apr 4, 2015 at 8:45 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Here is my conf object, passed as the first parameter of the API. But here I want to pass multiple scans, meaning I have 4 criteria for START ROW and STOP ROW in the same table. Using the code below I can get the result for one STARTROW and ENDROW:

    Configuration conf = DBConfiguration.getConf();
    // int scannerTimeout = (int) conf.getLong(
    //     HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, -1);
    // System.out.println("lease timeout on server is " + scannerTimeout);
    int scannerTimeout = (int) conf.getLong("hbase.client.scanner.timeout.period", -1);
    // conf.setLong("hbase.client.scanner.timeout.period", 6L);
    conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
    Scan scan = new Scan();
    scan.addFamily(FAMILY);
    FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
    filterList.addFilter(new KeyOnlyFilter());
    filterList.addFilter(new FirstKeyOnlyFilter());
    scan.setFilter(filterList);
    scan.setCacheBlocks(false);
    scan.setCaching(10);
    scan.setBatch(1000);
    scan.setSmall(false);
    conf.set(TableInputFormat.SCAN, DatabaseUtils.convertScanToString(scan));
    return conf;

On 4 April 2015 at 20:54, Jeetendra Gangele gangele...@gmail.com wrote:

Hi All,

Can we get the results of multiple scans from JavaSparkContext.newAPIHadoopRDD from HBase? This method's first parameter takes a configuration object, to which I have added a filter. But how can I query multiple scans from the same table while calling this API only once?

regards
jeetendra
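A sketch of how MultiRowRangeFilter could slot into the conf method above, once a build containing HBASE-11144 is available (the row keys are illustrative; the scan and conf variables are those from the snippet above):

    import java.util.Arrays
    import org.apache.hadoop.hbase.filter.MultiRowRangeFilter
    import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange
    import org.apache.hadoop.hbase.util.Bytes

    // One RowRange per (start, stop) criterion; all four are served by a single scan.
    val ranges = Arrays.asList(
      new RowRange(Bytes.toBytes("row-a"), true, Bytes.toBytes("row-b"), false),
      new RowRange(Bytes.toBytes("row-c"), true, Bytes.toBytes("row-d"), false),
      new RowRange(Bytes.toBytes("row-e"), true, Bytes.toBytes("row-f"), false),
      new RowRange(Bytes.toBytes("row-g"), true, Bytes.toBytes("row-h"), false))

    scan.setFilter(new MultiRowRangeFilter(ranges))
    // Then, as before: conf.set(TableInputFormat.SCAN, DatabaseUtils.convertScanToString(scan))

The filter skips between ranges server-side, so newAPIHadoopRDD needs to be invoked only once.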