SPARKTA: a real-time aggregation engine based on Spark Streaming
Hi there,

We have released our real-time aggregation engine based on Spark Streaming. SPARKTA is fully open source (Apache 2).

You can check out the slides shown at Strata last week: http://www.slideshare.net/Stratio/strata-sparkta
Source code: https://github.com/Stratio/sparkta
Documentation: http://docs.stratio.com/modules/sparkta/development/

We are open to your ideas, and contributors are welcome.

Regards.
Re: how to set random seed
Thanks for the reply. I have not tried it out yet (I will today and report on my results), but I think what I need to do is to call mapPartitions and pass it a function that sets the seed. I was planning to pass the seed value in the closure. Something like:

    my_seed = 42
    def f(iterator):
        random.seed(my_seed)
        yield my_seed
    rdd.mapPartitions(f)

From: ayan guha guha.a...@gmail.com
Sent: Thursday, May 14, 2015 2:29 AM
To: Charles Hayden
Cc: user
Subject: Re: how to set random seed

Sorry for the late reply. Here is what I was thinking:

    import random as r

    def main():
        # get SparkContext
        # Just for fun, let's assume the seed is an id
        filename = "bin.dat"
        seed = id(filename)
        # broadcast it
        br = sc.broadcast(seed)
        # set up a dummy list
        lst = []
        for i in range(4):
            x = []
            for j in range(4):
                x.append(j)
            lst.append(x)
        print lst
        base = sc.parallelize(lst)
        print base.map(randomize).collect()

randomize looks like:

    def randomize(lst):
        local_seed = br.value
        r.seed(local_seed)
        r.shuffle(lst)
        return lst

Let me know if this helps...

On Wed, May 13, 2015 at 11:41 PM, Charles Hayden charles.hay...@atigeo.com wrote:
Can you elaborate? Broadcast will distribute the seed, which is only one number. But what construct do I use to plant the seed (call random.seed()) once on each worker?

From: ayan guha guha.a...@gmail.com
Sent: Tuesday, May 12, 2015 11:17 PM
To: Charles Hayden
Cc: user
Subject: Re: how to set random seed

Easiest way is to broadcast it.

On 13 May 2015 10:40, Charles Hayden charles.hay...@atigeo.com wrote:
In pySpark, I am writing a map with a lambda that calls random.shuffle. For testing, I want to be able to give it a seed, so that successive runs will produce the same shuffle. I am looking for a way to set this same random seed once on each worker. Is there any simple way to do it?

-- Best Regards, Ayan Guha
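For the archive, a minimal sketch of the mapPartitions approach described above, assuming Python 2-era PySpark and that each record is a mutable list; mixing the partition index into the seed (via mapPartitionsWithIndex) keeps runs repeatable without every partition drawing an identical sequence. Names here are illustrative, not from the original thread:

    import random

    seed = 42

    def shuffle_partition(index, iterator):
        # Deterministic per-partition seed: repeatable across runs,
        # but different partitions shuffle differently.
        rnd = random.Random(seed + index)
        for row in iterator:
            rnd.shuffle(row)
            yield row

    shuffled = rdd.mapPartitionsWithIndex(shuffle_partition)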
Re: Using sc.HadoopConfiguration in Python
Here is an example of how I would pass the S3 parameters to the Hadoop configuration in pyspark. You can do something similar for other parameters you want to pass to the Hadoop configuration:

    hadoopConf = sc._jsc.hadoopConfiguration()
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConf.set("fs.s3n.awsAccessKeyId", "$your_access_key_id")
    hadoopConf.set("fs.s3n.awsSecretAccessKey", "$your_secret_access_key")
    lines = sc.textFile("$your_dataset_in_S3")
    lines.count()

On Thu, May 14, 2015 at 4:17 AM, ayan guha guha.a...@gmail.com wrote:
Thanks for the reply, but _jsc does not have anything to pass hadoop configs. Can you illustrate your answer a bit more? TIA...

On Wed, May 13, 2015 at 12:08 AM, Ram Sriharsha sriharsha@gmail.com wrote:
Yes, the SparkContext in the Python API has a reference to the JavaSparkContext (jsc) https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext through which you can access the Hadoop configuration.

On Tue, May 12, 2015 at 6:39 AM, ayan guha guha.a...@gmail.com wrote:
Hi, I found this method in the Scala API but not in the Python API (1.3.1). Basically, I want to change the blocksize in order to read a binary file using sc.binaryRecords with multiple partitions (for testing I want to generate partitions smaller than the default blocksize). Is it possible in Python? If so, how?

-- Best Regards, Ayan Guha
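For the original blocksize question, the same pattern should apply, though the exact configuration key is an assumption to verify against your Hadoop version and input format. Something along these lines:

    hadoopConf = sc._jsc.hadoopConfiguration()
    # Ask for smaller input splits (1 MB here); this key is the Hadoop 2
    # name and may differ on older distributions or other input formats.
    hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", str(1024 * 1024))
    records = sc.binaryRecords("/path/to/bin.dat", 100)  # 100-byte records; path illustrative
    print records.getNumPartitions()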
Re: reduceByKey
What have you tried so far? Maybe the easiest way is using a collection and reducing by adding its values:

    JavaPairRDD<String, String> pairRDD = sc.parallelizePairs(data);
    JavaPairRDD<String, List<Integer>> result = pairRDD
        .mapToPair(new Functions.createList())
        .mapToPair(new Functions.ListStringToInt())
        .reduceByKey(new SumList());

Functions implementation using Java 7 (Java 8 should be simpler):

    public static final class SumList implements Function2<List<Integer>, List<Integer>, List<Integer>> {
        @Override
        public List<Integer> call(List<Integer> l1, List<Integer> l2) throws Exception {
            List<Integer> result = new ArrayList<Integer>();
            for (int i = 0; i < l1.size(); ++i) {
                result.add(l1.get(i) + l2.get(i));
            }
            return result;
        }
    }

    public static final class ListStringToInt implements PairFunction<Tuple2<String, List<String>>, String, List<Integer>> {
        @Override
        public Tuple2<String, List<Integer>> call(Tuple2<String, List<String>> tuple2) throws Exception {
            List<Integer> result = new ArrayList<Integer>();
            for (String number : tuple2._2()) {
                result.add(Integer.valueOf(number));
            }
            return new Tuple2<String, List<Integer>>(tuple2._1(), result);
        }
    }

    public static final class createList implements PairFunction<Tuple2<String, String>, String, List<String>> {
        @Override
        public Tuple2<String, List<String>> call(Tuple2<String, String> tuple2) throws Exception {
            return new Tuple2<String, List<String>>(tuple2._1(), Arrays.asList(tuple2._2().split(",")));
        }
    }

2015-05-14 15:40 GMT+02:00 Yasemin Kaya godo...@gmail.com:
Hi, I have a JavaPairRDD<String, String> and I want to implement the reduceByKey method. My pairRDD:

    2553: 0,0,0,1,0,0,0,0
    46551: 0,1,0,0,0,0,0,0
    266: 0,1,0,0,0,0,0,0
    2553: 0,0,0,0,0,1,0,0
    225546: 0,0,0,0,0,1,0,0
    225546: 0,0,0,0,0,1,0,0

I want to get:

    2553: 0,0,0,1,0,1,0,0
    46551: 0,1,0,0,0,0,0,0
    266: 0,1,0,0,0,0,0,0
    225546: 0,0,0,0,0,2,0,0

Can anyone help me with that? Thank you. Have a nice day.
yasemin
-- hiç ender hiç

--
Gaspar Muñoz
@gmunozsoria
http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta, 28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // @stratiobd https://twitter.com/StratioBD
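The reply notes Java 8 would be simpler; for the archive, an untested sketch of the same pipeline with lambdas, assuming Spark's Java 8 support, imports of java.util.*, java.util.stream.Collectors, and scala.Tuple2, and the same element-wise-sum semantics:

    JavaPairRDD<String, List<Integer>> result = pairRDD
        .mapToPair(t -> new Tuple2<>(t._1(),
            Arrays.stream(t._2().split(","))
                  .map(Integer::valueOf)
                  .collect(Collectors.toList())))
        .reduceByKey((l1, l2) -> {
            // Element-wise sum; assumes both lists have equal length.
            List<Integer> sum = new ArrayList<>(l1.size());
            for (int i = 0; i < l1.size(); i++) {
                sum.add(l1.get(i) + l2.get(i));
            }
            return sum;
        });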
Re: how to delete data from table in sparksql
Delete from table is available as of Hive 0.14 (reference: Apache Hive Language Manual DML - Delete, https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Delete), while Spark 1.3 defaults to Hive 0.13. Perhaps rebuild Spark with Hive 0.14, or generate a new table filtering out the values you do not want.

On Thu, May 14, 2015 at 3:26 AM luohui20...@sina.com wrote:
Hi guys, I need to delete some data from a table with "delete from table where name = xxx"; however, delete is not functioning like the DML operation in Hive. I get info like below:

    Usage: delete [FILE|JAR|ARCHIVE] <value> [<value>]*
    15/05/14 18:18:24 ERROR processors.DeleteResourceProcessor: Usage: delete [FILE|JAR|ARCHIVE] <value> [<value>]*

I checked the list of Supported Hive Features but could not find whether this DML is supported. Any comments will be appreciated.

Thanks & best regards!
San.Luo
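A sketch of the "rewrite the table without the unwanted rows" workaround mentioned above, assuming a Spark 1.3 HiveContext; the table and predicate names are placeholders taken from the question:

    // Materialize a filtered copy instead of DELETE (not supported on Hive 0.13).
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.sql(
      "INSERT OVERWRITE TABLE my_table SELECT * FROM my_table WHERE name <> 'xxx'")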
Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed
Yes, it fails repeatedly on my local Jenkins.

Sent from my iPhone

On May 14, 2015, at 18:30, Tathagata Das t...@databricks.com wrote:
Do you get this failure repeatedly?

On Thu, May 14, 2015 at 12:55 AM, kf wangf...@huawei.com wrote:
Hi all, I got the following error when I ran the unit tests of Spark with dev/run-tests on the latest branch-1.4.

The latest commit:

    commit d518c0369fa412567855980c3f0f426cde5c190d
    Author: zsxwing zsxw...@gmail.com
    Date: Wed May 13 17:58:29 2015 -0700

Error:

    [info] Test org.apache.spark.streaming.JavaAPISuite.testCount started
    [error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed: org.apache.spark.SparkException: Error communicating with MapOutputTracker
    [error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
    [error] at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119)
    [error] at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
    [error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
    [error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577)
    [error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626)
    [error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597)
    [error] at org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403)
    [error] at org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102)
    [error] at org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344)
    [error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
    [error] at org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74)
    [error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
    [error] at org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala)
    [error] at org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103)
    [error] ...
    [error] Caused by: org.apache.spark.SparkException: Error sending message [message = StopMapOutputTracker]
    [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
    [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    [error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109)
    [error] ... 52 more
    [error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
    [error] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    [error] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    [error] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    [error] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    [error] at scala.concurrent.Await$.result(package.scala:107)
    [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    [error] ... 54 more
Re: how to read lz4 compressed data using fileStream of spark streaming?
It still doesn't work… the StreamingContext can detect the new file, but it shows:

    ERROR dstream.FileInputDStream: File hdfs://nameservice1/sandbox/hdfs/list_join_action/2015_05_14_20_stream_1431605640.lz4 has no data in it. Spark Streaming can only ingest files that have been moved to the directory assigned to the file stream. Refer to the streaming programming guide for more details.

But the file indeed has many lines...

On May 14, 2015, at 4:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
Here's the class: https://github.com/twitter/hadoop-lzo/blob/master/src/main/java/com/hadoop/mapreduce/LzoTextInputFormat.java You can read more here: https://github.com/twitter/hadoop-lzo#maven-repository

On Thu, May 14, 2015 at 1:22 PM, lisendong lisend...@163.com wrote:
LzoTextInputFormat — where is this class? What is the Maven dependency?

On May 14, 2015, at 3:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
That's because you are using TextInputFormat, I think. Try with LzoTextInputFormat, like:

    val list_join_action_stream = ssc.fileStream[LongWritable, Text,
      com.hadoop.mapreduce.LzoTextInputFormat](gc.input_dir, (t: Path) => true, false)
      .map(_._2.toString)

On Thu, May 14, 2015 at 1:04 PM, lisendong lisend...@163.com wrote:
I do have an action on the DStream, because when I put a text file into HDFS it runs normally, but if I put an lz4 file, it does nothing.

On May 14, 2015, at 3:32 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
What do you mean by not detected? Maybe you forgot to trigger an action on the stream to get it executed. Like:

    val list_join_action_stream = ssc.fileStream[LongWritable, Text,
      TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
    list_join_action_stream.count().print()

On Wed, May 13, 2015 at 7:18 PM, hotdog lisend...@163.com wrote:
In Spark Streaming, I want to use fileStream to monitor a directory. But the files in that directory are compressed using lz4, so the new lz4 files are not detected by the following code. How can I detect these new files?

    val list_join_action_stream = ssc.fileStream[LongWritable, Text,
      TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
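One thing worth flagging for the archive: hadoop-lzo's LzoTextInputFormat handles LZO, not LZ4. Hadoop ships its own org.apache.hadoop.io.compress.Lz4Codec, and one direction to try (an assumption to verify, not a confirmed fix) is registering it so TextInputFormat decompresses .lz4 files by extension:

    val conf = new SparkConf()
      .set("spark.hadoop.io.compression.codecs",
           "org.apache.hadoop.io.compress.DefaultCodec," +
           "org.apache.hadoop.io.compress.Lz4Codec")
    val ssc = new StreamingContext(conf, Seconds(60))
    // inputDir stands in for gc.input_dir above
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      inputDir, (p: Path) => true, false).map(_._2.toString)

Note also that Hadoop's Lz4Codec uses its own block framing, not the lz4 command-line tool's frame format, so a file compressed with the lz4 CLI may still read as empty even with the codec registered.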
reduceByKey
Hi, I have a JavaPairRDD<String, String> and I want to implement the reduceByKey method.

My pairRDD:

    2553: 0,0,0,1,0,0,0,0
    46551: 0,1,0,0,0,0,0,0
    266: 0,1,0,0,0,0,0,0
    2553: 0,0,0,0,0,1,0,0
    225546: 0,0,0,0,0,1,0,0
    225546: 0,0,0,0,0,1,0,0

I want to get:

    2553: 0,0,0,1,0,1,0,0
    46551: 0,1,0,0,0,0,0,0
    266: 0,1,0,0,0,0,0,0
    225546: 0,0,0,0,0,2,0,0

Can anyone help me with that? Thank you. Have a nice day.
yasemin
-- hiç ender hiç
Re: --jars works in yarn-client but not yarn-cluster mode, why?
Thanks, Wilfred. In our program, the htrace-core-3.1.0-incubating.jar dependency is only required in the executor, not in the driver, and in both yarn-client and yarn-cluster mode the executors run in the cluster. And clearly, in yarn-cluster mode the jar IS in spark.yarn.secondary.jars, but it still throws a ClassNotFoundException.

2015-05-14 18:52 GMT+08:00 Wilfred Spiegelenburg wspiegelenb...@cloudera.com:
In yarn-cluster mode the driver runs in the cluster and not locally in the spark-submit JVM. This changes what is available on your classpath. It looks like you are running into a situation similar to the one described in SPARK-5377.

On 14/05/2015 13:47, Fengyun RAO wrote:
I looked into the Environment tab in both modes.

yarn-client:

    spark.jars  local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar,file:/home/xxx/my-app.jar

yarn-cluster:

    spark.yarn.secondary.jars  local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar

I wonder why htrace exists in spark.yarn.secondary.jars but is still not found by the URLClassLoader. I tried both "local" and "file" schemes for the jar; still the same error.

2015-05-14 11:37 GMT+08:00 Fengyun RAO raofeng...@gmail.com:
Hadoop version: CDH 5.4. We need to connect to HBase, and thus need the extra /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar dependency.

It works in yarn-client mode:

    spark-submit --class xxx.xxx.MyApp --master yarn-client --num-executors 10 --executor-memory 10g \
      --jars /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
      my-app.jar /input /output

However, if we change yarn-client to yarn-cluster, it throws a ClassNotFoundException (even though the class exists in htrace-core-3.1.0-incubating.jar):

    Caused by: java.lang.NoClassDefFoundError: org/apache/htrace/Trace
    at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:218)
    at org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:481)
    at org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65)
    at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:86)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:850)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.<init>(ConnectionManager.java:635)
    ... 21 more
    Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

Why doesn't --jars work in yarn-cluster mode? How do I add an extra dependency in yarn-cluster mode?
--
Wilfred Spiegelenburg
Backline Customer Operations Engineer, YARN/MapReduce/Spark
http://www.cloudera.com
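For the archive, one workaround pattern when the jar already exists at the same path on every node (as it does with CDH parcels) is to put it on the executor and driver classpaths explicitly. The flags below are standard spark-submit options, though whether this sidesteps the SPARK-5377-style issue above is untested:

    spark-submit --class xxx.xxx.MyApp --master yarn-cluster \
      --num-executors 10 --executor-memory 10g \
      --conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
      --conf spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
      my-app.jar /input /output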
Re: spark sql hive-shims
I see that the pre-built distributions include hive-shims-0.23 shaded into the spark-assembly jar (unlike when I make the distribution myself). Does anyone know what I should do to include the shims in my distribution?

On Thu, May 14, 2015 at 9:52 AM, Lior Chaga lio...@taboola.com wrote:
Ultimately it was a PermGen out-of-memory error. I somehow missed it in the log.

On Thu, May 14, 2015 at 9:24 AM, Lior Chaga lio...@taboola.com wrote:
After profiling with YourKit, I see there's an OutOfMemoryException in SQLContext.applySchema. Again, it's a very small RDD. Each executor has 180GB RAM.

On Thu, May 14, 2015 at 8:53 AM, Lior Chaga lio...@taboola.com wrote:
Hi, I am using Spark SQL with HiveContext, Spark version 1.3.1. When running local Spark everything works fine. When running on a Spark cluster I get a ClassNotFoundError for org.apache.hadoop.hive.shims.Hadoop23Shims. This class belongs to hive-shims-0.23 and is a runtime dependency of spark-hive:

    [INFO] org.apache.spark:spark-hive_2.10:jar:1.3.1
    [INFO] +- org.spark-project.hive:hive-metastore:jar:0.13.1a:compile
    [INFO] |  +- org.spark-project.hive:hive-shims:jar:0.13.1a:compile
    [INFO] |  |  +- org.spark-project.hive.shims:hive-shims-common:jar:0.13.1a:compile
    [INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20:jar:0.13.1a:runtime
    [INFO] |  |  +- org.spark-project.hive.shims:hive-shims-common-secure:jar:0.13.1a:compile
    [INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20S:jar:0.13.1a:runtime
    [INFO] |  |  \- org.spark-project.hive.shims:hive-shims-0.23:jar:0.13.1a:runtime

My Spark distribution is built with:

    make-distribution.sh --tgz -Phive -Phive-thriftserver -DskipTests

If I add this dependency to my driver project, the exception disappears, but then the task gets stuck when registering an RDD as a table (I get a timeout after 30 seconds). I should emphasize that the first RDD I register as a table is a very small one (about 60K rows), and as I said, it runs swiftly in local mode. I suspect other dependencies may be missing but fail silently. I would be grateful if anyone knows how to solve it.
Lior
Re: SPARKTA: a real-time aggregation engine based on Spark Streaming
...This is madness!

On May 14, 2015, at 9:31 AM, dmoralesdf dmora...@stratio.com wrote:
Hi there, we have released our real-time aggregation engine based on Spark Streaming. [...]
RE: swap tuple
Yeah, I wouldn't try to modify the current one, since RDDs are supposed to be immutable; just create a new one:

    val newRdd = oldRdd.map(r => (r._2, r._1))

or something of that nature...

Steve

From: Evo Eftimov [evo.efti...@isecc.com]
Sent: Thursday, May 14, 2015 1:24 PM
To: 'Holden Karau'; 'Yasemin Kaya'
Cc: user@spark.apache.org
Subject: RE: swap tuple

Where is the "Tuple" supposed to be in <String, String>? You could refer to a "Tuple" if it were e.g. <String, Tuple2<String, String>>. [...]

From: holden.ka...@gmail.com [mailto:holden.ka...@gmail.com] On Behalf Of Holden Karau
Sent: Thursday, May 14, 2015 5:56 PM
Subject: Re: swap tuple

Can you paste your code? Transformations return a new RDD rather than modifying an existing one. [...]
Re: how to delete data from table in sparksql
The list of unsupported Hive features should mention that it implicitly includes features added after Hive 13. You cannot yet compile with Hive 14, though we are investigating this for 1.5.

On Thu, May 14, 2015 at 6:40 AM, Denny Lee denny.g@gmail.com wrote:
Delete from table is available as of Hive 0.14, while Spark 1.3 defaults to Hive 0.13. Perhaps rebuild Spark with Hive 0.14, or generate a new table filtering out the values you do not want. [...]
RE: swap tuple
Where is the "Tuple" supposed to be in <String, String>? You could refer to a "Tuple" if it were e.g. <String, Tuple2<String, String>>.

From: holden.ka...@gmail.com [mailto:holden.ka...@gmail.com] On Behalf Of Holden Karau
Sent: Thursday, May 14, 2015 5:56 PM
To: Yasemin Kaya
Cc: user@spark.apache.org
Subject: Re: swap tuple

Can you paste your code? Transformations return a new RDD rather than modifying an existing one, so if you were to swap the values of the tuple using a map you would get back a new RDD, and you would then want to print this new RDD instead of the original one. [...]

On Thursday, May 14, 2015, Yasemin Kaya godo...@gmail.com wrote:
Hi, I have a JavaPairRDD<String, String> and I want to swap tuple._1() and tuple._2(). [...]
Re: store hive metastore on persistent store
You can configure Spark SQL's Hive interaction by placing a hive-site.xml file in the conf/ directory.

On Thu, May 14, 2015 at 10:24 AM, jamborta jambo...@gmail.com wrote:
Hi all, is it possible to set hive.metastore.warehouse.dir, which is internally created by Spark, to point to external storage (e.g. S3 on AWS or WASB on Azure)? Thanks. [...]
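A minimal hive-site.xml sketch of that, assuming an S3 warehouse location of your own (bucket name illustrative); note that where the metastore database itself lives is a separate setting from where the warehouse data lives:

    <configuration>
      <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>s3n://your-bucket/hive/warehouse</value>
      </property>
    </configuration>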
Re: swap tuple
I solved my problem this way:

    JavaPairRDD<String, String> swappedPair = pair.mapToPair(
        new PairFunction<Tuple2<String, String>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, String> item) throws Exception {
                return item.swap();
            }
        });

2015-05-14 20:42 GMT+03:00 Stephen Carman scar...@coldlight.com:
Yeah, I wouldn't try to modify the current one, since RDDs are supposed to be immutable; just create a new one. [...]

-- hiç ender hiç
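For the archive: with Java 8, the same swap should collapse to a method reference — an untested sketch, assuming Spark's Java 8 lambda support:

    JavaPairRDD<String, String> swappedPair = pair.mapToPair(Tuple2::swap);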
Re: swap tuple
Can you paste your code? Transformations return a new RDD rather than modifying an existing one, so if you were to swap the values of the tuple using a map you would get back a new RDD, and you would then want to print this new RDD instead of the original one.

On Thursday, May 14, 2015, Yasemin Kaya godo...@gmail.com wrote:
Hi, I have a JavaPairRDD<String, String> and I want to swap tuple._1() and tuple._2(). I use tuple.swap(), but it doesn't actually change the JavaPairRDD; when I print the JavaPairRDD, the values are the same. Can anyone help me with that? Thank you. Have a nice day.
yasemin
-- hiç ender hiç

--
Cell: 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau
store hive metastore on persistent store
Hi all, is it possible to set hive.metastore.warehouse.dir, which is internally created by Spark, to point to external storage (e.g. S3 on AWS or WASB on Azure)?

Thanks,
Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception
End of the month is the target: https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage

On Thu, May 14, 2015 at 3:45 AM, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote:
Hi Michael and Ayan, thank you for your responses to my problem. Michael, do we have a tentative release date for Spark 1.4?
Regards, Ishwardeep

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Wednesday, May 13, 2015 10:54 PM
To: ayan guha
Cc: Ishwardeep Singh; user
Subject: Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception

I think this is a bug in our date handling that should be fixed in Spark 1.4.

On Wed, May 13, 2015 at 8:23 AM, ayan guha guha.a...@gmail.com wrote:
Your stack trace says it can't convert a date to an integer. Are you sure about the column positions?

On 13 May 2015 21:32, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote:
Hi, I am using Spark SQL 1.3.1. I have created a DataFrame using the jdbc data source and am using the saveAsTable() method, but got the following 2 exceptions:

    java.lang.RuntimeException: Unsupported datatype DecimalType()
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
    at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:391)
    at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
    at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
    at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:218)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
    at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1121)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1071)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1037)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1015)

    java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
    at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
    at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
    at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
Re: reduceByKey
Here is Python code; I am sure you'll get the drift. Basically you need to implement two functions, seq and comb, for the partial and final operations:

    def addtup(t1, t2):
        j = ()
        for k, v in enumerate(t1):
            j = j + (t1[k] + t2[k],)
        return j

    def seq(tIntrm, tNext):
        return addtup(tIntrm, tNext)

    def comb(tP, tF):
        return addtup(tP, tF)

    lst = [(2553, (0,0,0,1,0,0,0,0)),
           (46551, (0,1,0,0,0,0,0,0)),
           (266, (0,1,0,0,0,0,0,0)),
           (2553, (0,0,0,0,0,1,0,0)),
           (225546, (0,0,0,0,0,1,0,0)),
           (225546, (0,0,0,0,0,1,0,0))]

    base = sc.parallelize(lst)
    res = base.aggregateByKey((0,0,0,0,0,0,0,0), seq, comb)
    for i in res.collect():
        print i

Result:

    (266, (0, 1, 0, 0, 0, 0, 0, 0))
    (225546, (0, 0, 0, 0, 0, 2, 0, 0))
    (2553, (0, 0, 0, 1, 0, 1, 0, 0))
    (46551, (0, 1, 0, 0, 0, 0, 0, 0))

On Thu, May 14, 2015 at 11:40 PM, Yasemin Kaya godo...@gmail.com wrote:
Hi, I have a JavaPairRDD<String, String> and I want to implement the reduceByKey method. [...]

-- Best Regards, Ayan Guha
Re: Using sc.HadoopConfiguration in Python
Super, it worked. Thanks.

On Fri, May 15, 2015 at 12:26 AM, Ram Sriharsha sriharsha@gmail.com wrote:
Here is an example of how I would pass the S3 parameters to the Hadoop configuration in pyspark. [...]

-- Best Regards, Ayan Guha
Re: SPARKTA: a real-time aggregation engine based on Spark Streaming
Nice job! We are developing something very similar… I will contact you to see whether we can contribute some pieces to you!

Best,
Paolo

From: Evo Eftimov evo.efti...@isecc.com
Sent: Thursday, May 14, 2015, 17:21
To: 'David Morales' dmora...@stratio.com, Matei Zaharia matei.zaha...@gmail.com
Cc: user@spark.apache.org

That has been a really rapid "evaluation" of the "work" and its "direction". [...]
RE: SPARKTA: a real-time aggregation engine based on Spark Streaming
I do not intend to provide comments on the actual "product" since my time is engaged elsewhere. My comments were on the "process" for commenting, which looked like self-indulgent, back-patting communication (between members of the party and its party leader). That BS used to be inherent to the "commercial" vendors, but I can confirm as fact that it is also in effect in the "open source movement" (because human nature remains the same).

From: David Morales [mailto:dmora...@stratio.com]
Sent: Thursday, May 14, 2015 4:30 PM
To: Paolo Platter
Cc: Evo Eftimov; Matei Zaharia; user@spark.apache.org
Subject: Re: SPARKTA: a real-time aggregation engine based on Spark Streaming

Thank you Paolo. Don't hesitate to contact us. [...]
Re: SPARKTA: a real-time aggregation engine based on Spark Streaming
We put a lot of work into SPARKTA, and it is awesome to hear from both the community and relevant people. Just as easy as that. I hope you have time to consider the project, which is our main concern at this moment, and we hope to hear from you too.

2015-05-14 17:46 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:
I do not intend to provide comments on the actual "product" since my time is engaged elsewhere. [...]

--
David Morales de Frías :: +34 607 010 411 :: @dmoralesdf https://twitter.com/dmoralesdf
http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta, 28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd https://twitter.com/StratioBD
Re: SPARKTA: a real-time aggregation engine based on Spark Streaming
(Sorry, for non-English people: that means it's a good thing.)

Matei

On May 14, 2015, at 10:53 AM, Matei Zaharia matei.zaha...@gmail.com wrote:
...This is madness!

On May 14, 2015, at 9:31 AM, dmoralesdf dmora...@stratio.com wrote:
Hi there, we have released our real-time aggregation engine based on Spark Streaming. [...]
Re: SPARKTA: a real-time aggregation engine based on Spark Streaming
Thank you Paolo. Don't hesitate to contact us.

Evo, we will be glad to hear from you, and we are happy to see some kind of fast feedback from the main thought leader of Spark, for sure.

2015-05-14 17:24 GMT+02:00 Paolo Platter paolo.plat...@agilelab.it:
Nice job! We are developing something very similar… [...]

--
David Morales de Frías :: +34 607 010 411 :: @dmoralesdf https://twitter.com/dmoralesdf
http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta, 28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd https://twitter.com/StratioBD
Re: SPARKTA: a real-time aggregation engine based on Spark Streaming
Thanks for your kind words, Matei; happy to see that our work is on the right track.

2015-05-14 17:10 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:
(Sorry, for non-English people: that means it's a good thing.) [...]

--
David Morales de Frías :: +34 607 010 411 :: @dmoralesdf https://twitter.com/dmoralesdf
http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta, 28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd https://twitter.com/StratioBD
Multiple Kinesis Streams in a single Streaming job
Hi, is it possible to set up streams from multiple Kinesis streams and process them in a single job? From what I have read this should be possible; however, the Kinesis layer errors out whenever I try to receive from more than a single Kinesis stream. Here is the code. Currently I am focused on just getting receivers set up and working for the two Kinesis streams, so this code just attempts to print the contents of both streams:

    implicit val formats = Serialization.formats(NoTypeHints)

    val conf = new SparkConf().setMaster("local[*]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(1))

    val rawStream = KinesisUtils.createStream(ssc, "erich-test",
      "kinesis.us-east-1.amazonaws.com", Duration(1000),
      InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
    rawStream.map(msg => new String(msg)).print

    val loaderStream = KinesisUtils.createStream(ssc, "dev-loader",
      "kinesis.us-east-1.amazonaws.com", Duration(1000),
      InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
    val loader = loaderStream.map(msg => new String(msg)).print

    ssc.start()

Thanks,
-Erich
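Two things worth checking, offered as assumptions rather than a confirmed diagnosis: each receiver occupies an executor core, and with the createStream signature above the Kinesis Client Library names its DynamoDB checkpoint table after the Spark application, so two receivers in one app can collide on that table. Newer Spark releases add an overload taking an explicit Kinesis application name per stream (verify the exact signature for your version), after which the streams can be unioned:

    // Signature is from later Spark releases and may differ in yours.
    val s1 = KinesisUtils.createStream(ssc, "app-erich-test", "erich-test",
      "kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.TRIM_HORIZON, Duration(1000), StorageLevel.MEMORY_ONLY)
    val s2 = KinesisUtils.createStream(ssc, "app-dev-loader", "dev-loader",
      "kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.TRIM_HORIZON, Duration(1000), StorageLevel.MEMORY_ONLY)
    // Union the receivers into one DStream for downstream processing.
    val all = ssc.union(Seq(s1, s2)).map(msg => new String(msg))
    all.print()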
Storing a lot of state with updateStateByKey
Hi all, I'm a complete newbie to spark and spark streaming, so the question may seem obvious, sorry for that. It is okay to store Seq[Data] in state when using 'updateStateByKey'? I have a function with signature def saveState(values: Seq[Msg], value: Option[Iterable[Msg]]): Option[Iterable[Msg]] that stores about 200k records in iterable. I've seen most examples having some kind of accumulators in state, so I'm wondering if having a collection is a normal usecase. Maybe you can suggest how to solve my task without this mutable state. I have a kafka topic that generates about 20k messages/sec. I need to group messages based on some key and send groups to another topic. Groups should be sent when number of messages exceeds some count N OR when predefined time T has passed since the first message in a group has arrived, no matter if group messages count is less then N. First of all window functions come into mind, but the problem is that I need to send group as soon N messages arrived, not wait until window duration has passed. I decided to set batch size to 0.5 sec and T is about 3 sec. on each batch I first take groups that have enough messages and send them. The rest of the messages I put to shared state. In updateStateByKey I have all messages that have not been set yet - I again try to group them and send those groups that have enough messages. This way I check messages with latency 0.5 sec instead of 3s. Update function: def saveState(values: Seq[Iterable[Msg]], value: Option[(Iterable[Msg], Iterable[Msg])]): Option[(Iterable[Msg], Iterable[Msg])] = { // when does values size is greater than 1? I didn't get into that yet. if (values.size 1){ throw new NullPointerException } // notSent - those that have not been sent yet val (notSent, _) = value.getOrElse((List(), List())) // discard sent // here goes more complex logic with verification if message should be sent based on its arrival time // for now it is simplified val all = notSent ++ values(0) val result = all.groupBy(_.key) .partition(ifNotSend _) Some((result._1.values.flatten, result._2.values.flatten)) } The whole code: val batchSize = 5 // will persist speed-up anything here? val grouped = inputStream.map(msg = (msg.key, msg)).groupByKey().persist() def ifSend(x: (Int, Iterable[_])) = x._2.size = batchSize def ifNotSend(x: (Int, Iterable[_])) = !ifSend(x) val readyToSend = grouped.filter(ifSend _) readyToSend.foreachRDD(rdd = { // send to kafka }) // this should not be sent immediately but combined with those val incomplete = grouped.filter(ifNotSend _) /** * returns (Seq[Msg], Seq[Msg]) * _1 - messages that should not be sent and preserved for next batch execution * _2 - messages that * */ def saveState(values: Seq[Iterable[Msg]], value: Option[(Iterable[Msg], Iterable[Msg])]): Option[(Iterable[Msg], Iterable[Msg])] = { if (values.size 1){ throw new NullPointerException } // notSent - those that have not been sent yet val (notSent, _) = value.getOrElse((List(), List())) // discard sent // here goes more complex logic with verification if message should be sent based on its arrival time // for now it is simplified val all = notSent ++ values(0) val result = all.groupBy(_.key) .partition(ifNotSend _) Some((result._1.values.flatten, result._2.values.flatten)) } val state = incomplete.updateStateByKey(saveState _) state.foreachRDD(rdd = { val messagesToSend = rdd.filter(x = x._2._2.nonEmpty) .map(x = x._2._2) println(messagesToSend.collect().flatten.mkString(,)) println() }) Maybe you could suggest a better/more efficient solution? 
Thanks in advance.
RE: SPARKTA: a real-time aggregation engine based on Spark Streaming
That has been a really rapid “evaluation” of the “work” and its “direction” From: David Morales [mailto:dmora...@stratio.com] Sent: Thursday, May 14, 2015 4:12 PM To: Matei Zaharia Cc: user@spark.apache.org Subject: Re: SPARKTA: a real-time aggregation engine based on Spark Streaming Thanks for your kind words Matei, happy to see that our work is on the right track. 2015-05-14 17:10 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com: (Sorry, for non-English people: that means it's a good thing.) Matei On May 14, 2015, at 10:53 AM, Matei Zaharia matei.zaha...@gmail.com wrote: ...This is madness! On May 14, 2015, at 9:31 AM, dmoralesdf dmora...@stratio.com wrote: Hi there, We have released our real-time aggregation engine based on Spark Streaming. SPARKTA is fully open source (Apache2). Regards.
Per-machine configuration?
Is it possible to configure each machine that Spark is using as a worker individually? For instance, setting the maximum number of cores or the maximum memory for each machine individually, or other settings related to workers? Or is there any other way to specify a per-machine capacity of some kind?
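For the standalone cluster manager this should be possible because each worker daemon reads its own local conf/spark-env.sh; a minimal sketch with placeholder values (this assumes standalone mode, which the question doesn't state; on YARN the equivalent knobs live in each NodeManager's resource configuration):

# conf/spark-env.sh on one particular worker machine
SPARK_WORKER_CORES=8      # total cores this machine offers to Spark applications
SPARK_WORKER_MEMORY=24g   # total memory this machine offers to Spark applications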
Hive partition table + read using hiveContext + spark 1.3.1
Hi Team, I have a hive partition table whose partition column values contain spaces. When I try to run any query, say a simple Select * from table_name, it fails. *Please note the same was working in spark 1.2.0; now I have upgraded to 1.3.1. Also there is no change in my application code base.* If I use a partition column without spaces, all works fine. Please provide your inputs. Regards, Sam
Restricting the number of iterations in Mllib Kmeans
Hi, I want to run a definite number of iterations in Kmeans. There is a command line argument to set maxIterations, but even if I set it to a number, Kmeans runs until the centroids converge. Is there a specific way to specify this on the command line? Also, I wanted to know if we can supply the initial set of centroids to the program instead of it choosing the centroids at random? Thanks, Suman.
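For reference, a minimal sketch of capping iterations through the MLlib builder API; whether zeroing the tolerance truly forces every iteration to run depends on how the convergence check is implemented, so treat that part as an assumption, and the k and iteration counts are placeholders. (For the second question, later MLlib releases also expose a setInitialModel setter for supplying starting centroids, though I don't believe it is available in every 1.x version.)

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("data.txt") // placeholder path
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .cache()

val model = new KMeans()
  .setK(10)
  .setMaxIterations(20) // hard upper bound on iterations
  .setEpsilon(0.0)      // assumption: a zero tolerance keeps it from stopping early
  .run(data)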
Custom Aggregate Function for DataFrame
Hello, May I know if there is a way to implement a custom aggregate function for grouped data in a DataFrame? I dug into the doc but didn't find anything apart from the UDF functions, which apply to a Row. Maybe I have missed something. Thanks. Justin
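Not an authoritative answer, but one workaround sketch while custom aggregates are missing from the DataFrame API is to drop to the underlying RDD for the grouped aggregation and rebuild a DataFrame afterwards; the schema ([key: string, value: long]) and the max-minus-min aggregate below are invented purely for illustration:

import sqlContext.implicits._

val aggregated = df.rdd
  .map(row => (row.getString(0), row.getLong(1))) // assumes a [key: string, value: long] schema
  .groupByKey()
  .mapValues(vs => vs.max - vs.min)               // stand-in for any custom aggregate logic
  .toDF("key", "range")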
Re: DStream Union vs. StreamingContext Union
@TD How do I file a JIRA? On Tue, May 12, 2015 at 2:06 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I wonder if that may be a bug in the Python API. Please file it as a JIRA along with sample code to reproduce it and the sample output you get. On Tue, May 12, 2015 at 10:00 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: @TD I kept getting an empty RDD (i.e. rdd.take(1) came back empty). On Tue, May 12, 2015 at 12:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: @Vadim What happened when you tried unioning using DStream.union in Python? TD On Tue, May 12, 2015 at 9:53 AM, Evo Eftimov evo.efti...@isecc.com wrote: I can confirm it does work in Java *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] *Sent:* Tuesday, May 12, 2015 5:53 PM *To:* Evo Eftimov *Cc:* Saisai Shao; user@spark.apache.org *Subject:* Re: DStream Union vs. StreamingContext Union Thanks Evo. I tried chaining DStream unions like what you have and it didn't work for me. But passing multiple arguments to StreamingContext.union worked fine. Any idea why? I am using Python, BTW. On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com wrote: You can also union multiple DStreamRDDs in this way: DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3) etc. Ps: the API is not “redundant”, it offers several ways of achieving the same thing as a convenience, depending on the situation. *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] *Sent:* Tuesday, May 12, 2015 5:37 PM *To:* Saisai Shao *Cc:* user@spark.apache.org *Subject:* Re: DStream Union vs. StreamingContext Union Thanks Saisai. That makes sense. Just seems redundant to have both. On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com wrote: DStream.union can only union two DStreams, one being itself, while StreamingContext.union can union an array of DStreams. Internally DStream.union is a special case of StreamingContext.union:

def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))

So there's no difference: if you want to union more than two DStreams, just use the one in StreamingContext; otherwise, both APIs are fine. 2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs the other? Thanks, Vadim
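To make the two call sites concrete, a small Scala sketch (hosts and ports are placeholders):

val s1 = ssc.socketTextStream("host1", 9999)
val s2 = ssc.socketTextStream("host2", 9999)
val s3 = ssc.socketTextStream("host3", 9999)

val viaDStream = s1.union(s2).union(s3)     // pairwise chaining
val viaContext = ssc.union(Seq(s1, s2, s3)) // one call over a whole collection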
textFileStream Question
How does textFileStream work behind the scenes? How does Spark Streaming know what files are new and need to be processed? Is it based on time stamp, file name? Thanks, Vadim
spark log field clarification
I am trying to extract the *output data size* information for *each task*. What *field(s)* should I look for, given the json-format log? Also, what does Result Size stand for? Thanks a lot in advance! -Yanwei
Re: store hive metastore on persistent store
I have tried putting the hive-site.xml file in the conf/ directory, but it seems it is not picked up from there. On Thu, May 14, 2015 at 6:50 PM, Michael Armbrust mich...@databricks.com wrote: You can configure Spark SQL's hive interaction by placing a hive-site.xml file in the conf/ directory. On Thu, May 14, 2015 at 10:24 AM, jamborta jambo...@gmail.com wrote: Hi all, is it possible to set hive.metastore.warehouse.dir, which is internally created by spark, to point at external storage (e.g. s3 on aws or wasb on azure)? thanks,
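For illustration, a minimal hive-site.xml along the lines Michael describes, placed in conf/; the bucket path is a placeholder and the right scheme (s3n, s3a, wasb) depends on your Hadoop build:

<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>s3n://my-bucket/hive/warehouse</value>
  </property>
</configuration>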
Re: Multiple Kinesis Streams in a single Streaming job
What is the error you are seeing? TD On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com wrote: Hi, Is it possible to setup streams from multiple Kinesis streams and process them in a single job? From what I have read, this should be possible; however, the Kinesis layer errors out whenever I try to receive from more than a single Kinesis stream. Here is the code. Currently, I am focused on just getting receivers setup and working for the two Kinesis streams; as such, this code just attempts to print out the contents of both streams:

implicit val formats = Serialization.formats(NoTypeHints)
val conf = new SparkConf().setMaster("local[*]").setAppName("test")
val ssc = new StreamingContext(conf, Seconds(1))

val rawStream = KinesisUtils.createStream(ssc, "erich-test",
  "kinesis.us-east-1.amazonaws.com", Duration(1000),
  InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
rawStream.map(msg => new String(msg)).print()

val loaderStream = KinesisUtils.createStream(ssc, "dev-loader",
  "kinesis.us-east-1.amazonaws.com", Duration(1000),
  InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
val loader = loaderStream.map(msg => new String(msg)).print()

ssc.start()

Thanks, -Erich
Data Load - Newbie
Hi, I have a text dataset to which I want to apply a clustering algorithm from MLlib. The data is an (n,m) matrix with headers. I would like to know if the team can help me with how to load this data in Spark Scala and separate out the variables I want to cluster. Thanks Rick. Ricardo Goncalves da Silva Lead Data Scientist | Seção de Desenvolvimento de Sistemas de Business Intelligence - Projetos de Inovação | IDPB02
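As a starting point, a minimal sketch of loading a delimited numeric matrix into vectors for MLlib's KMeans; the path, the comma delimiter, the header handling and the choice of k are all assumptions for illustration:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val raw = sc.textFile("hdfs:///path/to/matrix.txt") // placeholder path
val header = raw.first()
val vectors = raw.filter(_ != header)               // drop the header row
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .cache()

val model = KMeans.train(vectors, 4, 20)            // k = 4 clusters, 20 iterations, both arbitrary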
LogisticRegressionWithLBFGS with large feature set
Hi, I am trying to validate our modeling data pipeline by running LogisticRegressionWithLBFGS on a dataset with ~3.7 million features, basically to compute AUC. This is on Spark 1.3.0. I am using 128 executors with 4 GB each + a driver with 8 GB. The number of data partitions is 3072. The execution fails with the following message:

*Total size of serialized results of 54 tasks (10.4 GB) is bigger than spark.driver.maxResultSize (3.0 GB)*

The associated stage in the job is treeAggregate at StandardScaler.scala:52 http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3attempt=0 The call stack looks as below:

org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)

I am trying both to understand why such a large amount of data needs to be passed back to the driver and to figure out a way around this. I also want to understand how much memory is required, as a function of dataset size, feature set size, and number of iterations performed, for future experiments. From looking at the MLlib code, the largest data structure seems to be a dense vector of the same size as the feature set. I am not familiar with the algorithm or its implementation, but I would guess 3.7 million features would lead to a constant multiple of ~3.7M * 8 bytes ≈ 30 MB. So how does the result size become so large? I looked into treeAggregate and it looks like hierarchical aggregation. If the data being sent to the driver is basically the aggregated coefficients (i.e. dense vectors) for the final aggregation, can't the dense vectors from executors be pulled in one at a time and merged in memory, rather than pulling all of them in together? (This is a totally uneducated guess, so I may be completely off here.) Is there a way to get this running? Thanks, pala
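For anyone hitting the same wall, two driver-side settings bear directly on the quoted failure; the sizes below are purely illustrative, and raising them treats the symptom rather than the aggregation pattern (treeAggregate also accepts a depth argument, default 2, and a deeper tree means fewer partial results land on the driver at once):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.maxResultSize", "12g") // the cap that aborted the stage above was 3 GB
  .set("spark.driver.memory", "16g")        // only takes effect if set before the driver JVM starts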
Re: Multiple Kinesis Streams in a single Streaming job
have you tried to union the 2 streams per the KinesisWordCountASL example https://github.com/apache/spark/blob/branch-1.3/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L120 where 2 streams (against the same Kinesis stream in this case) are created and union'd? it should work the same way - including union() of streams from totally different source types (kafka, kinesis, flume). On Thu, May 14, 2015 at 2:07 PM, Tathagata Das t...@databricks.com wrote: What is the error you are seeing? TD On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com wrote: Hi, Is it possible to setup streams from multiple Kinesis streams and process them in a single job?
Re: Multiple Kinesis Streams in a single Streaming job
A possible problem may be that the Kinesis stream in 1.3 uses the SparkContext app name as the Kinesis application name, which is used by the Kinesis Client Library to save checkpoints in DynamoDB. Since both Kinesis DStreams are using the same Kinesis application name (as they are in the same StreamingContext / SparkContext / Spark app name), KCL may be overwriting the checkpoint information of both Kinesis streams into the same DynamoDB table, with odd results. Either way, this is going to be fixed in Spark 1.4. On Thu, May 14, 2015 at 4:10 PM, Chris Fregly ch...@fregly.com wrote: have you tried to union the 2 streams per the KinesisWordCountASL example, where 2 streams (against the same Kinesis stream in this case) are created and union'd?
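If I'm reading the fix right, the 1.4 API exposes the Kinesis/KCL application name as an explicit argument, so each stream can checkpoint to its own DynamoDB table. A hedged sketch of what that is expected to look like (the parameter order follows my reading of the 1.4 createStream signature, and the names/endpoints are placeholders, so verify against the release):

val rawStream = KinesisUtils.createStream(
  ssc, "my-app-raw",  // distinct KCL application name per stream
  "erich-test", "kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.TRIM_HORIZON, Duration(1000), StorageLevel.MEMORY_ONLY)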
Spark Summit 2015 - June 15-17 - Dev list invite
Join the Apache Spark community at the fourth Spark Summit in San Francisco on June 15, 2015. At Spark Summit 2015 you will hear keynotes from NASA, the CIA, Toyota, Databricks, AWS, Intel, MapR, IBM, Cloudera, Hortonworks, Timeful, O'Reilly, and Andreessen Horowitz. 260 talk proposals were submitted by the community, and 55 were accepted. This year you'll hear about Spark in use at companies including Uber, Airbnb, Netflix, Taobao, Red Hat, Edmunds, Oracle and more. See the full agenda at http://spark-summit.org/2015. If you are new to Spark or looking to improve your knowledge of the technology, we have three levels of Spark training: Intro to Spark, Advanced DevOps with Spark, and Data Science with Spark. Space is limited and we will sell out, so register now. Use promo code DevList15 to save 15% when registering before June 1, 2015. Register at http://spark-summit.org/2015/register. I look forward to seeing you there. Best, Scott The Spark Summit Organizers
[SparkStreaming] Is it possible to delay the start of some DStream in the application?
In my application, I want to start a DStream computation only after a special event has happened (for example, I want to start the receiver only after the reference data has been properly initialized). My question is: it looks like the DStream will be started right after the StreamingContext has been started. Is it possible to delay the start of a specific DStream? Thank you very much!
RE: [SparkSQL 1.4.0] groupBy columns are always nullable?
Thank you, should I open a JIRA for this issue? From: Olivier Girardot [mailto:ssab...@gmail.com] Sent: Tuesday, May 12, 2015 5:12 AM To: Reynold Xin Cc: Haopu Wang; user Subject: Re: [SparkSQL 1.4.0] groupBy columns are always nullable? I'll look into it - not sure yet what I can get out of exprs :p On Mon, May 11, 2015 at 22:35, Reynold Xin r...@databricks.com wrote: Thanks for catching this. I didn't read carefully enough. It'd make sense to have the udaf result be non-nullable, if the exprs are indeed non-nullable. On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot ssab...@gmail.com wrote: Hi Haopu, actually here `key` is nullable because this is your input's schema:

scala> result.printSchema
root
 |-- key: string (nullable = true)
 |-- SUM(value): long (nullable = true)

scala> df.printSchema
root
 |-- key: string (nullable = true)
 |-- value: long (nullable = false)

I tried it with a schema where the key is not flagged as nullable, and the schema is actually respected. What you can argue however is that SUM(value) should also be not nullable since value is not nullable. @rxin do you think it would be reasonable to flag the Sum aggregation function as nullable (or not) depending on the input expression's schema? Regards, Olivier. On Mon, May 11, 2015 at 22:07, Reynold Xin r...@databricks.com wrote: Not by design. Would you be interested in submitting a pull request? On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote: I try to get the result schema of aggregate functions using the DataFrame API. However, I find the result fields of groupBy columns are always nullable even when the source field is not nullable. I want to know if this is by design, thank you! Below is the simple code to show the issue.

import sqlContext.implicits._
import org.apache.spark.sql.functions._

case class Test(key: String, value: Long)
val df = sc.makeRDD(Seq(Test("k1", 2), Test("k1", 1))).toDF()

val result = df.groupBy("key").agg($"key", sum("value"))

// From the output, you can see the key column is nullable, why??
result.printSchema
//root
// |-- key: string (nullable = true)
// |-- SUM(value): long (nullable = true)
RE: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?
Hi TD, regarding the performance of updateStateByKey, do you have a JIRA for that so we can watch it? Thank you! From: Tathagata Das [mailto:t...@databricks.com] Sent: Wednesday, April 15, 2015 8:09 AM To: Krzysztof Zarzycki Cc: user Subject: Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months? Fundamentally, stream processing systems are designed for processing streams of data, not for storing large volumes of data for a long period of time. So if you have to maintain that much state for months, then it's best to use another system that is designed for long-term storage (like Cassandra), which has proper support for making all that state fault-tolerant, high-performance, etc. So yes, the best option is to use Cassandra for the state, with the Spark Streaming jobs accessing the state from Cassandra. There are a number of optimizations that can be done. It's not too hard to build a simple on-demand populated cache (a singleton hash map, for example) that speeds up access from Cassandra, where all updates are written through the cache. This is a common use of Spark Streaming + Cassandra/HBase. Regarding the performance of updateStateByKey, we are aware of the limitations, and we will improve it soon :) TD On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote: Hey guys, could you please help me with a question I asked on Stackoverflow: https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two ? I'll be really grateful for your help! I'm also pasting the question below: I'm trying to solve a (simplified here) problem in Spark Streaming: Let's say I have a log of events made by users, where each event is a tuple (user name, activity, time), e.g.:

(user1, view, 2015-04-14T21:04Z)
(user1, click, 2015-04-14T21:05Z)

Now I would like to gather events by user to do some analysis of that. Let's say that the output is some analysis of:

(user1, List((view, 2015-04-14T21:04Z), (click, 2015-04-14T21:05Z)))

The events should be kept for even 2 months. During that time there might be around 500 million such events, and millions of unique users, which are the keys here. My questions are:

* Is it feasible to do such a thing with updateStateByKey on DStream, when I have millions of keys stored?
* Am I right that DStream.window is no use here, when I have a 2-month-long window and would like to have a slide of a few seconds?

P.S. I found out that updateStateByKey is called on all the keys on every slide, which means it will be called millions of times every few seconds. That makes me doubt this design and I'm rather thinking about alternative solutions like:

* using Cassandra for state
* using Trident state (with Cassandra probably)
* using Samza with its state management.
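For what it's worth, a rough Scala sketch of the on-demand populated, write-through cache TD describes; loadFromStore and writeToStore are hypothetical stubs standing in for real Cassandra reads/writes, not any particular connector's API:

case class Event(user: String, activity: String, time: String)

object EventStateCache {
  private val cache = new java.util.concurrent.ConcurrentHashMap[String, Seq[Event]]()

  // hypothetical placeholders for the actual Cassandra calls
  private def loadFromStore(user: String): Seq[Event] = Seq.empty
  private def writeToStore(user: String, events: Seq[Event]): Unit = ()

  // populate the cache lazily, on first access per key
  def get(user: String): Seq[Event] =
    Option(cache.get(user)).getOrElse {
      val loaded = loadFromStore(user)
      cache.put(user, loaded)
      loaded
    }

  // write-through: the cache stays hot while the store stays authoritative
  def update(user: String, events: Seq[Event]): Unit = {
    cache.put(user, events)
    writeToStore(user, events)
  }
}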
Spark's Guava pieces cause exceptions in non-trivial deployments
Greetings, I have a relatively complex application in which Spark, Jetty and Guava (16) are not fitting together. The exception happens when some components try to use a mix of Guava classes (including Spark's pieces) that are loaded by different classloaders:

java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature

According to https://issues.apache.org/jira/browse/SPARK-4819 it's not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and are unlikely to change significantly in Guava, so any "external" Guava can provide them. So, could such problems be fixed if those pieces of Guava that Spark ships were in a separate jar that could be excluded from the mix (substituted by an "external" Guava)? Thanks, Anton
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes). Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat. On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, I have a relatively complex application in which Spark, Jetty and Guava (16) are not fitting together. -- Marcelo
Re: Spark performance in cluster mode using yarn
With this information it is hard to predict. What performance are you getting? What is your desired performance? Maybe you can post your code so experts can suggest improvements? On 14 May 2015 15:02, sachin Singh sachin.sha...@gmail.com wrote: Hi Friends, can someone please give an idea of what the ideal time (complete job execution) for a Spark job should be? I have data in a hive table; the amount of data would be 1 GB, about 200,000 rows for the whole month. I want to do monthly aggregation using SQL queries and groupBy. I have only one node, 1 cluster, and the below configuration for running the job:

--num-executors 2 --driver-memory 3g --driver-java-options -XX:MaxPermSize=1G --executor-memory 2g --executor-cores 2

Approximately how much time should the job take to finish? Or can someone suggest the best way to get results quickly? Thanks in advance,
Re: Multiple Kinesis Streams in a single Streaming job
Hi Tathagata, I think that's exactly what's happening. The error message is:

com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49550673839151225431779125105915140284622031848663416866 used in GetShardIterator on shard shardId-0002 in stream erich-test under account xxx is invalid because it did not come from this stream.

I looked at the DynamoDB table: each job has a single table, and that table does not contain any stream identification information, only shard checkpointing data. I think the error is that when it tries to read from stream B, it uses the checkpointing data for stream A and errors out. So it appears, at first glance, that currently you can't read from multiple Kinesis streams in a single job. I haven't tried this, but it might be possible to make this work by forcing each stream to have different shard IDs so there is no ambiguity in the DynamoDB table; however, that's clearly not a feasible production solution. Thanks, -Erich On Thu, May 14, 2015 at 8:34 PM, Tathagata Das t...@databricks.com wrote: A possible problem may be that the Kinesis stream in 1.3 uses the SparkContext app name as the Kinesis application name, which is used by the Kinesis Client Library to save checkpoints in DynamoDB.
RE: Spark's Guava pieces cause exceptions in non-trivial deployments
The problem is with 1.3.1. It has the Function class (mentioned in the exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually to roll back to 1.2.2, which is working fine. From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: Thursday, May 14, 2015 6:27 PM To: Anton Brazhnyk Cc: user@spark.apache.org Subject: Re: Spark's Guava pieces cause exceptions in non-trivial deployments What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). -- Marcelo
Spark 1.3.0 - 1.3.1 produces java.lang.NoSuchFieldError: NO_FILTER
Hello Bright Sparks, I was using Spark 1.3.0 to push data out to Parquet files. They have been working great: super fast, an easy way to persist data frames, etc. However I just swapped out Spark 1.3.0 and picked up the tarball for 1.3.1. I unzipped it, copied my config over, and then went to read one of my parquet files from the last release, when I got this:

java.lang.NoSuchFieldError: NO_FILTER
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:299)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:297)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

I did some googling; it appears there were some changes to the Parquet file format. I found a reference to an option:

sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

which I tried, but I got the same error (slightly different cause though):

java.lang.NoSuchFieldError: NO_FILTER
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:494)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:494)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:494)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:515)
at org.apache.spark.sql.parquet.ParquetRelation.<init>(ParquetRelation.scala:67)
at org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:542)

I presume it's not just me; has anyone else come across this? Any suggestions how to work around it? Can I set an option like old.parquet.format or something?
Re: spark sql hive-shims
After profiling with YourKit, I see there's an OutOfMemoryException in SQLContext.applySchema. Again, it's a very small RDD. Each executor has 180GB RAM. On Thu, May 14, 2015 at 8:53 AM, Lior Chaga lio...@taboola.com wrote: Hi, Using spark sql with HiveContext. Spark version is 1.3.1. When running local spark everything works fine. When running on the spark cluster I get a ClassNotFoundError for org.apache.hadoop.hive.shims.Hadoop23Shims. This class belongs to hive-shims-0.23, and is a runtime dependency for spark-hive:

[INFO] org.apache.spark:spark-hive_2.10:jar:1.3.1
[INFO] +- org.spark-project.hive:hive-metastore:jar:0.13.1a:compile
[INFO] |  +- org.spark-project.hive:hive-shims:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-common:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20:jar:0.13.1a:runtime
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-common-secure:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20S:jar:0.13.1a:runtime
[INFO] |  |  \- org.spark-project.hive.shims:hive-shims-0.23:jar:0.13.1a:runtime

My spark distribution is built with: make-distribution.sh --tgz -Phive -Phive-thriftserver -DskipTests. If I add this dependency to my driver project, the exception disappears, but then the task gets stuck when registering an rdd as a table (I get a timeout after 30 seconds). I should emphasize that the first rdd I register as a table is a very small one (about 60K rows), and as I said - it runs swiftly in local mode. I suspect maybe other dependencies are missing, but they fail silently. Would be grateful if anyone knows how to solve it. Lior
Re: how to read lz4 compressed data using fileStream of spark streaming?
What do you mean by not detected? Maybe you forgot to trigger some action on the stream to get it executed. Like:

val list_join_action_stream = ssc.fileStream[LongWritable, Text, TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
*list_join_action_stream.count().print()*

Thanks Best Regards On Wed, May 13, 2015 at 7:18 PM, hotdog lisend...@163.com wrote: in spark streaming, I want to use fileStream to monitor a directory. But the files in that directory are compressed using lz4. So the new lz4 files are not detected by the following code. How to detect these new files?

val list_join_action_stream = ssc.fileStream[LongWritable, Text, TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
Re: kafka + Spark Streaming with checkPointing fails to restart
The data pipeline (DAG) should not be added to the StreamingContext in a recovery scenario; the pipeline metadata is recovered from the checkpoint folder. That is one thing you will need to fix in your code. Also, I don't think the ssc.checkpoint(folder) call should be made in the case of recovery. The idiom to follow is to set up the DAG in the creatingFunc and not outside of it. This will ensure that if a new context is being created (i.e. the checkpoint folder does not exist), the DAG will get added to it and then checkpointed. Once a recovery happens, this function is not invoked, but everything is recreated from the checkpointed data. Hope this helps, NB
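A skeleton of the idiom NB describes, for reference; the checkpoint path, ZooKeeper quorum and topic are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///checkpoints/my-app" // placeholder

def creatingFunc(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpointed-app")
  val ssc = new StreamingContext(conf, Seconds(2))
  // the whole DAG lives inside creatingFunc, so it is only built on a fresh start
  val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("my-topic" -> 1))
  stream.map(_._2).print()
  ssc.checkpoint(checkpointDir) // likewise only called on creation, not on recovery
  ssc
}

// on restart this recovers the context from the checkpoint instead of calling creatingFunc
val ssc = StreamingContext.getOrCreate(checkpointDir, creatingFunc _)
ssc.start()
ssc.awaitTermination()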
Re: spark sql hive-shims
Ultimately it was PermGen out of memory. I somehow missed it in the log. On Thu, May 14, 2015 at 9:24 AM, Lior Chaga lio...@taboola.com wrote: After profiling with YourKit, I see there's an OutOfMemoryException in SQLContext.applySchema. Again, it's a very small RDD. Each executor has 180GB RAM.
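For reference, PermGen headroom can be raised through the extra-Java-options settings when submitting; the 256m value below is just an example:

spark-submit \
  --conf spark.driver.extraJavaOptions=-XX:MaxPermSize=256m \
  --conf spark.executor.extraJavaOptions=-XX:MaxPermSize=256m \
  ...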
Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?
Did you happen to have a look at the spark job server? https://github.com/ooyala/spark-jobserver Someone wrote a python wrapper https://github.com/wangqiang8511/spark_job_manager around it; give it a try. Thanks Best Regards On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, Quoting the docs: "Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads." How do I run multiple jobs in one SparkContext using separate threads in pyspark? I found some examples in Scala and Java, but couldn't find Python code. Can anyone help me with a *pyspark example*? Thanks Regards, Meethu M
Re: Spark SQL: preferred syntax for column reference?
Depends on which compile time you are talking about. *scala compile time*: No, the information about which columns are available usually comes from a file or an external database which may or may not be available to scalac. *query compile time*: While your program is running, but before any Spark jobs are launched, the query is analyzed and exceptions will be thrown if either syntax is used to reference an invalid column. There was a prototype implementation https://github.com/marmbrus/sql-typed that invoked the Spark SQL analyzer from within the Scala compiler using macros, which gave true compile time checking. However, this required scalac to have access to the Hive metastore or the files being scanned, which is often difficult to achieve in practice. Additionally, since you can construct DataFrames using arbitrary scala code, even with proper configuration it is not always possible to figure out when a reference is valid, unless you have actually run the code that constructs the DataFrame you are referencing. Michael On Wed, May 13, 2015 at 7:43 PM, Dean Wampler deanwamp...@gmail.com wrote: Is the $"foo" or mydf("foo") or both checked at compile time to verify that the column reference is valid? Thx. Dean On Wednesday, May 13, 2015, Michael Armbrust mich...@databricks.com wrote: I would not say that either method is preferred (neither is old/deprecated). One advantage of the second is that you are referencing a column from a specific dataframe, instead of just providing a string that will be resolved much like an identifier in a SQL query. This means given:

df1 = [id: int, name: string]
df2 = [id: int, zip: int]

I can do something like:

df1.join(df2, df1("id") === df2("id"))

Whereas I would need aliases if I was only using strings:

df1.as("a").join(df2.as("b"), $"a.id" === $"b.id")

On Wed, May 13, 2015 at 9:55 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm just getting started with Spark SQL and DataFrames in 1.3.0. I notice that the Spark API shows a different syntax for referencing columns in a dataframe than the Spark SQL Programming Guide. For instance, the API docs for the select method show this:

df.select($"colA", $"colB")

Whereas the programming guide shows this:

df.filter(df("name") > 21).show()

I tested and both the $"column" and df("column") syntax work, but I'm wondering which is *preferred*. Is one the original and one a new feature we should be using? Thanks, Diana (Spark Curriculum Developer for Cloudera) -- Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: spark-streaming whit flume error
Can you share the client code that you used to send the data? Maybe this discussion will give you some insights: http://apache-avro.679487.n3.nabble.com/Avro-RPC-Python-to-Java-isn-t-working-for-me-td4027454.html Thanks Best Regards On Thu, May 14, 2015 at 8:44 AM, 鹰 980548...@qq.com wrote: Hi all, I want to use spark-streaming with flume, but now I am in trouble: I don't know how to configure flume. I configured flume like this:

a1.sources = r1
a1.channels = c1 c2
a1.sources.r1.type = avro
a1.sources.r1.channels = c1 c2
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4
a1.sources.r1.selector.type = replicating
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1
a1.channels.c1.transactionCapacity = 1
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 80
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1
a1.channels.c2.transactionCapacity = 1
a1.channels.c2.byteCapacityBufferPercentage = 20
a1.channels.c2.byteCapacity = 80
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /user/hxf/flume
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.channels = c1

but when I send data to the 4 port I get an error like this:

org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 154218761 items! Connection closed.

Can anybody help me? Thanks!
RE: how to read lz4 compressed data using fileStream of spark streaming?
How do I unsubscribe from this mailing list please? Thanks!! Regards, Saurabh Agrawal
swap tuple
Hi, I have a *JavaPairRDD<String, String>* and I want to *swap tuple._1() with tuple._2()*. I use *tuple.swap()*, but the JavaPairRDD is not actually changed: when I print the JavaPairRDD, the values are the same. Can anyone help me with that? Thank you. Have a nice day. yasemin -- hiç ender hiç
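For what it's worth, Tuple2.swap() returns a new tuple rather than mutating the pair in place, so the swap has to happen inside a transformation that builds a new RDD; in the Java API that means mapToPair with a function returning tuple.swap(). A minimal Scala sketch of the same idea:

val pairs = sc.parallelize(Seq(("a", "1"), ("b", "2")))
val swapped = pairs.map(_.swap) // new RDD with keys and values exchanged
swapped.collect().foreach(println) // (1,a) then (2,b)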
Build change PSA: Hadoop 2.2 default; -Phadoop-x.y profile recommended for builds
This change will be merged shortly for Spark 1.4, and has a minor implication for those creating their own Spark builds: https://issues.apache.org/jira/browse/SPARK-7249 https://github.com/apache/spark/pull/5786 The default Hadoop dependency has actually been Hadoop 2.2 for some time, but the defaults weren't fully consistent as a Hadoop 2.2 build. That is what this resolves. The discussion highlights that it's actually not great to rely on the Hadoop defaults if you care at all about the Hadoop binding, and that it's good practice to set some -Phadoop-x.y profile in any build. The net changes are:

* If you don't care about Hadoop at all, you can ignore this. You will get a consistent Hadoop 2.2 binding by default now. Still, you may wish to set a Hadoop profile.
* If you build for Hadoop 1, you need to set -Phadoop-1 now.
* If you build for Hadoop 2.2, you should still set -Phadoop-2.2 even though this is the default and is a no-op profile now.
* You can continue to set other Hadoop profiles and override hadoop.version; these are unaffected.
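Concretely, build invocations following that guidance might look like this (version values are illustrative):

mvn -Phadoop-2.2 -DskipTests clean package
mvn -Phadoop-1 -Dhadoop.version=1.2.1 -DskipTests clean package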
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
It would be good if you could tell me what I should add to the documentation to make it easier to understand. I can update the docs for the 1.4.0 release. On Tue, May 12, 2015 at 9:52 AM, Lee McFadden splee...@gmail.com wrote: Thanks for explaining Sean and Cody, this makes sense now. I'd like to help improve this documentation so other python users don't run into the same thing, so I'll look into that today. On Tue, May 12, 2015 at 9:44 AM Cody Koeninger c...@koeninger.org wrote: One of the packages just contains the streaming-kafka code. The other contains that code, plus everything it depends on. That's what assembly typically means in JVM land. Java/Scala users are accustomed to using their own build tool to include necessary dependencies. JVM dependency management is (thankfully) different from Python dependency management. As far as I can tell, there is no core issue, upstream or otherwise. On Tue, May 12, 2015 at 11:39 AM, Lee McFadden splee...@gmail.com wrote: Thanks again for all the help folks. I can confirm that simply switching to `--packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1` makes everything work as intended. I'm not sure what the difference is between the two packages honestly, or why one should be used over the other, but the documentation is currently not intuitive in this matter. If you follow the instructions, initially it will seem broken. Is there any reason why the docs for Python users (or, in fact, all users - Java/Scala users will run into this too, except they are armed with the ability to build their own jar with the dependencies included) should not be changed to use the assembly package by default? Additionally, after a few google searches yesterday combined with your help, I'm wondering if the core issue is upstream in Kafka's dependency chain? On Tue, May 12, 2015 at 8:53 AM Ted Yu yuzhih...@gmail.com wrote: Regarding "it is already in the assembly": yes. Verified:

$ jar tvf ~/Downloads/spark-streaming-kafka-assembly_2.10-1.3.1.jar | grep yammer | grep Gauge
  1329 Sat Apr 11 04:25:50 PDT 2015 com/yammer/metrics/core/Gauge.class

On Tue, May 12, 2015 at 8:05 AM, Sean Owen so...@cloudera.com wrote: It doesn't depend directly on yammer metrics; Kafka does. It wouldn't be correct to declare that it does; it is already in the assembly anyway. On Tue, May 12, 2015 at 3:50 PM, Ted Yu yuzhih...@gmail.com wrote: Currently external/kafka/pom.xml doesn't cite yammer metrics as a dependency.

$ ls -l ~/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
-rw-r--r--  1 tyu  staff  82123 Dec 17  2013 /Users/tyu/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar

Including the metrics-core jar would not increase the size of the final release artifact much. My two cents.
Re: how to read lz4 compressed data using fileStream of spark streaming?
That's because you are using TextInputFormat, I think. Try with LzoTextInputFormat, like:

val list_join_action_stream = ssc.fileStream[LongWritable, Text, com.hadoop.mapreduce.LzoTextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)

Thanks Best Regards On Thu, May 14, 2015 at 1:04 PM, lisendong lisend...@163.com wrote: I have an action on the DStream, because when I put a text file into the hdfs, it runs normally; but if I put an lz4 file, it does nothing. On May 14, 2015, at 3:32 PM, Akhil Das ak...@sigmoidanalytics.com wrote: What do you mean by not detected? Maybe you forgot to trigger some action on the stream to get it executed.
Re: Unsubscribe
Have a look https://spark.apache.org/community.html Send an email to user-unsubscr...@spark.apache.org Thanks Best Regards On Thu, May 14, 2015 at 1:08 PM, Saurabh Agrawal saurabh.agra...@markit.com wrote: How do I unsubscribe from this mailing list please? Thanks!! Regards, Saurabh Agrawal Vice President Markit
Re: Unsubscribe
Please see http://spark.apache.org/community.html Cheers On May 14, 2015, at 12:38 AM, Saurabh Agrawal saurabh.agra...@markit.com wrote: How do I unsubscribe from this mailing list please? Thanks!! Regards, Saurabh Agrawal Vice President Markit
[Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed
Hi, all, i got following error when i run unit test of spark by dev/run-tests on the latest branch-1.4 branch.

the latest commit id:
commit d518c0369fa412567855980c3f0f426cde5c190d
Author: zsxwing zsxw...@gmail.com
Date: Wed May 13 17:58:29 2015 -0700

error:
[info] Test org.apache.spark.streaming.JavaAPISuite.testCount started
[error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed: org.apache.spark.SparkException: Error communicating with MapOutputTracker
[error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
[error] at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119)
[error] at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
[error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
[error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577)
[error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626)
[error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597)
[error] at org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403)
[error] at org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102)
[error] at org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344)
[error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
[error] at org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74)
[error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
[error] at org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala)
[error] at org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103)
[error] ...
[error] Caused by: org.apache.spark.SparkException: Error sending message [message = StopMapOutputTracker]
[error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
[error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
[error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109)
[error] ... 52 more
[error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
[error] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
[error] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
[error] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
[error] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error] at scala.concurrent.Await$.result(package.scala:107)
[error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
[error] ... 54 more

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-Failure-Test-org-apache-spark-streaming-JavaAPISuite-testCount-failed-tp22879.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: how to read lz4 compressed data using fileStream of spark streaming?
Here's https://github.com/twitter/hadoop-lzo/blob/master/src/main/java/com/hadoop/mapreduce/LzoTextInputFormat.java the class. You can read more here: https://github.com/twitter/hadoop-lzo#maven-repository Thanks Best Regards On Thu, May 14, 2015 at 1:22 PM, lisendong lisend...@163.com wrote: LzoTextInputFormat: where is this class? What is the maven dependency?
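In case it helps, a sketch of the sbt coordinates for that artifact, assuming the Twitter maven repository described in the README linked above (check the README for the current version number):

    resolvers += "twttr" at "https://maven.twttr.com"
    libraryDependencies += "com.hadoop.gplcompression" % "hadoop-lzo" % "0.4.19"

One caveat worth noting: LzoTextInputFormat reads LZO-compressed files, while the original question was about lz4, which is a different codec; make sure the input really is LZO before going this route.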
Re: The explanation of input text format using LDA in Spark
hi keegan, Thanks a lot. Now I know the columns represent all the distinct words across all documents. But I don't know what determines the order of the words; does it make any difference if the columns are in a different order? Thanks. Cui xp -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-explanation-of-input-text-format-using-LDA-in-Spark-tp22781p22880.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
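On the ordering question: the word-to-column assignment is arbitrary; what matters is that the same assignment is applied consistently to every document. A sketch of one common way to build it (the docs RDD and whitespace tokenization are illustrative assumptions):

    // docs: RDD[String], one document per record, whitespace-tokenized here
    val vocab: Map[String, Long] = docs
      .flatMap(_.split(" "))
      .distinct()
      .zipWithIndex()  // assigns an arbitrary but stable column index to each word
      .collectAsMap()
      .toMap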
Re: kafka + Spark Streaming with checkPointing fails to restart
Thanks everyone, that was the problem. The "create new streaming context" function was supposed to set up the stream processing as well as the checkpoint directory; I had missed the whole process of checkpoint setup. With that done, everything works as expected. For the benefit of others, here is my final version of the code, which works correctly:

object RawLogProcessor extends Logging {

  import TacomaHelper._

  val checkpointDir = "/tmp/checkpointDir_tacoma"
  var ssc: Option[StreamingContext] = None

  def createSparkConf(config: Config): SparkConf = {
    val sparkConf = new SparkConf()
    config.entrySet.asScala
      .map(kv => kv.getKey -> kv.getValue)
      .foreach { case (k, v) => sparkConf.set(s"spark.$k", unquote(v.render())) }
    sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog],
      classOf[VideoEngagement], classOf[VideoImpression]))
    sparkConf
  }

  // a function that returns a function of type: () => StreamingContext
  def createContext(sparkConfig: Config, kafkaConf: Config)(f: StreamingContext => StreamingContext) = () => {
    val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
    val sparkConf = createSparkConf(sparkConfig)
    // create the streaming context and register the checkpoint directory
    val streamingContext = new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
    streamingContext.checkpoint(checkpointDir)
    // apply the pipeline function f to the new streaming context
    f(streamingContext)
  }

  def createNewContext(sparkConf: Config, kafkaConf: Config, f: StreamingContext => StreamingContext) = {
    logInfo("Create new Spark streamingContext with provided pipeline function")
    StreamingContext.getOrCreate(
      checkpointPath = checkpointDir,
      creatingFunc = createContext(sparkConf, kafkaConf)(f),
      createOnError = true)
  }

  def apply(sparkConfig: Config, kafkaConf: Config): StreamingContext = {
    rawlogTopic = kafkaConf.getString("rawlog.topic")
    kafkaParams = kafkaConf.entrySet.asScala
      .map(kv => kv.getKey -> unquote(kv.getValue.render()))
      .toMap
    if (ssc.isEmpty) {
      ssc = Some(createNewContext(sparkConfig, kafkaConf, setupPipeline))
    }
    ssc.get
  }

  var rawlogTopic: String = "qa-rawlog"
  var kafkaParams: Map[String, String] = Map()

  def setupPipeline(streamingContext: StreamingContext): StreamingContext = {
    logInfo("Creating new kafka rawlog stream")
    // TODO: extract this and pass it around somehow
    val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
      streamingContext, kafkaParams, Set(rawlogTopic))

    logInfo("adding step to parse kafka stream into RawLog types (Normalizer)")
    val eventStream = rawlogDStream
      .map { case (key, rawlogVal) =>
        val record = rawlogVal.asInstanceOf[GenericData.Record]
        val rlog = RawLog.newBuilder()
          .setId(record.get("id").asInstanceOf[String])
          .setAccount(record.get("account").asInstanceOf[String])
          .setEvent(record.get("event").asInstanceOf[String])
          .setTimestamp(record.get("timestamp").asInstanceOf[Long])
          .setUserAgent(record.get("user_agent").asInstanceOf[String])
          .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
          .build()
        val norm = Normalizer(rlog)
        (key, rlog.getEvent, norm)
      }

    logInfo("Adding step to filter out VideoView only events and cache them")
    val videoViewStream = eventStream
      .filter(_._2 == "video_view")
      .filter(_._3.isDefined)
      .map(z => (z._1, z._3.get))
      .map(z => (z._1, z._2.asInstanceOf[VideoView]))
      .cache()

    // repartition by account
    logInfo("repartition videoView by account and calculate stats")
    videoViewStream.map(v => (v._2.getAccount, 1))
      .filter(_._1 != null)
      .window(Durations.seconds(20))
      .reduceByKey(_ + _)
      .print()

    // repartition by (deviceType, deviceOS)
    logInfo("repartition videoView by (DeviceType, DeviceOS) and calculate stats")
    videoViewStream.map(v => ((v._2.getDeviceType, v._2.getDeviceOs), 1))
      .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
      .print()

    streamingContext
  }
}

- Ankur

On 13/05/2015 23:52, NB wrote: The data pipeline (DAG) should not be added to the StreamingContext in the case of a recovery scenario; the pipeline metadata is recovered from the checkpoint folder. That is one thing you will need to fix in your code. Also, I don't think the ssc.checkpoint(folder) call should be made in case of the recovery. The idiom to follow is to set up the DAG in the creatingFunc and not outside of it. This will ensure that if a new context is being created (i.e. the checkpoint folder does not exist), the DAG will get added to it and then checkpointed. Once a recovery happens, this function is not invoked but everything is recreated from the
RE: [Spark SQL 1.3.1] data frame saveAsTable returns exception
Hi Michael and Ayan, Thank you for your responses to my problem. Michael, do we have a tentative release date for Spark version 1.4? Regards, Ishwardeep

From: Michael Armbrust mich...@databricks.com
Sent: Wednesday, May 13, 2015 10:54 PM
To: ayan guha
Cc: Ishwardeep Singh; user
Subject: Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception

I think this is a bug in our date handling that should be fixed in Spark 1.4.

On Wed, May 13, 2015 at 8:23 AM, ayan guha guha.a...@gmail.com wrote: Your stack trace says it can't convert date to integer. You sure about column positions?

On 13 May 2015 21:32, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote: Hi, I am using Spark SQL 1.3.1. I have created a DataFrame using the jdbc data source and am using the saveAsTable() method, but got the following 2 exceptions:

java.lang.RuntimeException: Unsupported datatype DecimalType()
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:391)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:218)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1121)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1071)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1037)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1015)

java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
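If upgrading is not an option right away, one possible workaround consistent with the diagnosis above (a sketch only; the column names are hypothetical stand-ins for whatever date/decimal columns your JDBC source produced) is to cast the unsupported types before saving:

    import org.apache.spark.sql.types.StringType

    // cast the problematic java.sql.Date column to a string the parquet writer accepts
    val fixed = df.select(df("id"), df("order_date").cast(StringType).as("order_date"))
    fixed.saveAsTable("my_table")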
Re: how to set random seed
Sorry for late reply. Here is what I was thinking:

    import random as r

    def main():
        # get SparkContext
        # Just for fun, lets assume seed is an id
        filename = "bin.dat"
        seed = id(filename)
        # broadcast it
        br = sc.broadcast(seed)
        # set up dummy list
        lst = []
        for i in range(4):
            x = []
            for j in range(4):
                x.append(j)
            lst.append(x)
        print lst
        base = sc.parallelize(lst)
        print base.map(randomize).collect()

Randomize looks like:

    def randomize(lst):
        local_seed = br.value
        r.seed(local_seed)
        r.shuffle(lst)
        return lst

Let me know if this helps...

On Wed, May 13, 2015 at 11:41 PM, Charles Hayden charles.hay...@atigeo.com wrote: Can you elaborate? Broadcast will distribute the seed, which is only one number. But what construct do I use to plant the seed (call random.seed()) once on each worker?

From: ayan guha guha.a...@gmail.com
Sent: Tuesday, May 12, 2015 11:17 PM
To: Charles Hayden
Cc: user
Subject: Re: how to set random seed

Easiest way is to broadcast it.

On 13 May 2015 10:40, Charles Hayden charles.hay...@atigeo.com wrote: In pySpark, I am writing a map with a lambda that calls random.shuffle. For testing, I want to be able to give it a seed, so that successive runs will produce the same shuffle. I am looking for a way to set this same random seed once on each worker. Is there any simple way to do it? -- Best Regards, Ayan Guha
how to delete data from table in sparksql
Hi guys, I need to delete some data from a table via "delete from table where name = xxx"; however, delete does not behave like the DML operation in Hive. I got info like below:

Usage: delete [FILE|JAR|ARCHIVE] value [value]*
15/05/14 18:18:24 ERROR processors.DeleteResourceProcessor: Usage: delete [FILE|JAR|ARCHIVE] value [value]*

I checked the list of Supported Hive Features but could not find whether this DML is supported, so any comments will be appreciated. Thanks & best regards! San.Luo
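A note on what that error means, plus a possible workaround: Spark SQL at this version has no DELETE DML, so the CLI interprets "delete" as Hive's resource-management command (hence the DeleteResourceProcessor usage message). One common workaround is to rewrite the table without the unwanted rows. A sketch only, with table and column names purely illustrative:

    // read the table, keep everything except the rows to be "deleted"
    val kept = sqlContext.table("t1").filter("name <> 'xxx'")
    // write the result out as a new table, then swap it in for the old one
    kept.saveAsTable("t1_cleaned")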
Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed
Do you get this failure repeatedly? On Thu, May 14, 2015 at 12:55 AM, kf wangf...@huawei.com wrote: Hi, all, i got following error when i run unit test of spark by dev/run-tests on the latest branch-1.4 branch: [error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed: org.apache.spark.SparkException: Error communicating with MapOutputTracker (full stack trace quoted above)
Word2Vec with billion-word corpora
Hi all, I'm experimenting with Spark's Word2Vec implementation on a relatively large corpus (5B words, vocabulary size 4M, 400-dimensional vectors). Has anybody had success running it at this scale? Thanks in advance for your guidance! -Shilad -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Word2Vec-with-billion-word-corpora-tp22895.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
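For context, a minimal sketch of configuring MLlib's Word2Vec at that dimensionality (the corpus RDD and the partition count are illustrative assumptions, not tested-at-scale settings):

    import org.apache.spark.mllib.feature.Word2Vec

    // corpus: RDD[Seq[String]] with one tokenized sentence or document per record
    val model = new Word2Vec()
      .setVectorSize(400)    // the 400-dimensional vectors mentioned above
      .setNumPartitions(64)  // illustrative; spreading training out helps at this scale
      .fit(corpus)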
question about sparksql caching
Hi all, We are planning to use SparkSQL in a DW system. There's a question about the caching mechanism of SparkSQL. For example, if I have a SQL like

    sqlContext.sql("select c1, sum(c2) from T1, T2 where T1.key=T2.key group by c1").cache()

is it going to cache the final result, or the raw data of each table used in the SQL? Since users may run a variety of SQLs over those tables, if the caching is for the final result only, a brand new SQL may still take a very long time to scan the entire table. If this is the case, is there any better way to cache the base tables instead of the final result? Thanks
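For what it's worth, cache() on that DataFrame caches only the final aggregated result, not the underlying tables. If the goal is to speed up many different SQLs over the same base tables, one option (a minimal sketch, assuming T1 and T2 are registered as tables in the sqlContext) is to cache the tables themselves:

    // caches each table in memory in columnar format the first time it is scanned
    sqlContext.cacheTable("T1")
    sqlContext.cacheTable("T2")

The SQL form, CACHE TABLE T1, does the same thing.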
Re: textFileStream Question
file timestamp -- Original Message -- From: Vadim Bichutskiy vadim.bichuts...@gmail.com Sent: Friday, May 15, 2015 4:55 AM To: user@spark.apache.org Subject: textFileStream Question How does textFileStream work behind the scenes? How does Spark Streaming know what files are new and need to be processed? Is it based on time stamp, file name? Thanks, Vadim
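To expand on that one-word answer: textFileStream scans the monitored directory every batch interval and selects files whose modification timestamps are newer than what it has already seen, so only newly arrived files are processed. A minimal usage sketch (the directory and batch duration are illustrative):

    // assumes a SparkConf named sparkConf; move files into the watched
    // directory atomically so the timestamp reflects their arrival
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val lines = ssc.textFileStream("hdfs:///incoming/logs")
    lines.count().print()
    ssc.start()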
Re: Multiple Kinesis Streams in a single Streaming job
another option (not really recommended, but worth mentioning) would be to change the region of dynamodb to be separate from the other stream - and even separate from the stream itself. this isn't available right now, but will be in Spark 1.4. On May 14, 2015, at 6:47 PM, Erich Ess er...@simplerelevance.com wrote: Hi Tathagata, I think that's exactly what's happening. The error message is: com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49550673839151225431779125105915140284622031848663416866 used in GetShardIterator on shard shardId-0002 in stream erich-test under account xxx is invalid because it did not come from this stream. I looked at the DynamoDB table; each job has a single table, and that table does not contain any stream identification information, only shard checkpointing data. I think the error is that when it tries to read from stream B, it uses the checkpointing data for stream A and errors out. So it appears, at first glance, that you currently can't read from multiple Kinesis streams in a single job. I haven't tried this, but it might be possible for this to work if I force each stream to have different shard IDs so there is no ambiguity in the DynamoDB table; however, that's clearly not a feasible production solution. Thanks, -Erich On Thu, May 14, 2015 at 8:34 PM, Tathagata Das t...@databricks.com wrote: A possible problem may be that the kinesis stream in 1.3 uses the SparkContext app name as the Kinesis Application Name, which is used by the Kinesis Client Library to save checkpoints in DynamoDB. Since both kinesis DStreams are using the same Kinesis application name (as they are in the same StreamingContext / SparkContext / Spark app name), KCL may be overwriting the checkpoint information of both Kinesis streams into the same DynamoDB table. Either way, this is going to be fixed in Spark 1.4. On Thu, May 14, 2015 at 4:10 PM, Chris Fregly ch...@fregly.com wrote: have you tried to union the 2 streams per the KinesisWordCountASL example, where 2 streams (against the same Kinesis stream in this case) are created and union'd? it should work the same way - including union() of streams from totally different source types (kafka, kinesis, flume). On Thu, May 14, 2015 at 2:07 PM, Tathagata Das t...@databricks.com wrote: What is the error you are seeing? TD On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com wrote: Hi, Is it possible to set up streams from multiple Kinesis streams and process them in a single job? From what I have read, this should be possible; however, the Kinesis layer errors out whenever I try to receive from more than a single Kinesis Stream. Here is the code.
Currently, I am focused on just getting receivers set up and working for the two Kinesis Streams; as such, this code just attempts to print out the contents of both streams:

    implicit val formats = Serialization.formats(NoTypeHints)

    val conf = new SparkConf().setMaster("local[*]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(1))

    val rawStream = KinesisUtils.createStream(ssc, "erich-test",
      "kinesis.us-east-1.amazonaws.com", Duration(1000),
      InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
    rawStream.map(msg => new String(msg)).print

    val loaderStream = KinesisUtils.createStream(ssc, "dev-loader",
      "kinesis.us-east-1.amazonaws.com", Duration(1000),
      InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
    val loader = loaderStream.map(msg => new String(msg)).print

    ssc.start()

Thanks, -Erich

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Kinesis-Streams-in-a-single-Streaming-job-tp22889.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

-- Erich Ess | CTO, SimpleRelevance
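For reference, a sketch of the union approach mentioned earlier in the thread, using the two streams from the code above (note that union combines the DStreams for processing but would not, on its own, fix the shared DynamoDB application-name problem described above):

    // combine both receivers into a single DStream before transformation
    val unioned = ssc.union(Seq(rawStream, loaderStream))
    unioned.map(msg => new String(msg)).print()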