Re: RDD generated on every query
You can use Tachyon-based storage for that: build the RDD once, write it to Tachyon, and every time the client queries, read it back from there instead of regenerating it. Thanks Best Regards On Mon, Apr 6, 2015 at 6:01 PM, Siddharth Ubale siddharth.ub...@syncoms.com wrote: Hi, In our Spark web application the RDD is regenerated every time a client sends a query request. Is there any way to build the RDD once and then run queries against it again and again on the active SparkContext? Thanks, Siddharth Ubale, Synchronized Communications, #43, Velankani Tech Park, Block No. II, 3rd Floor, Electronic City Phase I, Bangalore – 560 100, Tel: +91 80 3202 4060, Web: www.syncoms.com http://www.syncoms.com/ London | Bangalore | Orlando. We innovate, plan, execute, and transform the business.
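For example, a minimal sketch of that approach, assuming a Tachyon master is reachable at tachyon://tachyon-master:19998 and the Tachyon client jar is on the Spark classpath (all paths and the filter term are illustrative):

// Build the RDD once and persist a copy into Tachyon
val events = sc.textFile("hdfs://namenode:9000/input/events")
events.saveAsTextFile("tachyon://tachyon-master:19998/cache/events")

// Later client queries read the already-materialized copy instead of recomputing it
val cached = sc.textFile("tachyon://tachyon-master:19998/cache/events")
val hits = cached.filter(_.contains("query-term")).count()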
Re: set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?
You could try leaving all the configuration values at their defaults and running your application to see if you still hit the heap issue. If so, try adding swap space to the machines, which will definitely help. Another way would be to set the heap space manually (export _JAVA_OPTIONS=-Xmx5g). Thanks Best Regards On Wed, Apr 8, 2015 at 12:45 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am a bit confused about spark.storage.memoryFraction. This is used to set the area for RDD usage, but does RDD here mean only cached and persisted RDDs? So if my program has no cached RDD at all (meaning I have no .cache() or .persist() call on any RDD), can I set spark.storage.memoryFraction to a very small number or even zero? I am writing a program which consumes a lot of memory (broadcast values, runtime, etc.), but I have no cached RDDs, so should I just turn spark.storage.memoryFraction down to 0 (which would help me improve performance)? And I have another issue with broadcasts: when I try to get a broadcast value, it throws an out-of-memory error. Which part of memory should I allocate more to (if I can't increase my overall memory size)? java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:218) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:200) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:549) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:431) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) Regards, Shuai
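As a rough sketch, the storage fraction can also be lowered programmatically when building the context; the values below are illustrative, and leaving a small non-zero margin is safer than setting it to 0:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("no-cache-job")
  .set("spark.storage.memoryFraction", "0.1")  // little is needed when nothing is cached
  .set("spark.shuffle.memoryFraction", "0.4")  // leave more room for shuffle work
val sc = new SparkContext(conf)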
Re: Seeing message about receiver not being de-registered on invoking Streaming context stop
When you say "done fetching documents", do you mean that you are stopping the StreamingContext (ssc.stop), or that you have completed fetching documents for a batch? If possible, could you paste your custom receiver code so that we can have a look at it? Thanks Best Regards On Tue, Apr 7, 2015 at 8:46 AM, Hari Polisetty hpoli...@icloud.com wrote: My application is running Spark in local mode and I have a Spark Streaming Listener as well as a Custom Receiver. When the receiver is done fetching all documents, it invokes "stop" on itself. I see the StreamingListener getting a callback on "onReceiverStopped" where I stop the streaming context. However, I see the following message in my logs: 2015-04-06 16:41:51,193 WARN [Thread-66] com.amazon.grcs.gapanalysis.spark.streams.ElasticSearchResponseReceiver.onStop - Stopped receiver 2015-04-06 16:41:51,193 ERROR [sparkDriver-akka.actor.default-dispatcher-17] org.apache.spark.Logging$class.logError - Deregistered receiver for stream 0: AlHURLEY 2015-04-06 16:41:51,202 WARN [Executor task launch worker-2] org.apache.spark.Logging$class.logWarning - Stopped executor without error 2015-04-06 16:41:51,203 WARN [StreamingListenerBus] org.apache.spark.Logging$class.logWarning - All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,HURLEY)) What am I missing or doing wrong?
Re: start-slave.sh not starting
Why are you not using sbin/start-all.sh? Thanks Best Regards On Wed, Apr 8, 2015 at 10:24 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to start the worker by: sbin/start-slave.sh spark://ip-10-241-251-232:7077 In the logs it's complaining about: Master must be a URL of the form spark://hostname:port I also have this in spark-defaults.conf spark.master spark://ip-10-241-251-232:7077 Did I miss anything?
Re: RDD generated on every query
Hi, If you keep the same SparkContext alive, then you can cache the query result by caching the table (sqlContext.cacheTable("tableName")). You could also have a look at the Ooyala Spark job server. On Tue, Apr 14, 2015 at 11:36 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can use Tachyon-based storage for that: build the RDD once, write it to Tachyon, and every time the client queries, read it back from there instead of regenerating it. Thanks Best Regards On Mon, Apr 6, 2015 at 6:01 PM, Siddharth Ubale siddharth.ub...@syncoms.com wrote: Hi, In our Spark web application the RDD is regenerated every time a client sends a query request. Is there any way to build the RDD once and then run queries against it again and again on the active SparkContext? Thanks, Siddharth Ubale, Synchronized Communications, #43, Velankani Tech Park, Block No. II, 3rd Floor, Electronic City Phase I, Bangalore – 560 100, Tel: +91 80 3202 4060, Web: www.syncoms.com http://www.syncoms.com/ London | Bangalore | Orlando. We innovate, plan, execute, and transform the business.
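A minimal sketch of the cacheTable approach on a long-lived SQLContext (the path, table name, and query are illustrative):

val df = sqlContext.parquetFile("hdfs://node1/data/events.parquet")
df.registerTempTable("events")
sqlContext.cacheTable("events")  // materialized once, reused by later queries

// Every incoming client query now runs against the cached table
val result = sqlContext.sql("SELECT user_id, COUNT(*) FROM events GROUP BY user_id")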
Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
One hack you can put in would be to bring the Result class http://grepcode.com/file_/repository.cloudera.com/content/repositories/releases/com.cloudera.hbase/hbase/0.89.20100924-28/org/apache/hadoop/hbase/client/Result.java/?v=source locally, make it serializable (implements Serializable), and use that. Thanks Best Regards On Tue, Apr 7, 2015 at 12:07 AM, Jeetendra Gangele gangele...@gmail.com wrote: I hit the same issue again. This time I tried to return the object and it failed with task not serializable. Below is the code; here VendorRecord is serializable. private static JavaRDD<VendorRecord> getVendorDataToProcess(JavaSparkContext sc) throws IOException { return sc .newAPIHadoopRDD(getVendorDataRowKeyScannerConfiguration(), TableInputFormat.class, ImmutableBytesWritable.class, Result.class) .map(new Function<Tuple2<ImmutableBytesWritable, Result>, VendorRecord>() { @Override public VendorRecord call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception { String rowKey = new String(v1._1.get()); VendorRecord vd = vendorDataDAO.getVendorDataForRowkey(rowKey); return vd; } }); } On 1 April 2015 at 02:07, Ted Yu yuzhih...@gmail.com wrote: Jeetendra: Please extract the information you need from Result and return the extracted portion - instead of returning Result itself. Cheers On Tue, Mar 31, 2015 at 1:14 PM, Nan Zhu zhunanmcg...@gmail.com wrote: The example in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala might help Best, -- Nan Zhu http://codingcat.me On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote: Yep, it's not serializable: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html You can't return this from a distributed operation since that would mean it has to travel over the network and you haven't supplied any way to convert the thing into bytes. On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele gangele...@gmail.com wrote: When I am trying to get the result from HBase and running the mapToPair function of the RDD, it's giving the error java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result Here is the code // private static JavaPairRDD<Integer, Result> getCompanyDataRDD(JavaSparkContext sc) throws IOException { // return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(), TableInputFormat.class, ImmutableBytesWritable.class, // Result.class).mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() { // // public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception { // System.out.println("In getCompanyDataRDD" + t._2); // // String cknid = Bytes.toString(t._1.get()); // System.out.println("processing cknids is:" + cknid); // Integer cknidInt = Integer.parseInt(cknid); // Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2); // return returnTuple; // } // }); // } - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
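A sketch of Ted's suggestion in Scala, copying only the needed fields out of Result into a serializable case class before the data leaves the map; the table name, column family, and qualifier are illustrative, and the qualifier is assumed to be present in every row:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

case class VendorRow(rowKey: String, name: String)

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "vendor_table")

val vendorRows = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
  .map { case (key, result) =>
    // Copy plain values out of Result here; the Result object itself is never shipped
    val name = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")))
    VendorRow(Bytes.toString(key.get()), name)
  }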
Re: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]
Just make sure you import the following: import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext._ Thanks Best Regards On Wed, Apr 8, 2015 at 6:38 AM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I am trying to implement this example (Spark Streaming with Twitter). https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala I am able to do hashTags.print() to get a live stream of filtered hashtags, but I get these warnings, not sure if they're related to the error: WARN BlockManager: Block input-0-1428450594600 replicated to only 0 peer(s) instead of 1 peers then when I try to print out topCounts60 or topCounts10, I get this error when building: /home/ec2-user/sparkApps/TwitterApp/src/main/scala/TwitterPopularTags.scala:35: error: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)] [INFO] val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}.transform(_.sortByKey(false)) Thank you for the help! Best, Su - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
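For reference, a minimal sketch with the implicit imports in place, assuming hashTags is the DStream[String] from the TwitterPopularTags example:

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds

val topCounts60 = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60))          // needs the StreamingContext implicits
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))                     // needs the SparkContext implicits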
Re: Expected behavior for DataFrame.unionAll
I think what happened is type widening: unionAll coerces both inputs to a common type, and the tightest common type between a string and an int is string, so column a ends up as a string. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L144 On Tue, Apr 7, 2015 at 5:00 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I am experimenting with DataFrame. I tried to construct two DataFrames with: 1. case class A(a: Int, b: String) scala> adf.printSchema() root |-- a: integer (nullable = false) |-- b: string (nullable = true) 2. case class B(a: String, c: Int) scala> bdf.printSchema() root |-- a: string (nullable = true) |-- c: integer (nullable = false) Then I unioned these two DataFrames with the unionAll function, and I get the following schema. It is kind of a mixture of A and B. scala> val udf = adf.unionAll(bdf) scala> udf.printSchema() root |-- a: string (nullable = false) |-- b: string (nullable = true) The unionAll documentation says it behaves like the SQL UNION ALL function. However, unioning incompatible types is not well defined for SQL. Is there any expected behavior for unioning incompatible data frames? Thanks. Justin
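If the implicit widening is not what is wanted, the columns can be aligned to a common schema explicitly before the union; a rough sketch using the column names from the example (casting everything to string here is just one choice):

import org.apache.spark.sql.functions.col

val left  = adf.select(col("a").cast("string").as("a"), col("b"))
val right = bdf.select(col("a"), col("c").cast("string").as("b"))
val unioned = left.unionAll(right)  // both sides now share the schema (a: string, b: string)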
Re: set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?
Hi, In one of the applications we built that involved no caching, we set spark.storage.memoryFraction to a very low value, and yes, that gave us performance benefits. Regarding the broadcast issue, you should also look at the data you are trying to broadcast; sometimes creating that data structure on the executors themselves as a singleton helps. Thanks, On Tue, Apr 14, 2015 at 12:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You could try leaving all the configuration values at their defaults and running your application to see if you still hit the heap issue. If so, try adding swap space to the machines, which will definitely help. Another way would be to set the heap space manually (export _JAVA_OPTIONS=-Xmx5g). Thanks Best Regards On Wed, Apr 8, 2015 at 12:45 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am a bit confused about spark.storage.memoryFraction. This is used to set the area for RDD usage, but does RDD here mean only cached and persisted RDDs? So if my program has no cached RDD at all (meaning I have no .cache() or .persist() call on any RDD), can I set spark.storage.memoryFraction to a very small number or even zero? I am writing a program which consumes a lot of memory (broadcast values, runtime, etc.), but I have no cached RDDs, so should I just turn spark.storage.memoryFraction down to 0 (which would help me improve performance)? And I have another issue with broadcasts: when I try to get a broadcast value, it throws an out-of-memory error. Which part of memory should I allocate more to (if I can't increase my overall memory size)? java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:218) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:200) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:549) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:431) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) Regards, Shuai
Re: Cannot change the memory of workers
If you want to use 2g of memory on each worker, you can simply export SPARK_WORKER_MEMORY=2g inside your spark-env.sh on all machines in the cluster. Thanks Best Regards On Wed, Apr 8, 2015 at 7:27 AM, Jia Yu jia...@asu.edu wrote: Hi guys, Currently I am running a Spark program on Amazon EC2. Each worker has around (less than, but close to) 2 GB of memory. By default, I can see each worker is allocated 976 MB of memory, as the Spark web UI shows: Address | State | Cores | Memory: ALIVE | 1 (0 Used) | 976.0 MB (0.0 B Used). I know this value comes from (total memory minus 1 GB), but I want more than 1 GB on each of my workers. Based on the instructions on the Spark website, I put export SPARK_WORKER_MEMORY=1g in spark-env.sh, but it doesn't work. BTW, I can set SPARK_EXECUTOR_MEMORY=1g and it works. Can anyone help me? Is there a requirement that one worker must keep 1 GB of memory for itself aside from the memory for Spark? Thanks, Jia
Re: Exception in thread main java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] when create context
Can you share a bit more information on the type of application that you are running? From the stack trace I can only say that, for some reason, your connection timed out (probably a GC pause or a network issue). Thanks Best Regards On Wed, Apr 8, 2015 at 9:48 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, In some cases, I get the exception below when I run Spark in local mode (I haven't seen this on a cluster). This is weird, but it also affects my local unit test cases (it does not always happen, but usually once per 4-5 runs). From the stack, it looks like the error happens when creating the context, but I don't know why or what kind of parameters I can set to solve this issue. Exception in thread main java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at akka.remote.Remoting.start(Remoting.scala:180) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632) at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1832) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1823) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163) at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267) at org.apache.spark.SparkContext.<init>(SparkContext.scala:270) at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61) at com.***.executor.FinancialEngineExecutor.run(FinancialEngineExecutor.java:110) Regards, Shuai
Running Spark on Gateway - Connecting to Resource Manager Retries
Hi Team, I am running the Spark word count example (https://github.com/sryza/simplesparkapp). If I run with master as local it works fine, but when I change the master to yarn it ends with retries connecting to the resource manager (stack trace mentioned below), 15/04/14 11:31:57 INFO RMProxy: Connecting to ResourceManager at / 0.0.0.0:8032 15/04/14 11:31:58 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/14 11:31:59 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) If I run the same command from the Namenode instance it ends with an ArrayIndexOutOfBoundsException (stack trace mentioned below), 15/04/14 11:38:44 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8 Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 1 at com.cloudera.sparkwordcount.SparkWordCount$.main(SparkWordCount.scala:28) at com.cloudera.sparkwordcount.SparkWordCount.main(SparkWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Looking forward to getting this resolved to work on the respective nodes. Thanks,
Re: [Spark1.3] UDF registration issue
You can do this: val strLen = udf((s: String) => s.length()) cleanProcessDF.withColumn("dii", strLen(col("di"))) (You might need to play with the type signature a little bit to get it to compile) On Fri, Apr 10, 2015 at 11:30 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, I'm running into some trouble trying to register a UDF: scala> sqlContext.udf.register("strLen", (s: String) => s.length()) res22: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType) scala> cleanProcessDF.withColumn("dii", strLen(col("di"))) <console>:33: error: not found: value strLen cleanProcessDF.withColumn("dii", strLen(col("di"))) Where cleanProcessDF is a dataframe Is my syntax wrong? Or am I missing an import of some sort?
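A compiling sketch of that suggestion with the needed import, assuming cleanProcessDF is the DataFrame from the question (Spark 1.3 DataFrame API):

import org.apache.spark.sql.functions.{col, udf}

val strLen = udf((s: String) => s.length)
val withLen = cleanProcessDF.withColumn("dii", strLen(col("di")))

The sqlContext.udf.register form makes the function available inside sql(...) query strings; to call it on columns from Scala code as above, binding the result of udf(...) to a val is the simpler route.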
Re: SparkSQL + Parquet performance
That totally depends on your disk IO and the number of CPUs that you have in the cluster. For example, if you have disk IO of 100MB/s and a handful of CPUs (say 40 cores on 10 machines), then I believe it could take you to ~1GB/s. Thanks Best Regards On Tue, Apr 7, 2015 at 2:48 AM, Paolo Platter paolo.plat...@agilelab.it wrote: Hi all, is there anyone using SparkSQL + Parquet who has run a benchmark comparing storing Parquet files on HDFS versus CFS (Cassandra File System)? Which storage can improve the performance of SparkSQL + Parquet? Thanks Paolo
Re: Expected behavior for DataFrame.unionAll
That explains it. Thanks Reynold. Justin On Mon, Apr 13, 2015 at 11:26 PM, Reynold Xin r...@databricks.com wrote: I think what happened is type widening: unionAll coerces both inputs to a common type, and the tightest common type between a string and an int is string, so column a ends up as a string. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L144 On Tue, Apr 7, 2015 at 5:00 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I am experimenting with DataFrame. I tried to construct two DataFrames with: 1. case class A(a: Int, b: String) scala> adf.printSchema() root |-- a: integer (nullable = false) |-- b: string (nullable = true) 2. case class B(a: String, c: Int) scala> bdf.printSchema() root |-- a: string (nullable = true) |-- c: integer (nullable = false) Then I unioned these two DataFrames with the unionAll function, and I get the following schema. It is kind of a mixture of A and B. scala> val udf = adf.unionAll(bdf) scala> udf.printSchema() root |-- a: string (nullable = false) |-- b: string (nullable = true) The unionAll documentation says it behaves like the SQL UNION ALL function. However, unioning incompatible types is not well defined for SQL. Is there any expected behavior for unioning incompatible data frames? Thanks. Justin
Re: streamSQL - is it available or is it in POC ?
We have a similar offering (Sigstream); you can find more about it here: https://sigmoid.com/ Thanks Best Regards On Wed, Apr 8, 2015 at 9:25 AM, haopu hw...@qilinsoft.com wrote: I'm also interested in this project. Do you have any update on it? Is it still active? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streamSQL-is-it-available-or-is-it-in-POC-tp20993p22416.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: save as text file throwing null pointer error.
Where exactly is it throwing the null pointer exception? Are you starting your program from another program or something? It looks like something is invoking ProcessBuilder etc. Thanks Best Regards On Thu, Apr 9, 2015 at 6:46 PM, Somnath Pandeya somnath_pand...@infosys.com wrote: JavaRDD<String> lineswithoutStopWords = nonEmptylines.map(new Function<String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String line) throws Exception { // TODO Auto-generated method stub return removeStopWords(line, stopwords); } }); lineswithoutStopWords.saveAsTextFile("output/testop.txt"); Exception in task 0.0 in stage 1.0 (TID 1) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) at org.apache.hadoop.util.Shell.run(Shell.java:379) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/04/09 18:44:36 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) at org.apache.hadoop.util.Shell.run(Shell.java:379) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/04/09 18:44:36 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
Cannot saveAsParquetFile from a RDD of case class
Hello, I tried to follow the Spark SQL tutorial, but am not able to saveAsParquetFile from an RDD of a case class. Here are my Main.scala and build.sbt: https://gist.github.com/pishen/939cad3da612ec03249f At line 34, the compiler says that value saveAsParquetFile is not a member of org.apache.spark.rdd.RDD[core.Log] Any suggestion on how to solve this? Thanks, pishen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-saveAsParquetFile-from-a-RDD-of-case-class-tp22488.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Cannot saveAsParquetFile from a RDD of case class
OK, it does work. Maybe it would be better to update this usage in the official Spark SQL tutorial: http://spark.apache.org/docs/latest/sql-programming-guide.html Thanks, pishen 2015-04-14 15:30 GMT+08:00 fightf...@163.com fightf...@163.com: Hi there, If you want to use saveAsParquetFile, you may want to use val log_df = sqlContext.createDataFrame(logs) and then you can issue log_df.saveAsParquetFile(path) Best, Sun. -- fightf...@163.com *From:* pishen pishe...@gmail.com *Date:* 2015-04-14 15:18 *To:* user user@spark.apache.org *Subject:* Cannot saveAsParquetFile from a RDD of case class Hello, I tried to follow the Spark SQL tutorial, but am not able to saveAsParquetFile from an RDD of a case class. Here are my Main.scala and build.sbt: https://gist.github.com/pishen/939cad3da612ec03249f At line 34, the compiler says that value saveAsParquetFile is not a member of org.apache.spark.rdd.RDD[core.Log] Any suggestion on how to solve this? Thanks, pishen -- View this message in context: Cannot saveAsParquetFile from a RDD of case class http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-saveAsParquetFile-from-a-RDD-of-case-class-tp22488.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
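A minimal end-to-end sketch of the suggested fix on Spark 1.3; the Log fields, input path, and split logic are illustrative:

case class Log(level: String, message: String)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val logs = sc.textFile("hdfs:///logs/input").map { line =>
  val parts = line.split("\t", 2)
  Log(parts(0), parts(1))
}
val logDf = sqlContext.createDataFrame(logs)   // DataFrame, not RDD, carries saveAsParquetFile
logDf.saveAsParquetFile("hdfs:///logs/output.parquet")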
Spark: Using node-local files within functions?
Hi, I am trying to use Spark on YARN with 3rd-party code which is unaware of distributed file systems, so providing HDFS file references does not work. My idea to resolve this was the following: within a function I take the HDFS file reference I get as a parameter, copy the file into the local file system, and give the 3rd-party components what they expect. textFolder.map(new Function() { public List... call(String inputFile) throws Exception { // resolve and copy the HDFS file to the local file system // get a local file pointer (this function is executed on a node, so there is a local file system) // call the 3rd-party library with the 'local file' reference // do other stuff } } This seems to work, but I am not really sure whether it might cause other problems at production file sizes, e.g. the files I copy to the local file system might be large. Would this affect YARN somehow? Are there more advisable ways to befriend HDFS-unaware libraries with HDFS file pointers? Regards,
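A rough sketch of that copy step (in Scala for brevity) using the standard Hadoop FileSystem API; textFolder is assumed to be an RDD of HDFS paths as in the pseudo-code above, and the temp-file naming and cleanup policy are illustrative:

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val localPaths = textFolder.map { inputFile =>
  val hadoopConf = new Configuration()
  val src = new Path(inputFile)
  val fs = src.getFileSystem(hadoopConf)              // resolves hdfs:// on the executor node
  val local = File.createTempFile("staged-", ".dat")  // lands on the node-local file system
  local.delete()                                       // let copyToLocalFile create the file
  fs.copyToLocalFile(false, src, new Path(local.getAbsolutePath))
  // hand local.getAbsolutePath to the HDFS-unaware third-party library here
  local.getAbsolutePath
}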
Spark SQL reading parquet decimal
Hi guys, I have Parquet data written by Impala: Server version: impalad version 2.1.2-cdh5 RELEASE (build 36aad29cee85794ecc5225093c30b1e06ffb68d3) When using Spark SQL 1.3.0 (spark-assembly-1.3.0-hadoop2.4.0) I get the following error: val correlatedEventData = sqlCtx.sql( s"""|SELECT |id, |action_id, |adv_saleamount |FROM ir_correlated_event_t""".stripMargin) correlatedEventData.take(10).foreach(println) println("correlatedEventData count: " + correlatedEventData.count) println("Decimal value: " + correlatedEventData.first().getDecimal(2)) Exception in thread main java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to java.math.BigDecimal Neither does it work when I use getFloat(2), getDouble(2), getAs[Float](2) or get(2).asInstanceOf[Float], etc. Any assistance will be appreciated. Regards Clint
Re: Spark Streaming not picking current date properly
You can try something like this: eventsDStream.foreachRDD(rdd => { val curdate = new DateTime() val fmt = DateTimeFormat.forPattern("dd_MM_yyyy") rdd.saveAsTextFile("s3n://bucket_name/test/events_" + fmt.print(curdate) + "/events") }) Thanks Best Regards On Fri, Apr 10, 2015 at 4:22 PM, Anshul Singhle ans...@betaglide.com wrote: Hi all, I'm using Spark Streaming to log events from Kinesis to S3. My code is doing something like this - val curdate = new DateTime() val fmt = DateTimeFormat.forPattern("dd_MM_yyyy") eventsDStream.saveAsTextFiles("s3n://bucket_name/test/events_" + fmt.print(curdate) + "/events", "json") The problem is that when I started my streaming job on 9th April and kept it running for a day, it was still writing to the 09_04_2015 folder. On restarting my streaming job, it was able to write to the correct directory, i.e. 10_04_2015. Any idea why this is occurring? Regards, Anshul
Spark Data Formats ?
Can you please share the native support for data formats available with Spark? Two I can see are Parquet and text files: sc.parquetFile sc.textFile I see that Hadoop input formats (Avro) have issues that I faced in earlier threads and that seem to be well known. https://issues.apache.org/jira/browse/SPARK-993 https://issues.apache.org/jira/browse/SPARK-1018 Hence I want to know which data formats have full support in Spark; I can think of moving to those data formats. -- Deepak
Re: Exception in thread main java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] when create context
This usually means something didn't start due to a fairly low-level error, like a class not found or incompatible Spark versions somewhere. At least, that's also what I see in unit tests when things like that go wrong. On Tue, Apr 14, 2015 at 8:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you share a bit more information on the type of application that you are running? From the stack trace I can only say that, for some reason, your connection timed out (probably a GC pause or a network issue) Thanks Best Regards On Wed, Apr 8, 2015 at 9:48 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, In some cases, I get the exception below when I run Spark in local mode (I haven't seen this on a cluster). This is weird, but it also affects my local unit test cases (it does not always happen, but usually once per 4-5 runs). From the stack, it looks like the error happens when creating the context, but I don't know why or what kind of parameters I can set to solve this issue. Exception in thread main java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at akka.remote.Remoting.start(Remoting.scala:180) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632) at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1832) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1823) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163) at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267) at org.apache.spark.SparkContext.<init>(SparkContext.scala:270) at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61) at com.***.executor.FinancialEngineExecutor.run(FinancialEngineExecutor.java:110) Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Data Formats ?
There's sc.objectFile also. Thanks Best Regards On Tue, Apr 14, 2015 at 2:59 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Can you please share the native support for data formats available with Spark? Two I can see are Parquet and text files: sc.parquetFile sc.textFile I see that Hadoop input formats (Avro) have issues that I faced in earlier threads and that seem to be well known. https://issues.apache.org/jira/browse/SPARK-993 https://issues.apache.org/jira/browse/SPARK-1018 Hence I want to know which data formats have full support in Spark; I can think of moving to those data formats. -- Deepak
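A small sketch of the objectFile round trip (the path and the Event case class are illustrative):

case class Event(id: Long, name: String)

val events = sc.parallelize(Seq(Event(1L, "click"), Event(2L, "view")))
events.saveAsObjectFile("hdfs:///tmp/events_obj")           // Java-serialized SequenceFiles
val restored = sc.objectFile[Event]("hdfs:///tmp/events_obj")
println(restored.count())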
Clustering users according to their shopping traits
Sorry for the off-topic post; I have not found a specific MLlib forum. Please advise a good overview of using clustering algorithms to group users according to their purchase and browsing history on a web site. Thanks!
Spark Yarn-client Kerberos on remote cluster
Dear All, I would like to know if it's possible to configure SparkConf() in order to interact with a remote kerberized cluster in yarn-client mode. Spark will not be installed on the cluster itself, and the localhost can't ask for a ticket, but a keytab has been generated for this purpose and provided to the localhost. My purpose is to code in Eclipse on my localhost and submit my code in yarn-client mode to a fully kerberized HDP 2.2 cluster. I currently work with Spark on the cluster, but that is only a short-term solution because in the end my localhost will be on Windows 7. In advance, thank you for your answers -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Yarn-client-Kerberos-on-remote-cluster-tp22491.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Increase partitions reading Parquet File
Hi. It doesn't work. val file = sqlContext.parquetFile("hdfs://node1/user/hive/warehouse/file.parquet") file.repartition(127) println(h.partitions.size.toString()) -- Returns 27! Regards On Fri, Apr 10, 2015 at 4:50 PM, Felix C felixcheun...@hotmail.com wrote: RDD.repartition(1000)? --- Original Message --- From: Masf masfwo...@gmail.com Sent: April 9, 2015 11:45 PM To: user@spark.apache.org Subject: Increase partitions reading Parquet File Hi I have this statement: val file = sqlContext.parquetFile("hdfs://node1/user/hive/warehouse/file.parquet") This code generates as many partitions as there are files. So, I want to increase the number of partitions. I've tested coalesce (file.coalesce(100)) but the number of partitions doesn't change. How can I increase the number of partitions? Thanks -- Regards. Miguel Ángel -- Saludos. Miguel Ángel
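One thing worth checking: repartition is a transformation that returns a new dataset rather than changing the one it is called on, so the result has to be captured; a sketch (path from the question, the .rdd step assumes a Spark 1.3 DataFrame):

val file = sqlContext.parquetFile("hdfs://node1/user/hive/warehouse/file.parquet")
val repartitioned = file.repartition(127)    // new, 127-partition dataset; 'file' is unchanged
println(repartitioned.rdd.partitions.size)   // on a 1.2 SchemaRDD, use .partitions directly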
Spark 1.2, trying to run spark-history as a service, spark-defaults.conf are ignored
Here is a related problem: http://apache-spark-user-list.1001560.n3.nabble.com/Launching-history-server-problem-td12574.html but no answer. What I'm trying to do: wrap spark-history with an /etc/init.d script. The problem I have: I can't make it read spark-defaults.conf. I've put this file here: /etc/spark/conf and /usr/lib/spark/conf (where /usr/lib/spark is the location for Spark), with no luck. spark-history tries to use the default value for the application log location; it doesn't read the value specified in spark-defaults.conf
Re: Spark Yarn-client Kerberos on remote cluster
If your localhost can't talk to a KDC, you can't access a kerberized cluster. A keytab file alone is not enough. -Neal On 4/14/15, 3:54 AM, philippe L lanckvrind.p@gmail.com wrote: Dear All, I would like to know if it's possible to configure SparkConf() in order to interact with a remote kerberized cluster in yarn-client mode. Spark will not be installed on the cluster itself, and the localhost can't ask for a ticket, but a keytab has been generated for this purpose and provided to the localhost. My purpose is to code in Eclipse on my localhost and submit my code in yarn-client mode to a fully kerberized HDP 2.2 cluster. I currently work with Spark on the cluster, but that is only a short-term solution because in the end my localhost will be on Windows 7. In advance, thank you for your answers -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Yarn-client-Kerberos-on-remote-cluster-tp22491.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL 1.3.1 saveAsParquetFile will output tachyon file with different block size
Would you mind opening a JIRA for this? I think your suspicion makes sense. Will have a look at this tomorrow. Thanks for reporting! Cheng On 4/13/15 7:13 PM, zhangxiongfei wrote: Hi experts, I ran the code below in the Spark shell to access Parquet files in Tachyon. 1. First, created a DataFrame by loading a bunch of Parquet files in Tachyon: val ta3 = sqlContext.parquetFile("tachyon://tachyonserver:19998/apps/tachyon/zhangxf/parquetAdClick-6p-256m") 2. Second, set fs.local.block.size to 256M to make sure that the block size of output files in Tachyon is 256M: sc.hadoopConfiguration.setLong("fs.local.block.size", 268435456) 3. Third, saved the above DataFrame into Parquet files stored in Tachyon: ta3.saveAsParquetFile("tachyon://tachyonserver:19998/apps/tachyon/zhangxf/parquetAdClick-6p-256m-test") After the above code ran successfully, the output Parquet files were stored in Tachyon, but these files have different block sizes. Below is the information for those files in the path tachyon://tachyonserver:19998/apps/tachyon/zhangxf/parquetAdClick-6p-256m-test: File Name | Size | Block Size | In-Memory | Pin | Creation Time: _SUCCESS | 0.00 B | 256.00 MB | 100% | NO | 04-13-2015 17:48:23:519; _common_metadata | 1088.00 B | 256.00 MB | 100% | NO | 04-13-2015 17:48:23:741; _metadata | 22.71 KB | 256.00 MB | 100% | NO | 04-13-2015 17:48:23:646; part-r-1.parquet | 177.19 MB | 32.00 MB | 100% | NO | 04-13-2015 17:46:44:626; part-r-2.parquet | 177.21 MB | 32.00 MB | 100% | NO | 04-13-2015 17:46:44:636; part-r-3.parquet | 177.02 MB | 32.00 MB | 100% | NO | 04-13-2015 17:46:45:439; part-r-4.parquet | 177.21 MB | 32.00 MB | 100% | NO | 04-13-2015 17:46:44:845; part-r-5.parquet | 177.40 MB | 32.00 MB | 100% | NO | 04-13-2015 17:46:44:638; part-r-6.parquet | 177.33 MB | 32.00 MB | 100% | NO | 04-13-2015 17:46:44:648. It seems that the saveAsParquetFile API does not distribute/broadcast the Hadoop configuration to executors like other APIs such as saveAsTextFile do; the configuration fs.local.block.size only takes effect on the driver. If I set that configuration before loading the Parquet files, the problem is gone. Could anyone help me verify this problem? Thanks Zhang Xiongfei - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
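For reference, a sketch of the ordering that worked for the reporter, setting the block size on the driver's hadoopConfiguration before the Parquet files are first read (paths taken from the message above):

sc.hadoopConfiguration.setLong("fs.local.block.size", 268435456L) // 256 MB, set before any read
val ta3 = sqlContext.parquetFile("tachyon://tachyonserver:19998/apps/tachyon/zhangxf/parquetAdClick-6p-256m")
ta3.saveAsParquetFile("tachyon://tachyonserver:19998/apps/tachyon/zhangxf/parquetAdClick-6p-256m-test")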
Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
It cleans the work dir, and SPARK_LOCAL_DIRS should be cleaned automatically. From the source code comments: // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the // application finishes. On 13.04.2015, at 11:26, Guillaume Pitel guillaume.pi...@exensa.com wrote: Does it also clean up the Spark local dirs? I thought it was only cleaning $SPARK_HOME/work/ Guillaume I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example: export SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=<seconds> On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have an answer for this? Thanks Ningjun From: Wang, Ningjun (LNG-NPV) Sent: Thursday, April 02, 2015 12:14 PM To: user@spark.apache.org Subject: Is the disk space in SPARK_LOCAL_DIRS cleanned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, Spark writes to this folder. I found that the disk space of this folder keeps increasing quickly, and at a certain point I will run out of disk space. I wonder, does Spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub folders can be removed? Ningjun -- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
That’s true, spill dirs don’t get cleaned up when something goes wrong. We are restarting long-running jobs once in a while for cleanups and have spark.cleaner.ttl set to a lower value than the default. On 14.04.2015, at 17:57, Guillaume Pitel guillaume.pi...@exensa.com wrote: Right, I remember now, the only problematic case is when things go bad and the cleaner is not executed. Also, it can be a problem when reusing the same SparkContext for many runs. Guillaume It cleans the work dir, and SPARK_LOCAL_DIRS should be cleaned automatically. From the source code comments: // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the // application finishes. On 13.04.2015, at 11:26, Guillaume Pitel guillaume.pi...@exensa.com wrote: Does it also clean up the Spark local dirs? I thought it was only cleaning $SPARK_HOME/work/ Guillaume I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example: export SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=<seconds> On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have an answer for this? Thanks Ningjun From: Wang, Ningjun (LNG-NPV) Sent: Thursday, April 02, 2015 12:14 PM To: user@spark.apache.org Subject: Is the disk space in SPARK_LOCAL_DIRS cleanned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, Spark writes to this folder. I found that the disk space of this folder keeps increasing quickly, and at a certain point I will run out of disk space. I wonder, does Spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub folders can be removed? Ningjun -- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Converting Date pattern in scala code
I need some help converting a date pattern in my Scala code for Spark 1.3. I am reading dates from two flat files that use two different date formats. File 1: 2015-03-27 File 2: 02-OCT-12 09-MAR-13 The format of file 2 is not being recognized by my Spark SQL when I compare it in a WHERE clause on the date fields; the format of file 1 is recognized better. How can I convert the format in file 2 to match the format in file 1? Regards Ananda - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
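One way to normalize the second format to the first is to map the raw strings through SimpleDateFormat before registering the table; a small sketch, assuming the dd-MMM-yy values are English-locale month abbreviations:

import java.text.SimpleDateFormat
import java.util.Locale

def toIsoDate(raw: String): String = {
  val in  = new SimpleDateFormat("dd-MMM-yy", Locale.ENGLISH) // parses 02-OCT-12
  val out = new SimpleDateFormat("yyyy-MM-dd")                // emits 2012-10-02
  out.format(in.parse(raw))
}

println(toIsoDate("02-OCT-12")) // 2012-10-02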
Re: Running Spark on Gateway - Connecting to Resource Manager Retries
Your YARN access is not configured. 0.0.0.0:8032 is the default YARN resource manager address; I guess you don't have yarn-site.xml in your classpath. -Neal From: Vineet Mishra clearmido...@gmail.com Date: Tuesday, April 14, 2015 at 12:05 AM To: user@spark.apache.org, cdh-u...@cloudera.org Subject: Running Spark on Gateway - Connecting to Resource Manager Retries Hi Team, I am running the Spark word count example (https://github.com/sryza/simplesparkapp). If I run with master as local it works fine, but when I change the master to yarn it ends with retries connecting to the resource manager (stack trace mentioned below), 15/04/14 11:31:57 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 15/04/14 11:31:58 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/14 11:31:59 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) If I run the same command from the Namenode instance it ends with an ArrayIndexOutOfBoundsException (stack trace mentioned below), 15/04/14 11:38:44 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8 Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 1 at com.cloudera.sparkwordcount.SparkWordCount$.main(SparkWordCount.scala:28) at com.cloudera.sparkwordcount.SparkWordCount.main(SparkWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Looking forward to getting this resolved to work on the respective nodes. Thanks,
Re: sbt-assembly spark-streaming-kinesis-asl error
Hi, I've gotten an application working with sbt-assembly and Spark, thought I'd present an option. In my experience, trying to bundle any of the Spark libraries in your uber jar is going to be a major pain. There will be a lot of deduplication to work through, and even if you resolve the conflicts it can be easy to do it incorrectly. I considered it an intractable problem. So the alternative is to not include those jars in your uber jar. For this to work you will need the same libraries on the classpath of your Spark cluster and your driver program (if you are running that as an application and not just using spark-submit). As for your NoClassDefFoundError, you are either missing Joda Time in your runtime classpath or have conflicting versions. It looks like something related to AWS wants to use it. Check your uber jar to see if it's including org/joda/time, as well as the classpath of your Spark cluster. For example: I use Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark 1.2 I found a conflict between the httpclient version that my uber jar pulled in for the AWS libraries and the one bundled in the Spark uber jar. I hand-patched the Spark uber jar to remove the offending httpclient bytecode to resolve the issue. You may be facing a similar situation. I hope that gives some ideas for resolving your issue. Regards, Rich On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi Vadim, After removing "provided" from "org.apache.spark" %% "spark-streaming-kinesis-asl" I ended up with a huge number of deduplicate errors: https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a It would be nice if you could share some pieces of your mergeStrategy code for reference. Also, after adding "provided" back to spark-streaming-kinesis-asl and submitting the Spark job with the spark-streaming-kinesis-asl jar file sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar I still end up with the following error... Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) Has anyone else run into this issue? On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I am having trouble building a fat jar file through sbt-assembly.
[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard' [warn] Merging
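For what it's worth, a heavily trimmed sketch of the kind of merge strategy that is commonly used in build.sbt (sbt-assembly 0.13.x key names; older plugin versions use mergeStrategy instead of assemblyMergeStrategy, and which entries to discard or concatenate depends on the project):

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard  // drop manifests, licenses, poms
  case "reference.conf"              => MergeStrategy.concat   // merge typesafe-config defaults
  case _                             => MergeStrategy.first
}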
Re: org.apache.spark.ml.recommendation.ALS
Yes, I think the default Spark builds are on Scala 2.10. You need to follow the instructions at http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211 to build 2.11 packages. -Xiangrui On Mon, Apr 13, 2015 at 4:00 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Here is the class:

object ALSNew {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("TrainingDataPurchase")
      .set("spark.executor.memory", "4g")
    conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2
    conf.set("spark.storage.memoryFraction", "0.3")  // default is 0.6
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pfile = args(0)
    val purchase = sc.textFile(pfile)
    val ratings = purchase.map(line => line.split(',') match {
      case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat)
    }).toDF()

    val rank = args(1).toInt
    val numIterations = args(2).toInt
    val regParam: Double = 0.01
    val implicitPrefs: Boolean = true
    val numUserBlocks: Int = 100
    val numItemBlocks: Int = 100
    val nonnegative: Boolean = true
    // val paramMap = ParamMap(regParam = 0.01)
    // paramMap.put(numUserBlocks = 100, numItemBlocks = 100)

    val als = new ALS()
      .setRank(rank)
      .setRegParam(regParam)
      .setImplicitPrefs(implicitPrefs)
      .setNumUserBlocks(numUserBlocks)
      .setNumItemBlocks(numItemBlocks)
    val alpha = als.getAlpha
    val model = als.fit(ratings)

    val predictions = model.transform(ratings)
      .select("rating", "prediction")
      .map { case Row(rating: Float, prediction: Float) =>
        (rating.toDouble, prediction.toDouble)
      }

    val rmse = if (implicitPrefs) {
      // TODO: Use a better (rank-based?) evaluation metric for implicit feedback.
      // We limit the ratings and the predictions to interval [0, 1] and compute the weighted RMSE
      // with the confidence scores as weights.
      val (totalWeight, weightedSumSq) = predictions.map { case (rating, prediction) =>
        val confidence = 1.0 + alpha * math.abs(rating)
        val rating01 = math.max(math.min(rating, 1.0), 0.0)
        val prediction01 = math.max(math.min(prediction, 1.0), 0.0)
        val err = prediction01 - rating01
        (confidence, confidence * err * err)
      }.reduce { case ((c0, e0), (c1, e1)) => (c0 + c1, e0 + e1) }
      math.sqrt(weightedSumSq / totalWeight)
    } else {
      val mse = predictions.map { case (rating, prediction) =>
        val err = rating - prediction
        err * err
      }.mean()
      math.sqrt(mse)
    }
    println("Mean Squared Error = " + rmse)
  }
}

I am using the following in my maven build (pom.xml):

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
</dependencies>

I am using Scala version 2.11.2. Could it be that spark-1.3.0-bin-hadoop2.4.tgz requires a different version of Scala? Thanks, Jay On Apr 9, 2015, at 4:38 PM, Xiangrui Meng men...@gmail.com wrote: Could you share ALSNew.scala? Which Scala version did you use? -Xiangrui On Wed, Apr 8, 2015 at 4:09 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, I tried running this on my local machine (laptop) and got the same error: Here is what I did: 1. downloaded the Spark 1.3.0 release version (prebuilt for Hadoop 2.4 and later), spark-1.3.0-bin-hadoop2.4.tgz. 2.
Ran the following command: spark-submit --class ALSNew --master local[8] ALSNew.jar /input_path The stack trace is exactly the same. Thanks, Jay On Apr 8, 2015, at 10:47 AM, Jay Katukuri jkatuk...@apple.com wrote: Some additional context: since I am using features of Spark 1.3.0, I downloaded Spark 1.3.0 and used spark-submit from there. The cluster is still on Spark 1.2.0. So it looks to me like, at runtime, the executors could not find some libraries of Spark 1.3.0, even though I ran spark-submit from my downloaded Spark 1.3.0. On Apr 6, 2015, at 1:37 PM, Jay Katukuri jkatuk...@apple.com wrote: Here is the command that I have used: spark-submit --class packagename.ALSNew --num-executors 100 --master yarn ALSNew.jar -jar spark-sql_2.11-1.3.0.jar hdfs://input_path Btw - I could run the old ALS in the mllib package. On Apr 6, 2015, at 12:32 PM, Xiangrui Meng men...@gmail.com wrote:
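One way to avoid the Scala-version mismatch flagged above, sketched as an sbt build and assuming the prebuilt spark-1.3.0-bin-hadoop2.4 binaries (which are compiled against Scala 2.10):

// build.sbt sketch - compile the application against the same Scala line as the cluster's Spark
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"   % "1.3.0" % "provided"
)

The alternative, as Xiangrui notes, is to keep the application on 2.11 and build Spark itself for Scala 2.11 on the cluster.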
Re: sbt-assembly spark-streaming-kinesis-asl error
Hi Vadim, After removing provided from org.apache.spark %% spark-streaming-kinesis-asl I ended up with huge number of deduplicate errors: https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a It would be nice if you could share some pieces of your mergeStrategy code for reference. Also, after adding provided back to spark-streaming-kinesis-asl and I submit the spark job with the spark-streaming-kinesis-asl jar file sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar I still end up with the following error... Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) Has anyone else run into this issue? On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have having trouble building a fat jar file through sbt-assembly. [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/services/java.sql.Driver' with strategy 'filterDistinctLines' [warn] Merging 
'rootdoc.txt' with strategy 'concat' [warn] Strategy 'concat' was applied to a file [warn] Strategy 'discard' was applied to 17 files [warn] Strategy 'filterDistinctLines' was applied to a file [warn] Strategy 'rename' was applied to 4 files When submitting the spark application through the command sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName target/scala-2.10/-snapshot.jar I end up the the following error, Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119) at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105) at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78) at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307) at
Re: Regarding benefits of using more than one cpu for a task in spark
Hi twinkle, To be completely honest, I'm not sure, I had never heard spark.task.cpus before. But I could imagine two different use cases: a) instead of just relying on spark's creation of tasks for parallelism, a user wants to run multiple threads *within* a task. This is sort of going against the programming model of spark, but I guess this feature is meant to give you the bare minimum support you need in case you really want. Eg., maybe you have some existing library you want to use in each task which is already multi-threaded, or you pipe to some external programming. Or maybe you even do something custom yourself -- eg. you have some coordination between threads that spark doesn't give you between tasks. b) as a simple way to tune some resource management. Eg., you could initially have your cluster configured to overcount cores for hyperthreading, but then set spark.task.cpus to 2, if you don't want to count hyperthreading. Or perhaps you want to leave some cores open for all the other work going on -- GC, network IO, etc. (But then again, this is a strange setting to use for that -- you'd probably just want some fixed number of cores to count, not a multiplier.) On Tue, Apr 7, 2015 at 2:01 AM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, In spark, there are two settings regarding number of cores, one is at task level :spark.task.cpus and there is another one, which drives number of cores per executors: spark.executor.cores Apart from using more than one core for a task which has to call some other external API etc, is there any other use case / benefit of assigning more than one core to a task? As per the code, I can only see this being used while scheduling etc , as such RDD partitions etc remains untouched from this setting. Does this mean that coder needs to take care of coding the application logic to take care of this setting? ( which again let me think over this setting ). Comments please. Thanks, Twinkle
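A small sketch of use case (a): reserving more than one core per task for a library that is itself multi-threaded. The numbers and the library call are placeholders, not a recommendation:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MultiThreadedTaskDemo")
  .set("spark.executor.cores", "8") // cores advertised per executor
  .set("spark.task.cpus", "2")      // scheduler reserves 2 per task -> at most 4 concurrent tasks per executor
val sc = new SparkContext(conf)

sc.parallelize(1 to 1000, 20).mapPartitions { iter =>
  // stand-in for a call into an existing library that runs roughly 2 threads per invocation
  iter.map(x => x * 2)
}.count()

As noted above, this only changes scheduling; it does not change how the RDD is partitioned, so the application code is still responsible for actually using the extra core.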
Re: Spark Data Formats ?
Spark SQL (which also can give you an RDD for use with the standard Spark RDD API) has support for json, parquet, and hive tables http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources. There is also a library for Avro https://github.com/databricks/spark-avro. On Tue, Apr 14, 2015 at 2:49 AM, Akhil Das ak...@sigmoidanalytics.com wrote: There's sc.objectFile also. Thanks Best Regards On Tue, Apr 14, 2015 at 2:59 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Can you please share the native support of data formats available with Spark. Two i can see are parquet and textFile sc.parquetFile sc.textFile I see that Hadoop Input Formats (Avro) are having issues, that i faced in earlier threads and seems to be well known. https://issues.apache.org/jira/browse/SPARK-993 https://issues.apache.org/jira/browse/SPARK-1018 Hence i want to know which data formats have full support in Spark. I can think of moving to those data formats. -- Deepak
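For quick reference, the calls mentioned above in the Spark 1.3-era API; the paths and the MyRecord class are placeholders, and Avro requires the external spark-avro package on the classpath:

val jsonDF    = sqlContext.jsonFile("hdfs:///data/events.json")       // JSON
val parquetDF = sqlContext.parquetFile("hdfs:///data/events.parquet") // Parquet
val textRDD   = sc.textFile("hdfs:///data/events.txt")                // plain text
val objRDD    = sc.objectFile[MyRecord]("hdfs:///data/objects")       // Java-serialized objects
val avroDF    = sqlContext.load("com.databricks.spark.avro", Map("path" -> "hdfs:///data/events.avro"))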
Re: Spark SQL reading parquet decimal
Can you open a JIRA? On Tue, Apr 14, 2015 at 1:56 AM, Clint McNeil cl...@impactradius.com wrote: Hi guys, I have parquet data written by Impala: Server version: impalad version 2.1.2-cdh5 RELEASE (build 36aad29cee85794ecc5225093c30b1e06ffb68d3) When using Spark SQL 1.3.0 (spark-assembly-1.3.0-hadoop2.4.0) I get the following error:

val correlatedEventData = sqlCtx.sql(
  s"""
    |SELECT
    |id,
    |action_id,
    |adv_saleamount
    |FROM ir_correlated_event_t
  """.stripMargin)
correlatedEventData.take(10).foreach(println)
println("correlatedEventData count: " + correlatedEventData.count)
println("Decimal value: " + correlatedEventData.first().getDecimal(2))

Exception in thread "main" java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to java.math.BigDecimal Neither does it work when I use getFloat(2), getDouble(2), getAs[Float](2) or get(2).asInstanceOf[Float], etc. Any assistance will be appreciated. Regards, Clint
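Until the underlying issue is tracked down, one possible workaround (a sketch, not a confirmed fix) is to cast the decimal column inside the query and read it back as a double:

val casted = sqlCtx.sql(
  """SELECT id, action_id, CAST(adv_saleamount AS DOUBLE) AS adv_saleamount
    |FROM ir_correlated_event_t
  """.stripMargin)
println("Decimal as double: " + casted.first().getDouble(2))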
Re: How to access postgresql on Spark SQL
There is an example here: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases On Mon, Apr 13, 2015 at 6:07 PM, doovs...@sina.com wrote: Hi all, Who know how to access postgresql on Spark SQL? Do I need add the postgresql dependency in build.sbt and set class path for it? Thanks. Regards Yi
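A minimal sketch against the Spark 1.3 data sources API; the URL, credentials and table name are placeholders. The PostgreSQL JDBC driver jar does need to be on the classpath (for example via --jars or --driver-class-path on spark-submit), and it is usually also declared as a dependency in build.sbt:

val jdbcDF = sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:postgresql://dbhost:5432/mydb?user=me&password=secret",
  "dbtable" -> "public.my_table"))
jdbcDF.registerTempTable("my_table")
sqlContext.sql("SELECT count(*) FROM my_table").show()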
Re: Increase partitions reading Parquet File
RDDs are immutable. Running .repartition does not change the RDD, but instead returns *a new RDD* with more partitions. On Tue, Apr 14, 2015 at 3:59 AM, Masf masfwo...@gmail.com wrote: Hi. It doesn't work. val file = sqlContext.parquetFile("hdfs://node1/user/hive/warehouse/file.parquet") file.repartition(127) println(file.partitions.size.toString()) -- Returns 27! Regards On Fri, Apr 10, 2015 at 4:50 PM, Felix C felixcheun...@hotmail.com wrote: RDD.repartition(1000)? --- Original Message --- From: Masf masfwo...@gmail.com Sent: April 9, 2015 11:45 PM To: user@spark.apache.org Subject: Increase partitions reading Parquet File Hi, I have this statement: val file = sqlContext.parquetFile("hdfs://node1/user/hive/warehouse/file.parquet") This code generates as many partitions as there are files. So, I want to increase the number of partitions. I've tested coalesce (file.coalesce(100)) but the number of partitions doesn't change. How can I increase the number of partitions? Thanks -- Regards. Miguel Ángel -- Saludos. Miguel Ángel
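To make the fix concrete (a small sketch; if `file` is a Spark 1.3 DataFrame rather than an RDD/SchemaRDD, read the partition count through `.rdd`):

val repartitioned = file.repartition(127) // returns a NEW dataset; the original is untouched
println(repartitioned.partitions.size)    // now 127
println(file.partitions.size)             // still the original 27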
TaskResultLost
Running on Spark 1.1.1 Hadoop 2.4 with Yarn AWS dedicated cluster (non-EMR) Is this in our code or config? I’ve never run into a TaskResultLost, not sure what can cause that. TaskResultLost (result lost from block manager) nivea.m https://gd-a.slack.com/team/nivea.m[11:01 AM] collect at AtA.scala:12197/213 (25 failed) nivea.m https://gd-a.slack.com/team/nivea.m[11:01 AM] org.apache.spark.rdd.RDD.collect(RDD.scala:774) org.apache.mahout.sparkbindings.blas.AtA$.at_a_slim(AtA.scala:121) org.apache.mahout.sparkbindings.blas.AtA$.at_a(AtA.scala:50) org.apache.mahout.sparkbindings.SparkEngine$.tr2phys(SparkEngine.scala:231) org.apache.mahout.sparkbindings.SparkEngine$.tr2phys(SparkEngine.scala:242) org.apache.mahout.sparkbindings.SparkEngine$.toPhysical(SparkEngine.scala:108) org.apache.mahout.math.drm.logical.CheckpointAction.checkpoint(CheckpointAction.scala:40) org.apache.mahout.math.drm.package$.drm2Checkpointed(package.scala:90) org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$3.apply(SimilarityAnalysis.scala:129) org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$3.apply(SimilarityAnalysis.scala:127) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176) scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) scala.collection.TraversableOnce$class.to http://class.to/(TraversableOnce.scala:273) scala.collection.AbstractIterator.to http://scala.collection.abstractiterator.to/(Iterator.scala:1157) scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257) scala.collection.AbstractIterator.toList(Iterator.scala:1157) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
Ø Also, it can be a problem when reusing the same sparkcontext for many runs. That is what happen to me. We use spark jobserver and use one sparkcontext for all jobs. The SPARK_LOCAL_DIRS is not cleaned up and is eating disk space quickly. Ningjun From: Marius Soutier [mailto:mps@gmail.com] Sent: Tuesday, April 14, 2015 12:27 PM To: Guillaume Pitel Cc: user@spark.apache.org Subject: Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up? That's true, spill dirs don't get cleaned up when something goes wrong. We are are restarting long running jobs once in a while for cleanups and have spark.cleaner.ttl set to a lower value than the default. On 14.04.2015, at 17:57, Guillaume Pitel guillaume.pi...@exensa.commailto:guillaume.pi...@exensa.com wrote: Right, I remember now, the only problematic case is when things go bad and the cleaner is not executed. Also, it can be a problem when reusing the same sparkcontext for many runs. Guillaume It cleans the work dir, and SPARK_LOCAL_DIRS should be cleaned automatically. From the source code comments: // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the // application finishes. On 13.04.2015, at 11:26, Guillaume Pitel guillaume.pi...@exensa.commailto:guillaume.pi...@exensa.com wrote: Does it also cleanup spark local dirs ? I thought it was only cleaning $SPARK_HOME/work/ Guillaume I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example: export SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=seconds On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.commailto:ningjun.w...@lexisnexis.com wrote: Does anybody have an answer for this? Thanks Ningjun From: Wang, Ningjun (LNG-NPV) Sent: Thursday, April 02, 2015 12:14 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: Is the disk space in SPARK_LOCAL_DIRS cleanned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, spark writes to this folder. I found that the disk space of this folder keep on increase quickly and at certain point I will run out of disk space. I wonder does spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub folders there can be removed? Ningjun -- exensa_logo_mail.png Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S.http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- exensa_logo_mail.png Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S.http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
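For the long-running-context case, the application-side knob mentioned above can be set directly on the SparkConf; the value is in seconds and chosen here only as an illustration:

val conf = new SparkConf()
  .setAppName("LongRunningJobServerContext")
  .set("spark.cleaner.ttl", "86400") // periodically forget metadata and shuffle files older than a day

The worker-side cleanup (spark.worker.cleanup.*) is still configured through SPARK_WORKER_OPTS in spark-env.sh, as shown earlier in this thread.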
spark streaming printing no output
Hi, I am running a Spark Streaming application but nothing is getting printed on the console. I am doing:

1. bin/spark-shell --master clusterMgrUrl
2.
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("hostname", port)
lines.print()
ssc.start()
ssc.awaitTermination()

Jobs are getting created when I look at the web UI, but nothing gets printed on the console. I have started an nc script on that hostname and port and can see messages typed on this port from another console. Please let me know if I am doing something wrong.
spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class
With Spark 1.3 xx.saveAsTextFile(path, codec) gives following trace. Same works with Spark 1.2 Config is CDH 5.3.0 (Hadoop 2.3) with Kerberos 15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0 (TID 17) on executor node1078.svc.devpg.pdx.wd: java.lang.SecurityException (JCE cannot authenticate the provider BC) [duplicate 7] 15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 16, node1080.svc.devpg.pdx.wd): java.lang.SecurityException: JCE cannot authenticate the provider BC at javax.crypto.Cipher.getInstance(Cipher.java:642) at javax.crypto.Cipher.getInstance(Cipher.java:580) at com.workday.mrcodec.CryptoAESHelper.setupCrypto(CryptoAESHelper.java:61) at com.workday.mrcodec.CryptoAESHelper.init(CryptoAESHelper.java:48) at com.workday.mrcodec.CryptoAESCompressor.init(CryptoAESCompressor.java:48) at com.workday.mrcodec.CryptoAESCodec.createCompressor(CryptoAESCodec.java:52) at com.workday.mrcodec.CryptoAESCodec.createOutputStream(CryptoAESCodec.java:148) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.util.jar.JarException: file:/hadoop/disk7/yarn/local/usercache/dev.baseline/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462) at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322) at javax.crypto.JarVerifier.verify(JarVerifier.java:250) at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161) at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187) at javax.crypto.Cipher.getInstance(Cipher.java:638) ... 
16 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Re: How DataFrame schema migration works ?
I forgot to mention that the imageId field is a custom Scala object. Do I need to implement some special method to make it work (equals, hashCode)? On Tue, Apr 14, 2015 at 5:00 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, In the latest version of Spark there's a feature called automatic partition discovery and schema migration for Parquet. As far as I know, this gives the ability to split the DataFrame into several Parquet files, and by just loading the parent directory one can get the global schema of the parent DataFrame. I'm trying to use this feature on the following problem but I run into some trouble. I want to perform a series of feature extractions for a set of images. At the first step, my DataFrame has just two columns: imageId, imageRawData. Then I transform the imageRawData column with different image feature extractors. The result can be of different types; for example, one feature could be an mllib.Vector and another one could be an Array[Byte]. Each feature extractor stores its output as a Parquet file with two columns: imageId, featureType. At the end, I get the following files: - features/rawData.parquet - features/feature1.parquet - features/feature2.parquet When I load all the features with sqlContext.load("features"), it seems to work and I get, in this example, a DataFrame with 4 columns: imageId, imageRawData, feature1, feature2. But when I try to read the values, for example with show, some columns have null fields and I just can't figure out what's going wrong. Any ideas? Best, Jao
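If the custom imageId type is the culprit, one thing worth trying (a sketch, assuming the id can be expressed as plain fields) is to model it as a case class: that gives structural equals/hashCode for free and a Product shape that Spark SQL can map to a struct column:

case class ImageId(dataset: String, index: Long)
case class RawImage(imageId: ImageId, imageRawData: Array[Byte])

// imagesRdd: RDD[RawImage] stands in for however the raw images are actually produced
val rawDF = sqlContext.createDataFrame(imagesRdd)
rawDF.save("features/rawData.parquet")

With every feature file keyed by the same struct-typed imageId column, the merged load has a consistent key to line the columns up on.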
Re: Spark: Using node-local files within functions?
Hi Tobias, It should be possible to get an InputStream from an HDFS file. However, if your libraries only work directly on files, then maybe that wouldn't work? If that's the case and different tasks need different files, your way is probably the best way. If all tasks need the same file, a better option would be to pass the file in with the --files option when you spark-submit, which will cache the file between executors on the same node. -Sandy On Tue, Apr 14, 2015 at 1:39 AM, Horsmann, Tobias tobias.horsm...@uni-due.de wrote: Hi, I am trying to use Spark in combination with Yarn with 3rd party code which is unaware of distributed file systems. Providing hdfs file references thus does not work. My idea to resolve this issue was the following: Within a function I take the HDFS file reference I get as parameter and copy it into the local file system and provide the 3rd party components what they expect. textFolder.map(new Function() { public List... call(String inputFile) throws Exception { //resolve, copy hdfs file to local file system //get local file pointer //this function should be executed on a node, right. There is probably a local file system) //call 3rd party library with 'local file' reference // do other stuff } } This seem to work, but I am not really sure if this might cause other problems when going to productive file sizes. E.g. the files I copy to the local file system might be large. Would this affect Yarn somehow? Are there more advisable ways to befriend HDFS-unaware libraries with HDFS file pointer? Regards,
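For the "same file on all tasks" case Sandy describes, a rough sketch follows; the file name and the third-party call are placeholders:

// ship the file once per node with:
//   spark-submit --files /local/path/lexicon.txt ...
import org.apache.spark.SparkFiles

textFolder.map { inputFile =>
  val localPath = SparkFiles.get("lexicon.txt") // node-local copy on the executor
  ThirdPartyLib.process(localPath, inputFile)   // hypothetical HDFS-unaware library call
}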
How DataFrame schema migration works ?
Dear all, In the latest version of spark there's a feature called : automatic partition discovery and Schema migration for parquet. As far as I know, this gives the ability to split the DataFrame into several parquet files, and by just loading the parent directory one can get the global schema of the parent DataFrame. I'm trying to use this feature in the following problem but I get some troubles. I want to perfom a serie of feature of extraction for a set of images. At a first step, my DataFrame has just two columns : imageId, imageRawData. Then I transform the imageRowData column with different image feature extractors. The result can be of different types. For example on feature could be a mllib.Vector, and another one could be an Array[Byte]. Each feature extractor store its output as a parquet file with two columns : imageId, featureType. Then, at the end, I get the following files : - features/rawData.parquet - features/feature1.parquet - features/feature2.parquet When I load all the features with : sqlContext.load(features) It seems to works and I get with this example a DataFrame with 4 columns : imageId, imageRawData, feature1, feature2. But, when I try to read the values, for example with show, some columns have null fields and I just can't figure out what's going wrong. Any ideas ? Best, Jao
Saving RDDs as custom output format
Hi, Is it possible to store RDDs in custom output formats, for example ORC? Thanks, Daniel
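One general answer is that any Hadoop OutputFormat can be plugged in through the pair-RDD save methods. The sketch below uses SequenceFileOutputFormat only to show the mechanism; an ORC OutputFormat from the Hive/ORC jars could in principle be wired in the same way:

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

val pairs = sc.parallelize(Seq("a", "b", "c"))
  .map(s => (NullWritable.get(), new Text(s)))

pairs.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[NullWritable, Text]]("hdfs:///out/seq")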
How to join RDD keyValuePairs efficiently
I have an RDD that contains millions of Document objects. Each document has a unique Id that is a string. I need to find documents by id quickly. Currently I use an RDD join as follows. First I save the RDD as an object file:

val allDocs: RDD[Document] = getDocs() // this RDD contains 7 million Document objects
allDocs.saveAsObjectFile("/temp/allDocs.obj")

Then I wrote a function to find documents by ids:

def findDocumentsByIds(docids: RDD[String]) = {
  // docids contains fewer than 100 items
  val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
  val idAndDocs = allDocs.keyBy(d => d.id)
  docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
}

I found that this is very slow. I suspect it scans the entire 7 million Document objects in /temp/allDocs.obj sequentially to find the desired documents. Is there any efficient way to do this? One option I am considering is that instead of storing the RDD[Document] as one object file, I store each document in a separate file with a filename equal to the docid. This way I can find a document quickly by docid. However, this means I need to save the RDD as 7 million small files, which will take a very long time and may cause IO problems with so many small files. Is there any other way? Ningjun
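Since the id set is tiny (under 100) compared to the 7 million documents, one alternative sketch is to skip the join entirely: collect the ids to the driver, broadcast them, and filter. That avoids keying and shuffling the whole document RDD and leaves only a single scan:

val wantedIds = sc.broadcast(docids.collect().toSet)
val found = allDocs.filter(doc => wantedIds.value.contains(doc.id))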
Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
Right, I remember now, the only problematic case is when things go bad and the cleaner is not executed. Also, it can be a problem when reusing the same sparkcontext for many runs. Guillaume It cleans the work dir, and SPARK_LOCAL_DIRS should be cleaned automatically. From the source code comments: // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the // application finishes. On 13.04.2015, at 11:26, Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com wrote: Does it also cleanup spark local dirs ? I thought it was only cleaning $SPARK_HOME/work/ Guillaume I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example: export SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=seconds On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com mailto:ningjun.w...@lexisnexis.com wrote: Does anybody have an answer for this? Thanks Ningjun *From:*Wang, Ningjun (LNG-NPV) *Sent:*Thursday, April 02, 2015 12:14 PM *To:*user@spark.apache.org mailto:user@spark.apache.org *Subject:*Is the disk space in SPARK_LOCAL_DIRS cleanned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, spark writes to this folder. I found that the disk space of this folder keep on increase quickly and at certain point I will run out of disk space. I wonder does spark clean up the disk spacein this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub folders there can be removed? Ningjun -- exensa_logo_mail.png *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?
Fundamentally, stream processing systems are designed for processing streams of data, not for storing large volumes of data for a long period of time. So if you have to maintain that much state for months, then its best to use another system that is designed for long term storage (like Cassandra) which has proper support for making all that state fault-tolerant, high-performant, etc. So yes, the best option is to use Cassandra for the state and Spark Streaming jobs accessing the state from Cassandra. There are a number of optimizations that can be done. Its not too hard to build a simple on-demand populated cache (singleton hash map for example), that speeds up access from Cassandra, and all updates are written through the cache. This is a common use of Spark Streaming + Cassandra/HBase. Regarding the performance of updateStateByKey, we are aware of the limitations, and we will improve it soon :) TD On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote: Hey guys, could you please help me with a question I asked on Stackoverflow: https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two ? I'll be really grateful for your help! I'm also pasting the question below: I'm trying to solve a (simplified here) problem in Spark Streaming: Let's say I have a log of events made by users, where each event is a tuple (user name, activity, time), e.g.: (user1, view, 2015-04-14T21:04Z) (user1, click, 2015-04-14T21:05Z) Now I would like to gather events by user to do some analysis of that. Let's say that output is some analysis of: (user1, List((view, 2015-04-14T21:04Z),(click, 2015-04-14T21:05Z)) The events should be kept for even *2 months*. During that time there might be around *500 milion*of such events, and *millions of unique* users, which are keys here. *My questions are:* - Is it feasible to do such a thing with updateStateByKey on DStream, when I have millions of keys stored? - Am I right that DStream.window is no use here, when I have 2 months length window and would like to have a slide of few seconds? P.S. I found out, that updateStateByKey is called on all the keys on every slide, so that means it will be called millions of time every few seconds. That makes me doubt in this design and I'm rather thinking about alternative solutions like: - using Cassandra for state - using Trident state (with Cassandra probably) - using Samza with its state management.
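A rough sketch of the "on-demand populated cache" idea mentioned above; the state type and the Cassandra read/write are stubbed out as placeholders. Because it is a singleton object, it lives for the lifetime of each executor JVM and is shared across batches:

object StateCache {
  type UserState = List[(String, String)] // placeholder for the real per-user state

  private val cache = new java.util.concurrent.ConcurrentHashMap[String, UserState]()

  private def loadFromCassandra(user: String): UserState = Nil        // stub: real read goes here
  private def writeToCassandra(user: String, s: UserState): Unit = () // stub: real write goes here

  def getOrLoad(user: String): UserState = {
    val hit = cache.get(user)
    if (hit != null) hit
    else {
      val loaded = loadFromCassandra(user) // read-through on a miss
      cache.put(user, loaded)
      loaded
    }
  }

  def update(user: String, state: UserState): Unit = {
    cache.put(user, state)        // write-through: keep the cache hot...
    writeToCassandra(user, state) // ...and persist the authoritative copy
  }
}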
Re: sbt-assembly spark-streaming-kinesis-asl error
Thanks guys. This might explain why I might be having problems. Vadim ᐧ On Tue, Apr 14, 2015 at 5:27 PM, Mike Trienis mike.trie...@orcsol.com wrote: Richard, You response was very helpful and actually resolved my issue. In case others run into a similar issue, I followed the procedure: - Upgraded to spark 1.3.0 - Add all spark related libraries are provided - Include spark transitive library dependencies where my build.sbt file libraryDependencies ++= { Seq( org.apache.spark %% spark-core % 1.3.0 % provided, org.apache.spark %% spark-streaming % 1.3.0 % provided, org.apache.spark %% spark-streaming-kinesis-asl % 1.3.0 % provided, joda-time % joda-time % 2.2, org.joda % joda-convert % 1.2, com.amazonaws % aws-java-sdk % 1.8.3, com.amazonaws % amazon-kinesis-client % 1.2.0) and submitting a spark job can done via sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar Thanks again Richard! Cheers Mike. On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher rmarsc...@localytics.com wrote: Hi, I've gotten an application working with sbt-assembly and spark, thought I'd present an option. In my experience, trying to bundle any of the Spark libraries in your uber jar is going to be a major pain. There will be a lot of deduplication to work through and even if you resolve them it can be easy to do it incorrectly. I considered it an intractable problem. So the alternative is to not include those jars in your uber jar. For this to work you will need the same libraries on the classpath of your Spark cluster and your driver program (if you are running that as an application and not just using spark-submit). As for your NoClassDefFoundError, you either are missing Joda Time in your runtime classpath or have conflicting versions. It looks like something related to AWS wants to use it. Check your uber jar to see if its including the org/joda/time as well as the classpath of your spark cluster. For example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark 1.2 I found a conflict between httpclient versions that my uber jar pulled in for AWS libraries and the one bundled in the spark uber jar. I hand patched the spark uber jar to remove the offending httpclient bytecode to resolve the issue. You may be facing a similar situation. I hope that gives some ideas for resolving your issue. Regards, Rich On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi Vadim, After removing provided from org.apache.spark %% spark-streaming-kinesis-asl I ended up with huge number of deduplicate errors: https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a It would be nice if you could share some pieces of your mergeStrategy code for reference. Also, after adding provided back to spark-streaming-kinesis-asl and I submit the spark job with the spark-streaming-kinesis-asl jar file sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar I still end up with the following error... 
Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) Has anyone else run into this issue? On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have having trouble building a fat jar file through sbt-assembly. [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with
Re: Registering classes with KryoSerializer
hmm, I dunno why IntelliJ is unhappy, but you can always fall back to getting a class from the String: Class.forName(scala.reflect.ClassTag$$anon$1) perhaps the class is package private or something, and the repl somehow subverts it ... On Tue, Apr 14, 2015 at 5:44 PM, Arun Lists lists.a...@gmail.com wrote: Hi Imran, Thanks for the response! However, I am still not there yet. In the Scala interpreter, I can do: scala classOf[scala.reflect.ClassTag$$anon$1] but when I try to do this in my program in IntelliJ, it indicates an error: Cannot resolve symbol ClassTag$$anon$1 Hence I am not any closer to making this work. If you have any further suggestions, they would be most welcome. arun On Tue, Apr 14, 2015 at 2:33 PM, Imran Rashid iras...@cloudera.com wrote: Hi Arun, It can be hard to use kryo with required registration because of issues like this -- there isn't a good way to register all the classes that you need transitively. In this case, it looks like one of your classes has a reference to a ClassTag, which in turn has a reference to some anonymous inner class. I'd suggest (a) figuring out whether you really want to be serializing this thing -- its possible you're serializing an RDD which keeps a ClassTag, but normally you wouldn't want to serialize your RDDs (b) you might want to bring this up w/ chill -- spark offloads most of the kryo setup for all the scala internals to chill, I'm surprised they don't handle this already. Looks like they still handle ClassManifests which are from pre-scala 2.10: https://github.com/twitter/chill/blob/master/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L189 (c) you can always register these classes yourself, despite the crazy names, though you'll just need to knock these out one-by-one: scala classOf[scala.reflect.ClassTag$$anon$1] res0: Class[scala.reflect.ClassTag[T]{def unapply(x$1: scala.runtime.BoxedUnit): Option[_]; def arrayClass(x$1: Class[_]): Class[_]}] = class scala.reflect.ClassTag$$anon$1 On Mon, Apr 13, 2015 at 6:09 PM, Arun Lists lists.a...@gmail.com wrote: Hi, I am trying to register classes with KryoSerializer. This has worked with other programs. Usually the error messages are helpful in indicating which classes need to be registered. But with my current program, I get the following cryptic error message: *Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1* *Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);* How do I find out which class needs to be registered? I looked at my program and registered all classes used in RDDs. But clearly more classes remain to be registered if I can figure out which classes. Thanks for your help! arun
Re: Seeing message about receiver not being de-registered on invoking Streaming context stop
What version of Spark are you using? There was a known bug which could be causing this. It got fixed in Spark 1.3 TD On Mon, Apr 13, 2015 at 11:44 PM, Akhil Das ak...@sigmoidanalytics.com wrote: When you say done fetching documents, does it mean that you are stopping the streamingContext? (ssc.stop) or you meant completed fetching documents for a batch? If possible, you could paste your custom receiver code so that we can have a look at it. Thanks Best Regards On Tue, Apr 7, 2015 at 8:46 AM, Hari Polisetty hpoli...@icloud.com wrote: My application is running Spark in local mode and I have a Spark Streaming Listener as well as a Custom Receiver. When the receiver is done fetching all documents, it invokes “stop” on itself. I see the StreamingListener getting a callback on “onReceiverStopped” where I stop the streaming context. However, I see the following message in my logs: 2015-04-06 16:41:51,193 WARN [Thread-66] com.amazon.grcs.gapanalysis.spark.streams.ElasticSearchResponseReceiver.onStop - Stopped receiver 2015-04-06 16:41:51,193 ERROR [sparkDriver-akka.actor.default-dispatcher-17] org.apache.spark.Logging$class.logError - Deregistered receiver for stream 0: AlHURLEY 2015-04-06 16:41:51,202 WARN [Executor task launch worker-2] org.apache.spark.Logging$class.logWarning - Stopped executor without error 2015-04-06 16:41:51,203 WARN [StreamingListenerBus] org.apache.spark.Logging$class.logWarning - All of the receivers have not deregistered, Map(0 - ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,HURLEY)) What am I missing or doing wrong?
RE: save as text file throwing null pointer error.
Hi Akhil, I am running my program standalone, I am getting null pointer exception when I running spark program locally and when I am trying to save my RDD as a text file. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, April 14, 2015 12:41 PM To: Somnath Pandeya Cc: user@spark.apache.org Subject: Re: save as text file throwing null pointer error. Where exactly is it throwing null pointer exception? Are you starting your program from another program or something? looks like you are invoking ProcessingBuilder etc. Thanks Best Regards On Thu, Apr 9, 2015 at 6:46 PM, Somnath Pandeya somnath_pand...@infosys.commailto:somnath_pand...@infosys.com wrote: JavaRDDString lineswithoutStopWords = nonEmptylines .map(new FunctionString, String() { /** * */ private static final long serialVersionUID = 1L; @Override public String call(String line) throws Exception { // TODO Auto-generated method stub return removeStopWords(line, stopwords); } }); lineswithoutStopWords.saveAsTextFile(output/testop.txt); Exception in task 0.0 in stage 1.0 (TID 1) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) at org.apache.hadoop.util.Shell.run(Shell.java:379) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/04/09 18:44:36 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) at org.apache.hadoop.util.Shell.run(Shell.java:379) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
Re: Need some guidance
Thanks, yes. I was using Int for my V and didn't get the second param in the second closure right :) On Mon, Apr 13, 2015 at 1:55 PM, Dean Wampler deanwamp...@gmail.com wrote: That appears to work, with a few changes to get the types correct: input.distinct().combineByKey((s: String) = 1, (agg: Int, s: String) = agg + 1, (agg1: Int, agg2: Int) = agg1 + agg2) dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com wrote: How about this? input.distinct().combineByKey((v: V) = 1, (agg: Int, x: Int) = agg + 1, (agg1: Int, agg2: Int) = agg1 + agg2).collect() On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com wrote: The problem with using collect is that it will fail for large data sets, as you'll attempt to copy the entire RDD to the memory of your driver program. The following works (Scala syntax, but similar to Python): scala val i1 = input.distinct.groupByKey scala i1.foreach(println) (1,CompactBuffer(beta, alpha, foo)) (3,CompactBuffer(foo)) (2,CompactBuffer(alpha, bar)) scala val i2 = i1.map(tup = (tup._1, tup._2.size)) scala i1.foreach(println) (1,3) (3,1) (2,2) The i2 line passes a function that takes a tuple argument, then constructs a new output tuple with the first element and the size of the second (each CompactBuffer). An alternative pattern match syntax would be. scala val i2 = i1.map { case (key, buffer) = (key, buffer.size) } This should work as long as none of the CompactBuffers are too large, which could happen for extremely large data sets. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote: **Learning the ropes** I'm trying to grasp the concept of using the pipeline in pySpark... Simplified example: list=[(1,alpha),(1,beta),(1,foo),(1,alpha),(2,alpha),(2,alpha),(2,bar),(3,foo)] Desired outcome: [(1,3),(2,2),(3,1)] Basically for each key, I want the number of unique values. I've tried different approaches, but am I really using Spark effectively? I wondered if I would do something like: input=sc.parallelize(list) input.groupByKey().collect() Then I wondered if I could do something like a foreach over each key value, and then map the actual values and reduce them. Pseudo-code: input.groupbykey() .keys .foreach(_.values .map(lambda x: x,1) .reducebykey(lambda a,b:a+b) .count() ) I was somehow hoping that the key would get the current value of count, and thus be the count of the unique keys, which is exactly what I think I'm looking for. Am I way off base on how I could accomplish this? Marco
Re: sbt-assembly spark-streaming-kinesis-asl error
Have you tried marking only spark-streaming-kinesis-asl as not provided, and the rest as provided? Then you will not even need to add kinesis-asl.jar in the spark-submit. TD On Tue, Apr 14, 2015 at 2:27 PM, Mike Trienis mike.trie...@orcsol.com wrote: Richard, You response was very helpful and actually resolved my issue. In case others run into a similar issue, I followed the procedure: - Upgraded to spark 1.3.0 - Add all spark related libraries are provided - Include spark transitive library dependencies where my build.sbt file libraryDependencies ++= { Seq( org.apache.spark %% spark-core % 1.3.0 % provided, org.apache.spark %% spark-streaming % 1.3.0 % provided, org.apache.spark %% spark-streaming-kinesis-asl % 1.3.0 % provided, joda-time % joda-time % 2.2, org.joda % joda-convert % 1.2, com.amazonaws % aws-java-sdk % 1.8.3, com.amazonaws % amazon-kinesis-client % 1.2.0) and submitting a spark job can done via sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar Thanks again Richard! Cheers Mike. On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher rmarsc...@localytics.com wrote: Hi, I've gotten an application working with sbt-assembly and spark, thought I'd present an option. In my experience, trying to bundle any of the Spark libraries in your uber jar is going to be a major pain. There will be a lot of deduplication to work through and even if you resolve them it can be easy to do it incorrectly. I considered it an intractable problem. So the alternative is to not include those jars in your uber jar. For this to work you will need the same libraries on the classpath of your Spark cluster and your driver program (if you are running that as an application and not just using spark-submit). As for your NoClassDefFoundError, you either are missing Joda Time in your runtime classpath or have conflicting versions. It looks like something related to AWS wants to use it. Check your uber jar to see if its including the org/joda/time as well as the classpath of your spark cluster. For example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark 1.2 I found a conflict between httpclient versions that my uber jar pulled in for AWS libraries and the one bundled in the spark uber jar. I hand patched the spark uber jar to remove the offending httpclient bytecode to resolve the issue. You may be facing a similar situation. I hope that gives some ideas for resolving your issue. Regards, Rich On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi Vadim, After removing provided from org.apache.spark %% spark-streaming-kinesis-asl I ended up with huge number of deduplicate errors: https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a It would be nice if you could share some pieces of your mergeStrategy code for reference. Also, after adding provided back to spark-streaming-kinesis-asl and I submit the spark job with the spark-streaming-kinesis-asl jar file sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar I still end up with the following error... 
Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) Has anyone else run into this issue? On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have having trouble building a fat jar file through sbt-assembly. [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
Re: spark streaming printing no output
Could you see something like this in the console? --- Time: 142905487 ms --- Best Regards, Shixiong(Ryan) Zhu 2015-04-15 2:11 GMT+08:00 Shushant Arora shushantaror...@gmail.com: Hi I am running a spark streaming application but on console nothing is getting printed. I am doing 1.bin/spark-shell --master clusterMgrUrl 2.import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see webUI but nothing gets printed on console. I have started a nc script on hostname port and can see messages typed on this port from another console. Please let me know If I am doing something wrong.
Re: Registering classes with KryoSerializer
Wow, it all works now! Thanks, Imran! In case someone else finds this useful, here are the additional classes that I had to register (in addition to my application specific classes): val tuple3ArrayClass = classOf[Array[Tuple3[Any, Any, Any]]] val anonClass = Class.forName(scala.reflect.ClassTag$$anon$1) val javaClassClass = classOf[java.lang.Class[Any]] arun On Tue, Apr 14, 2015 at 6:23 PM, Imran Rashid iras...@cloudera.com wrote: hmm, I dunno why IntelliJ is unhappy, but you can always fall back to getting a class from the String: Class.forName(scala.reflect.ClassTag$$anon$1) perhaps the class is package private or something, and the repl somehow subverts it ... On Tue, Apr 14, 2015 at 5:44 PM, Arun Lists lists.a...@gmail.com wrote: Hi Imran, Thanks for the response! However, I am still not there yet. In the Scala interpreter, I can do: scala classOf[scala.reflect.ClassTag$$anon$1] but when I try to do this in my program in IntelliJ, it indicates an error: Cannot resolve symbol ClassTag$$anon$1 Hence I am not any closer to making this work. If you have any further suggestions, they would be most welcome. arun On Tue, Apr 14, 2015 at 2:33 PM, Imran Rashid iras...@cloudera.com wrote: Hi Arun, It can be hard to use kryo with required registration because of issues like this -- there isn't a good way to register all the classes that you need transitively. In this case, it looks like one of your classes has a reference to a ClassTag, which in turn has a reference to some anonymous inner class. I'd suggest (a) figuring out whether you really want to be serializing this thing -- its possible you're serializing an RDD which keeps a ClassTag, but normally you wouldn't want to serialize your RDDs (b) you might want to bring this up w/ chill -- spark offloads most of the kryo setup for all the scala internals to chill, I'm surprised they don't handle this already. Looks like they still handle ClassManifests which are from pre-scala 2.10: https://github.com/twitter/chill/blob/master/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L189 (c) you can always register these classes yourself, despite the crazy names, though you'll just need to knock these out one-by-one: scala classOf[scala.reflect.ClassTag$$anon$1] res0: Class[scala.reflect.ClassTag[T]{def unapply(x$1: scala.runtime.BoxedUnit): Option[_]; def arrayClass(x$1: Class[_]): Class[_]}] = class scala.reflect.ClassTag$$anon$1 On Mon, Apr 13, 2015 at 6:09 PM, Arun Lists lists.a...@gmail.com wrote: Hi, I am trying to register classes with KryoSerializer. This has worked with other programs. Usually the error messages are helpful in indicating which classes need to be registered. But with my current program, I get the following cryptic error message: *Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1* *Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);* How do I find out which class needs to be registered? I looked at my program and registered all classes used in RDDs. But clearly more classes remain to be registered if I can figure out which classes. Thanks for your help! arun
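Putting Arun's resolution together, a sketch of what the registration can look like, assuming Spark 1.2+ where SparkConf.registerKryoClasses exists; the application-specific classes are placeholders:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array[Class[_]](
    // classOf[MyAppClass], ...                        // your own RDD element classes go here
    classOf[Array[Tuple3[Any, Any, Any]]],
    Class.forName("scala.reflect.ClassTag$$anon$1"),   // anonymous class, only reachable by name
    classOf[java.lang.Class[Any]]))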
Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?
Thank you Tathagata, very helpful answer. Though, I would like to highlight that recent stream processing systems are trying to help users implement the use case of holding such large (like 2 months of data) states. I would mention here Samza state management http://samza.apache.org/learn/documentation/0.9/container/state-management.html and Trident state management https://storm.apache.org/documentation/Trident-state. I'm waiting for Spark to help with that too, because generally I definitely prefer this technology:) But considering holding state in Cassandra with Spark Streaming, I understand we're not talking here about using Cassandra as input nor output (nor making use of spark-cassandra-connector https://github.com/datastax/spark-cassandra-connector). We're talking here about querying Cassandra from map/mapPartition functions. I have one question about it: Is it possible to query Cassandra asynchronously within Spark Streaming? And while doing it, is it possible to take the next batch of rows, while the previous is waiting on Cassandra I/O? I think (but I'm not sure) this generally asks whether several consecutive windows can interleave (because they take long to process)? Let's draw it: --|query Cassandra asynchronously--- window1 --- window2 While writing it, I start to believe they can, because windows are time-triggered, not triggered when the previous window has finished... But it's better to ask:) 2015-04-15 2:08 GMT+02:00 Tathagata Das t...@databricks.com: Fundamentally, stream processing systems are designed for processing streams of data, not for storing large volumes of data for a long period of time. So if you have to maintain that much state for months, then its best to use another system that is designed for long term storage (like Cassandra) which has proper support for making all that state fault-tolerant, high-performant, etc. So yes, the best option is to use Cassandra for the state and Spark Streaming jobs accessing the state from Cassandra. There are a number of optimizations that can be done. Its not too hard to build a simple on-demand populated cache (singleton hash map for example), that speeds up access from Cassandra, and all updates are written through the cache. This is a common use of Spark Streaming + Cassandra/HBase. Regarding the performance of updateStateByKey, we are aware of the limitations, and we will improve it soon :) TD On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote: Hey guys, could you please help me with a question I asked on Stackoverflow: https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two ? I'll be really grateful for your help! I'm also pasting the question below: I'm trying to solve a (simplified here) problem in Spark Streaming: Let's say I have a log of events made by users, where each event is a tuple (user name, activity, time), e.g.: (user1, view, 2015-04-14T21:04Z) (user1, click, 2015-04-14T21:05Z) Now I would like to gather events by user to do some analysis of that. Let's say that output is some analysis of: (user1, List((view, 2015-04-14T21:04Z),(click, 2015-04-14T21:05Z))) The events should be kept for even *2 months*. During that time there might be around *500 million* of such events, and *millions of unique* users, which are keys here. *My questions are:* - Is it feasible to do such a thing with updateStateByKey on DStream, when I have millions of keys stored? 
- Am I right that DStream.window is of no use here, when I have a 2-month-long window and would like to have a slide of a few seconds? P.S. I found out that updateStateByKey is called on all the keys on every slide, so that means it will be called millions of times every few seconds. That makes me doubt this design and I'm rather thinking about alternative solutions like: - using Cassandra for state - using Trident state (with Cassandra probably) - using Samza with its state management.
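A rough sketch of the "on-demand populated cache" TD describes, kept deliberately storage-agnostic: fetchFromStore and writeToStore are hypothetical placeholders for whatever Cassandra/HBase client calls you would make from map/mapPartitions:

import java.util.concurrent.ConcurrentHashMap

// One instance per executor JVM; consulted before hitting the external store.
object StateCache {
  private val cache = new ConcurrentHashMap[String, List[(String, String)]]()

  def get(user: String)(fetchFromStore: String => List[(String, String)]): List[(String, String)] = {
    val hit = cache.get(user)
    if (hit != null) hit
    else {
      val loaded = fetchFromStore(user)   // slow path: read from Cassandra/HBase
      cache.put(user, loaded)
      loaded
    }
  }

  def update(user: String, events: List[(String, String)])
            (writeToStore: (String, List[(String, String)]) => Unit): Unit = {
    cache.put(user, events)      // keep the cache hot
    writeToStore(user, events)   // write-through to the external store
  }
}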
SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
Hi guys, Trying to use a Spark SQL context's .load("jdbc", ...) method to create a DF from a JDBC data source. All seems to work well locally (master = local[*]), however as soon as we try and run on YARN we have problems. We seem to be running into problems with the class path and loading up the JDBC driver. I'm using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver. ./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client When trying to run I get an exception;
scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
java.sql.SQLException: No suitable driver found for jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd
Thinking maybe we need to force load the driver, if I supply "driver" -> "net.sourceforge.jtds.jdbc.Driver" to .load we get;
scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> "net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21)
Yet if I run a Class.forName() just from the shell;
scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver
No problem finding the JAR. I've tried in both the shell, and running with spark-submit (packing the driver in with my application as a fat JAR). Nothing seems to work. I can also get a connection in the driver/shell no problem;
scala> import java.sql.DriverManager
import java.sql.DriverManager
scala> DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
res3: java.sql.Connection = net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0
I'm probably missing some class path setting here. In jdbc.DefaultSource.createRelation it looks like the call to Class.forName doesn't specify a class loader, so it just uses the default Java behaviour to reflectively get the class loader. It almost feels like it's using a different class loader. 
I also tried seeing if the class path was there on all my executors by running;
import scala.collection.JavaConverters._
sc.parallelize(Seq(1,2,3,4)).flatMap(_ => java.sql.DriverManager.getDrivers().asScala.map(d => s"$d | ${d.acceptsURL("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")}")).collect().foreach(println)
This successfully returns; 15/04/15 01:07:37 INFO scheduler.DAGScheduler: Job 0 finished: collect at Main.scala:46, took 1.495597 s org.apache.derby.jdbc.AutoloadedDriver40 | false com.mysql.jdbc.Driver | false net.sourceforge.jtds.jdbc.Driver | true org.apache.derby.jdbc.AutoloadedDriver40 | false com.mysql.jdbc.Driver | false net.sourceforge.jtds.jdbc.Driver | true org.apache.derby.jdbc.AutoloadedDriver40 | false com.mysql.jdbc.Driver | false net.sourceforge.jtds.jdbc.Driver | true org.apache.derby.jdbc.AutoloadedDriver40 | false com.mysql.jdbc.Driver | false net.sourceforge.jtds.jdbc.Driver | true As a final test we tried with the postgres driver and had the same problem. Any ideas? Cheers, Nathan
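One thing worth trying, hedged since it is only the usual workaround for driver-side classloading of JDBC drivers and not a confirmed fix for this case: put the driver jar on the driver and executor extra classpaths in addition to --jars (paths here are the ones from the post, reused hypothetically):

./bin/spark-shell --master yarn-client \
  --jars /tmp/jtds-1.3.1.jar \
  --driver-class-path /tmp/jtds-1.3.1.jar \
  --conf spark.executor.extraClassPath=./jtds-1.3.1.jar

The relative ./jtds-1.3.1.jar on the executor side relies on --jars shipping the file into each executor container's working directory on YARN.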
Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
Just an update, tried with the old JdbcRDD and that worked fine. From: Nathan nathan.mccar...@quantium.com.au Date: Wednesday, 15 April 2015 1:57 pm To: user@spark.apache.org Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0 [quoted message omitted; it repeats the original post above verbatim]
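For anyone following along, a hedged sketch of the "old JdbcRDD" route Nathan says worked; the query, bounds and row mapping are made up, and the SQL must contain exactly two ? placeholders for the partition bounds:

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val url = "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd"

val rdd = new JdbcRDD(
  sc,
  () => {
    Class.forName("net.sourceforge.jtds.jdbc.Driver")   // ensure the driver is registered on the executor
    DriverManager.getConnection(url)
  },
  "SELECT * FROM CUBE.DIM_SUPER_STORE_TBL WHERE ID >= ? AND ID <= ?",
  1L, 100000L, 4,            // lower bound, upper bound, number of partitions (hypothetical)
  rs => rs.getString(1))     // map each ResultSet row to something serializable

rdd.take(10).foreach(println)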
Re: Cannot saveAsParquetFile from a RDD of case class
I've changed it to import sqlContext.implicits._ but it still doesn't work. (I've updated the gist) BTW, using .toDF() do work, thanks for this information. Regards, pishen 2015-04-14 20:35 GMT+08:00 Todd Nist tsind...@gmail.com: I think docs are correct. If you follow the example from the docs and add this import shown below, I believe you will get what your looking for: // This is used to implicitly convert an RDD to a DataFrame.import sqlContext.implicits._ You could also simply take your rdd and do the following: logs.toDF.saveAsParquetFile(s3n://xxx/xxx) -Todd On Tue, Apr 14, 2015 at 3:50 AM, pishen tsai pishe...@gmail.com wrote: OK, it do work. Maybe it will be better to update this usage in the official Spark SQL tutorial: http://spark.apache.org/docs/latest/sql-programming-guide.html Thanks, pishen 2015-04-14 15:30 GMT+08:00 fightf...@163.com fightf...@163.com: Hi,there If you want to use the saveAsParquetFile, you may want to use val log_df = sqlContext.createDataFrame(logs) And then you can issue log_df.saveAsParquetFile (path) Best, Sun. -- fightf...@163.com *From:* pishen pishe...@gmail.com *Date:* 2015-04-14 15:18 *To:* user user@spark.apache.org *Subject:* Cannot saveAsParquetFile from a RDD of case class Hello, I tried to follow the tutorial of Spark SQL, but is not able to saveAsParquetFile from a RDD of case class. Here is my Main.scala and build.sbt https://gist.github.com/pishen/939cad3da612ec03249f At line 34, compiler said that value saveAsParquetFile is not a member of org.apache.spark.rdd.RDD[core.Log] Any suggestion on how to solve this? Thanks, pishen -- View this message in context: Cannot saveAsParquetFile from a RDD of case class http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-saveAsParquetFile-from-a-RDD-of-case-class-tp22488.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
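To summarise the thread, a minimal sketch of both working routes on the Spark 1.3 API; Log, the sample rows and the s3n path are placeholders:

import org.apache.spark.sql.SQLContext

case class Log(host: String, status: Int)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // enables rdd.toDF() on RDDs of case classes

val logs = sc.parallelize(Seq(Log("a", 200), Log("b", 404)))

logs.toDF().saveAsParquetFile("s3n://xxx/xxx")                         // route 1: explicit toDF()
// sqlContext.createDataFrame(logs).saveAsParquetFile("s3n://xxx/xxx") // route 2: createDataFrame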
Re: counters in spark
Hi Robert, A lot of task metrics are already available for individual tasks. You can get these programmatically by registering a SparkListener, and you can also view them in the UI. E.g., for each task, you can see runtime, serialization time, amount of shuffle data read, etc. I'm working on also exposing the data in the UI as json. In addition, you can also use the metrics system to get a different view of the data. It has a different set of information, and also is better for a timeline view, as opposed to the task-oriented view you get through the UI. You can read about both options here: https://spark.apache.org/docs/latest/monitoring.html On Mon, Apr 13, 2015 at 12:48 PM, Grandl Robert rgra...@yahoo.com.invalid wrote: Guys, Do you have any thoughts on this ? Thanks, Robert On Sunday, April 12, 2015 5:35 PM, Grandl Robert rgra...@yahoo.com.INVALID wrote: Hi guys, I was trying to figure out some counters in Spark, related to the amount of CPU or memory used (in some metric) by a task/stage/job, but I could not find any. Is there any such counter available ? Thank you, Robert
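A small sketch of the SparkListener route Imran mentions, assuming the Spark 1.3-era API; exact TaskMetrics fields vary a little between versions:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class TaskMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {
      // Print a couple of per-task counters; many more are available on TaskMetrics.
      println(s"stage=${taskEnd.stageId} runTime=${m.executorRunTime} ms resultSize=${m.resultSize} bytes")
    }
  }
}

sc.addSparkListener(new TaskMetricsListener)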
Re: spark ml model info
If you are using Scala/Java or pyspark.mllib.classification.LogisticRegressionModel, you should be able to call weights and intercept to get the model coefficients. If you are using the pipeline API in Python, you can try model._java_model.weights(), we are going to add a method to get the weights directly. Btw, if you are on the master branch, you can call model.save(sc, path) to persist models on disk and LogisticRegression.load(sc, path) to load it back. -Xiangrui On Tue, Apr 14, 2015 at 1:22 PM, Jianguo Li flyingfromch...@gmail.com wrote: Hi, I am training a model using the logistic regression algorithm in ML. I was wondering if there is any API to access the weight vectors (aka the co-efficients for each feature). I need those co-efficients for real time predictions. Thanks, Jianguo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
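In Scala that looks roughly like the following sketch; the libsvm path is the stock example data file shipped with Spark:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val model = new LogisticRegressionWithLBFGS().run(data)

println(s"intercept = ${model.intercept}")
println(s"weights   = ${model.weights}")   // a Vector of per-feature coefficients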
Re: Help understanding the FP-Growth algorithm
If you want to see an example that calls MLlib's FPGrowth, you can find them under the examples/ folder: Scala: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala, Java: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java If you want to see the implementation, you can check the papers and the source code: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala Best, Xiangrui On Tue, Apr 14, 2015 at 1:27 PM, Eric Tanner eric.tan...@justenough.com wrote: I am a total newbe to spark so be kind. I am looking for an example that implements the FP-Growth algorithm so I can better understand both the algorithm as well as spark. The one example I found (on spark .apache.org example) was incomplete. Thanks, Eric - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
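A compact sketch of calling FPGrowth directly, with made-up transactions. Note the element type of freqItemsets changed around Spark 1.4: recent versions return FreqItemset objects with .items and .freq, while 1.3.0 returned plain pairs, so adjust the last loop to your release:

import org.apache.spark.mllib.fpm.FPGrowth

val transactions = sc.parallelize(Seq(
  Array("a", "b", "c"),
  Array("a", "b"),
  Array("a", "c"),
  Array("b", "c")))

val model = new FPGrowth()
  .setMinSupport(0.5)      // keep itemsets appearing in at least half the transactions
  .setNumPartitions(2)
  .run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + " appears " + itemset.freq + " times")
}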
Re: Registering classes with KryoSerializer
Hi Arun, It can be hard to use kryo with required registration because of issues like this -- there isn't a good way to register all the classes that you need transitively. In this case, it looks like one of your classes has a reference to a ClassTag, which in turn has a reference to some anonymous inner class. I'd suggest (a) figuring out whether you really want to be serializing this thing -- its possible you're serializing an RDD which keeps a ClassTag, but normally you wouldn't want to serialize your RDDs (b) you might want to bring this up w/ chill -- spark offloads most of the kryo setup for all the scala internals to chill, I'm surprised they don't handle this already. Looks like they still handle ClassManifests which are from pre-scala 2.10: https://github.com/twitter/chill/blob/master/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L189 (c) you can always register these classes yourself, despite the crazy names, though you'll just need to knock these out one-by-one: scala classOf[scala.reflect.ClassTag$$anon$1] res0: Class[scala.reflect.ClassTag[T]{def unapply(x$1: scala.runtime.BoxedUnit): Option[_]; def arrayClass(x$1: Class[_]): Class[_]}] = class scala.reflect.ClassTag$$anon$1 On Mon, Apr 13, 2015 at 6:09 PM, Arun Lists lists.a...@gmail.com wrote: Hi, I am trying to register classes with KryoSerializer. This has worked with other programs. Usually the error messages are helpful in indicating which classes need to be registered. But with my current program, I get the following cryptic error message: *Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1* *Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);* How do I find out which class needs to be registered? I looked at my program and registered all classes used in RDDs. But clearly more classes remain to be registered if I can figure out which classes. Thanks for your help! arun
Re: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer
HI Shuai, I don't think this is a bug with kryo, its just a subtlety with the kryo works. I *think* that it would also work if you changed your PropertiesUtil class to either (a) remove the no-arg constructor or (b) instead of extending properties, you make it a contained member variable. I wish I had a succinct explanation, but I think it really gets into the nitty gritty details of how these serializer works (and this just a hunch of mine anyway, I'm not 100% sure). Would be great if you could confirm either way. thanks, Imran On Tue, Apr 7, 2015 at 9:29 AM, Shuai Zheng szheng.c...@gmail.com wrote: I have found the issue, but I think it is bug. If I change my class to: public class ModelSessionBuilder implements Serializable { /** * */ … private *Properties[] propertiesList*; private static final long serialVersionUID = -8139500301736028670L; } The broadcast value has no issue. But in my original form, if I broadcast it as array of my custom subclass of Properties, after broadcast, the propertiesList array will be an array of empty PropertiesUtils objects there (empty, not NULL), I am not sure why this happen (the code without any problem when run with default java serializer). So I think this is a bug, but I am not sure it is a bug of spark or a bug of Kryo. Regards, Shuai *From:* Shuai Zheng [mailto:szheng.c...@gmail.com] *Sent:* Monday, April 06, 2015 5:34 PM *To:* user@spark.apache.org *Subject:* Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer Hi All, I have tested my code without problem on EMR yarn (spark 1.3.0) with default serializer (java). But when I switch to org.apache.spark.serializer.KryoSerializer, the broadcast value doesn’t give me right result (actually return me empty custom class on inner object). Basically I broadcast a builder object, which carry an array of propertiesUtils object. The code should not have any logical issue because it works on default java serializer. But when I turn to the org.apache.spark.serializer.KryoSerializer, it looks like the Array doesn’t initialize, propertiesList will give a right size, but then all element in the array is just a normal empty PropertiesUtils. Do I miss anything when I use this KryoSerializer? I just put the two lines, do I need to implement some special code to enable KryoSerializer, but I search all places but can’t find any places mention it. sparkConf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); sparkConf.registerKryoClasses(*new* Class[]{ModelSessionBuilder.*class*, Constants.*class*, PropertiesUtils.*class*, ModelSession.*class*}); public class ModelSessionBuilder implements Serializable { /** * */ … private PropertiesUtils[] propertiesList; private static final long serialVersionUID = -8139500301736028670L; } *public* *class* PropertiesUtils *extends* Properties { /** * */ *private* *static* *final* *long* *serialVersionUID* = -3684043338580885551L; *public* PropertiesUtils(Properties prop) { *super*(prop); } *public* PropertiesUtils() { // *TODO* Auto-generated constructor stub } } Regards, Shuai
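A sketch of Imran's suggestion (b), composition instead of extending Properties, so the serializer only sees a plain field. This is untested against this exact bug and just illustrates the shape:

import java.util.Properties

class PropertiesUtils(props: Properties) extends Serializable {
  // Copy into an immutable Map so there is no Hashtable/Properties internal
  // state for Kryo to reconstruct.
  private val entries: Map[String, String] = {
    import scala.collection.JavaConverters._
    props.stringPropertyNames().asScala.map(k => k -> props.getProperty(k)).toMap
  }

  def getProperty(key: String): String = entries.getOrElse(key, null)
}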
Catching executor exception from executor in driver
Hello, I would like to know if there is a way of catching an exception thrown from an executor, from the driver program. Here is an example:
try {
  val x = sc.parallelize(Seq(1,2,3)).map(e => e / 0).collect
} catch {
  case e: SparkException => {
    println(s"ERROR: $e")
    println(s"CAUSE: ${e.getCause}")
  }
}
Output: ERROR: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 15, pio1.c.ace-lotus-714.internal): java.lang.ArithmeticException: / by zero at $line71.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(console:51) ... CAUSE: null The exception cause is a null value. Is there any way that I can catch the ArithmeticException? Thanks Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Catching-executor-exception-from-executor-in-driver-tp22495.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Converting Date pattern in scala code
If you're doing this in Scala per se, then you can probably just reference JodaTime or the Java Date / Time classes. If you are using SparkSQL, then you can use the various Hive date functions for conversion. On Tue, Apr 14, 2015 at 11:04 AM BASAK, ANANDA ab9...@att.com wrote: I need some help to convert the date pattern in my Scala code for Spark 1.3. I am reading the dates from two flat files having two different date formats. File 1: 2015-03-27 File 2: 02-OCT-12 09-MAR-13 This format of file 2 is not being recognized by my Spark SQL when I am comparing it in a WHERE clause on the date fields. The format of file 1 is being recognized better. How do I convert the format in file 2 to match the format in file 1? Regards Ananda - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
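A small Scala sketch of the plain-JDK route, using the pattern and sample value from the files above (in a real job, create the formats inside mapPartitions, since SimpleDateFormat is not thread-safe, and adjust the pattern if your month names differ):

import java.text.SimpleDateFormat
import java.util.Locale

val in  = new SimpleDateFormat("dd-MMM-yy", Locale.ENGLISH)
val out = new SimpleDateFormat("yyyy-MM-dd")

// Convert file 2's "02-OCT-12" style into file 1's "yyyy-MM-dd" style.
def normalize(s: String): String = out.format(in.parse(s))

println(normalize("02-OCT-12"))   // 2012-10-02, now comparable in the WHERE clause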
Help understanding the FP-Growth algorithm
I am a total newbie to spark so be kind. I am looking for an example that implements the FP-Growth algorithm so I can better understand both the algorithm as well as spark. The one example I found (on the spark.apache.org examples page) was incomplete. Thanks, Eric
Is it feasible to keep millions of keys in state of Spark Streaming job for two months?
Hey guys, could you please help me with a question I asked on Stackoverflow: https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two ? I'll be really grateful for your help! I'm also pasting the question below: I'm trying to solve a (simplified here) problem in Spark Streaming: Let's say I have a log of events made by users, where each event is a tuple (user name, activity, time), e.g.: (user1, view, 2015-04-14T21:04Z) (user1, click, 2015-04-14T21:05Z) Now I would like to gather events by user to do some analysis of that. Let's say that output is some analysis of: (user1, List((view, 2015-04-14T21:04Z),(click, 2015-04-14T21:05Z))) The events should be kept for even *2 months*. During that time there might be around *500 million* of such events, and *millions of unique* users, which are keys here. *My questions are:* - Is it feasible to do such a thing with updateStateByKey on DStream, when I have millions of keys stored? - Am I right that DStream.window is of no use here, when I have a 2-month-long window and would like to have a slide of a few seconds? P.S. I found out that updateStateByKey is called on all the keys on every slide, so that means it will be called millions of times every few seconds. That makes me doubt this design and I'm rather thinking about alternative solutions like: - using Cassandra for state - using Trident state (with Cassandra probably) - using Samza with its state management.
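For concreteness, a sketch of the updateStateByKey design being questioned; events is a hypothetical DStream of (user, (activity, timestamp)) pairs, and checkpointing must be enabled (ssc.checkpoint(...)) for any stateful operation:

import org.apache.spark.streaming.dstream.DStream

object UserState {
  type Event = (String, String)   // (activity, timestamp)

  def trackUsers(events: DStream[(String, Event)]): DStream[(String, Seq[Event])] =
    events.updateStateByKey[Seq[Event]] { (newEvents: Seq[Event], state: Option[Seq[Event]]) =>
      // Unbounded growth over 2 months is exactly the concern raised above.
      Some(state.getOrElse(Seq.empty) ++ newEvents)
    }
}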
Re: Array[T].distinct doesn't work inside RDD
Interesting, my gut instinct is the same as Sean's. I'd suggest debugging this in plain old scala first, without involving spark. Even just in the scala shell, create one of your Array[T], try calling .toSet and calling .distinct. If those aren't the same, then it's got nothing to do with spark. If it's still different even after you make hashCode() consistent w/ equals(), then you might have more luck asking on the scala-user list: https://groups.google.com/forum/#!forum/scala-user If it works fine in plain scala, but not in spark, then it would be worth bringing up here again for us to look into. On Tue, Apr 7, 2015 at 4:41 PM, Anny Chen anny9...@gmail.com wrote: Hi Sean, I didn't override hashCode. But the problem is that Array[T].toSet could work but Array[T].distinct couldn't. If it is because I didn't override hashCode, then toSet shouldn't work either, right? I also tried using this Array[T].distinct outside RDD, and it is working alright also, returning me the same result as Array[T].toSet. Thanks! Anny On Tue, Apr 7, 2015 at 2:31 PM, Sean Owen so...@cloudera.com wrote: Did you override hashCode too? On Apr 7, 2015 2:39 PM, anny9699 anny9...@gmail.com wrote: Hi, I have a question about Array[T].distinct on a customized class T. My data is like RDD[(String, Array[T])] in which T is a class written by myself. There are some duplicates in each Array[T] so I want to remove them. I override the equals() method in T and use
val dataNoDuplicates = dataDuplicates.map { case (id, arr) => (id, arr.distinct) }
to remove duplicates inside the RDD. However this doesn't work, since I did some further tests by using
val dataNoDuplicates = dataDuplicates.map { case (id, arr) =>
  val uniqArr = arr.distinct
  if (uniqArr.length > 1) println(uniqArr.head == uniqArr.last)
  (id, uniqArr)
}
And from the worker stdout I could see that it always returns TRUE results. I then tried removing duplicates by using Array[T].toSet instead of Array[T].distinct and it is working! Could anybody explain why Array[T].toSet and Array[T].distinct behave differently here? And why is Array[T].distinct not working? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Array-T-distinct-doesn-t-work-inside-RDD-tp22412.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
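A plain-Scala sketch of Sean's and Imran's point, since both distinct and toSet go through a HashSet and therefore need hashCode to be consistent with equals (Item here is a stand-in for the custom class T):

class Item(val id: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: Item => this.id == that.id
    case _          => false
  }
  // Without this override, two equal Items can land in different hash buckets,
  // and both distinct and toSet may keep the duplicates.
  override def hashCode: Int = id
}

val arr = Array(new Item(1), new Item(1), new Item(2))
println(arr.distinct.length)   // 2
println(arr.toSet.size)        // 2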
Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
Shuffle write could be a good indication of skew, but it looks like the task in question hasn't generated any shuffle write yet, because its still working on the shuffle-read side. So I wouldn't read too much into the fact that the shuffle write is 0 for a task that is still running. The shuffle read is larger than for the other tasks (3.0GB vs. 2.2 GB, or more importantly, 55M records vs 1M records). So it might not be that the raw data volume is much higher on that task, but its getting a ton more small records, which will also generate a lot of work. It also is a little more evidence to Jonathan's suggestion that there is a null / 0 record that is getting grouped together. On Mon, Apr 13, 2015 at 12:40 PM, Jonathan Coveney jcove...@gmail.com wrote: I'm not 100% sure of spark's implementation but in the MR frameworks, it would have a much larger shuffle write size becasue that node is dealing with a lot more data and as a result has a lot more to shuffle 2015-04-13 13:20 GMT-04:00 java8964 java8...@hotmail.com: If it is really due to data skew, will the task hanging has much bigger Shuffle Write Size in this case? In this case, the shuffle write size for that task is 0, and the rest IO of this task is not much larger than the fast finished tasks, is that normal? I am also interested in this case, as from statistics on the UI, how it indicates the task could have skew data? Yong -- Date: Mon, 13 Apr 2015 12:58:12 -0400 Subject: Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete From: jcove...@gmail.com To: deepuj...@gmail.com CC: user@spark.apache.org I can promise you that this is also a problem in the pig world :) not sure why it's not a problem for this data set, though... are you sure that the two are doing the exact same code? you should inspect your source data. Make a histogram for each and see what the data distribution looks like. If there is a value or bucket with a disproportionate set of values you know you have an issue 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: You mean there is a tuple in either RDD, that has itemID = 0 or null ? And what is catch all ? That implies is it a good idea to run a filter on each RDD first ? We do not do this using Pig on M/R. Is it required in Spark world ? On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com wrote: My guess would be data skew. Do you know if there is some item id that is a catch all? can it be null? item id 0? 
lots of data sets have this sort of value and it always kills joins 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Code: val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents).map { case (itemId, (listing, viDetail)) = val viSummary = new VISummary viSummary.leafCategoryId = listing.getLeafCategId().toInt viSummary.itemSiteId = listing.getItemSiteId().toInt viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt viSummary.sellerCountryId = listing.getSlrCntryId().toInt viSummary.buyerSegment = 0 viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() 0) 1 else 0) val sellerId = listing.getSlrId.toLong (sellerId, (viDetail, viSummary, itemId)) } Running Tasks: Tasks IndexIDAttemptStatus ▾Locality LevelExecutor ID / HostLaunch Time DurationGC TimeShuffle Read Size / RecordsWrite TimeShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle Spill (Disk)Errors 0 216 0 RUNNING PROCESS_LOCAL 181 / phxaishdc9dn0474.phx.ebay.com 2015/04/13 06:43:53 1.7 h 13 min 3.0 GB / 56964921 0.0 B / 0 21.2 GB 1902.6 MB 2 218 0 SUCCESS PROCESS_LOCAL 582 / phxaishdc9dn0235.phx.ebay.com 2015/04/13 06:43:53 15 min 31 s 2.2 GB / 1666851 0.1 s 3.0 MB / 2062 54.8 GB 1924.5 MB 1 217 0 SUCCESS PROCESS_LOCAL 202 / phxdpehdc9dn2683.stratus.phx.ebay.com 2015/04/13 06:43:53 19 min 1.3 min 2.2 GB / 1687086 75 ms 3.9 MB / 2692 33.7 GB 1960.4 MB 4 220 0 SUCCESS PROCESS_LOCAL 218 / phxaishdc9dn0855.phx.ebay.com 2015/04/13 06:43:53 15 min 56 s 2.2 GB / 1675654 40 ms 3.3 MB / 2260 26.2 GB 1928.4 MB Command: ./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 3000 --driver-memory 12g --driver-java-options -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
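A sketch of the histogram check Jonathan suggests, reusing the RDD names from the thread (lstgItem, viEvents) purely as placeholders; any key with a wildly larger count than the rest is the likely catch-all/skew culprit:

// Count records per join key on each side and look at the heaviest keys.
val topListingKeys = lstgItem.map { case (itemId, _) => (itemId, 1L) }
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(20)

val topEventKeys = viEvents.map { case (itemId, _) => (itemId, 1L) }
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(20)

topListingKeys.foreach(println)
topEventKeys.foreach(println)   // watch for null, 0, or another default-valued itemId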
spark ml model info
Hi, I am training a model using the logistic regression algorithm in ML. I was wondering if there is any API to access the weight vectors (aka the co-efficients for each feature). I need those co-efficients for real time predictions. Thanks, Jianguo
Re: Registering classes with KryoSerializer
Hi Imran, Thanks for the response! However, I am still not there yet. In the Scala interpreter, I can do: scala classOf[scala.reflect.ClassTag$$anon$1] but when I try to do this in my program in IntelliJ, it indicates an error: Cannot resolve symbol ClassTag$$anon$1 Hence I am not any closer to making this work. If you have any further suggestions, they would be most welcome. arun On Tue, Apr 14, 2015 at 2:33 PM, Imran Rashid iras...@cloudera.com wrote: Hi Arun, It can be hard to use kryo with required registration because of issues like this -- there isn't a good way to register all the classes that you need transitively. In this case, it looks like one of your classes has a reference to a ClassTag, which in turn has a reference to some anonymous inner class. I'd suggest (a) figuring out whether you really want to be serializing this thing -- its possible you're serializing an RDD which keeps a ClassTag, but normally you wouldn't want to serialize your RDDs (b) you might want to bring this up w/ chill -- spark offloads most of the kryo setup for all the scala internals to chill, I'm surprised they don't handle this already. Looks like they still handle ClassManifests which are from pre-scala 2.10: https://github.com/twitter/chill/blob/master/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L189 (c) you can always register these classes yourself, despite the crazy names, though you'll just need to knock these out one-by-one: scala classOf[scala.reflect.ClassTag$$anon$1] res0: Class[scala.reflect.ClassTag[T]{def unapply(x$1: scala.runtime.BoxedUnit): Option[_]; def arrayClass(x$1: Class[_]): Class[_]}] = class scala.reflect.ClassTag$$anon$1 On Mon, Apr 13, 2015 at 6:09 PM, Arun Lists lists.a...@gmail.com wrote: Hi, I am trying to register classes with KryoSerializer. This has worked with other programs. Usually the error messages are helpful in indicating which classes need to be registered. But with my current program, I get the following cryptic error message: *Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1* *Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);* How do I find out which class needs to be registered? I looked at my program and registered all classes used in RDDs. But clearly more classes remain to be registered if I can figure out which classes. Thanks for your help! arun
Re: Catching executor exception from executor in driver
(+dev) Hi Justin, short answer: no, there is no way to do that. I'm just guessing here, but I imagine this was done to eliminate serialization problems (eg., what if we got an error trying to serialize the user exception to send from the executors back to the driver?). Though, actually that isn't a great explanation either, since even when the info gets back to the driver, its broken into a few string fields (eg., we have the class name of the root exception), but eventually it just gets converted to one big string. I've cc'ed dev b/c I think this is an oversight in Spark. It makes it really hard to write an app to deal gracefully with various exceptions -- all you can do is look at the string in SparkException (which could change arbitrarily between versions, in addition to just being a pain to work with). We should probably add much more fine-grained subclasses of SparkException, at the very least distinguishing errors in user code vs. errors in spark. I could imagine there might be a few other cases we'd like to distinguish more carefully as well. Any thoughts from other devs? thanks Imran On Tue, Apr 14, 2015 at 4:46 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I would like to know if there is a way of catching exception throw from executor exception from the driver program. Here is an example: try { val x = sc.parallelize(Seq(1,2,3)).map(e = e / 0).collect } catch { case e: SparkException = { println(sERROR: $e) println(sCAUSE: ${e.getCause}) } } Output: ERROR: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 15, pio1.c.ace-lotus-714.internal): java.lang.ArithmeticException: / by zero at $line71.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(console:51) ... CAUSE: null The exception cause is a null value. Is there any way that I can catch the ArithmeticException? Thanks Justin -- View this message in context: Catching executor exception from executor in driver http://apache-spark-user-list.1001560.n3.nabble.com/Catching-executor-exception-from-executor-in-driver-tp22495.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
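Until something finer-grained exists, one hedged workaround is to trap the failure inside the task itself and return it as data, so the driver never has to dig through SparkException strings:

import scala.util.{Failure, Success, Try}

val results = sc.parallelize(Seq(1, 2, 3))
  .map(e => Try(e / 0))   // each record carries either a value or its own exception
  .collect()

results.foreach {
  case Success(v)                      => println(s"value: $v")
  case Failure(e: ArithmeticException) => println(s"arithmetic error: ${e.getMessage}")
  case Failure(e)                      => println(s"other error: $e")
}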
Re: Cannot saveAsParquetFile from a RDD of case class
I think docs are correct. If you follow the example from the docs and add this import shown below, I believe you will get what your looking for: // This is used to implicitly convert an RDD to a DataFrame.import sqlContext.implicits._ You could also simply take your rdd and do the following: logs.toDF.saveAsParquetFile(s3n://xxx/xxx) -Todd On Tue, Apr 14, 2015 at 3:50 AM, pishen tsai pishe...@gmail.com wrote: OK, it do work. Maybe it will be better to update this usage in the official Spark SQL tutorial: http://spark.apache.org/docs/latest/sql-programming-guide.html Thanks, pishen 2015-04-14 15:30 GMT+08:00 fightf...@163.com fightf...@163.com: Hi,there If you want to use the saveAsParquetFile, you may want to use val log_df = sqlContext.createDataFrame(logs) And then you can issue log_df.saveAsParquetFile (path) Best, Sun. -- fightf...@163.com *From:* pishen pishe...@gmail.com *Date:* 2015-04-14 15:18 *To:* user user@spark.apache.org *Subject:* Cannot saveAsParquetFile from a RDD of case class Hello, I tried to follow the tutorial of Spark SQL, but is not able to saveAsParquetFile from a RDD of case class. Here is my Main.scala and build.sbt https://gist.github.com/pishen/939cad3da612ec03249f At line 34, compiler said that value saveAsParquetFile is not a member of org.apache.spark.rdd.RDD[core.Log] Any suggestion on how to solve this? Thanks, pishen -- View this message in context: Cannot saveAsParquetFile from a RDD of case class http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-saveAsParquetFile-from-a-RDD-of-case-class-tp22488.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: sbt-assembly spark-streaming-kinesis-asl error
Richard, You response was very helpful and actually resolved my issue. In case others run into a similar issue, I followed the procedure: - Upgraded to spark 1.3.0 - Add all spark related libraries are provided - Include spark transitive library dependencies where my build.sbt file libraryDependencies ++= { Seq( org.apache.spark %% spark-core % 1.3.0 % provided, org.apache.spark %% spark-streaming % 1.3.0 % provided, org.apache.spark %% spark-streaming-kinesis-asl % 1.3.0 % provided, joda-time % joda-time % 2.2, org.joda % joda-convert % 1.2, com.amazonaws % aws-java-sdk % 1.8.3, com.amazonaws % amazon-kinesis-client % 1.2.0) and submitting a spark job can done via sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar Thanks again Richard! Cheers Mike. On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher rmarsc...@localytics.com wrote: Hi, I've gotten an application working with sbt-assembly and spark, thought I'd present an option. In my experience, trying to bundle any of the Spark libraries in your uber jar is going to be a major pain. There will be a lot of deduplication to work through and even if you resolve them it can be easy to do it incorrectly. I considered it an intractable problem. So the alternative is to not include those jars in your uber jar. For this to work you will need the same libraries on the classpath of your Spark cluster and your driver program (if you are running that as an application and not just using spark-submit). As for your NoClassDefFoundError, you either are missing Joda Time in your runtime classpath or have conflicting versions. It looks like something related to AWS wants to use it. Check your uber jar to see if its including the org/joda/time as well as the classpath of your spark cluster. For example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark 1.2 I found a conflict between httpclient versions that my uber jar pulled in for AWS libraries and the one bundled in the spark uber jar. I hand patched the spark uber jar to remove the offending httpclient bytecode to resolve the issue. You may be facing a similar situation. I hope that gives some ideas for resolving your issue. Regards, Rich On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi Vadim, After removing provided from org.apache.spark %% spark-streaming-kinesis-asl I ended up with huge number of deduplicate errors: https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a It would be nice if you could share some pieces of your mergeStrategy code for reference. Also, after adding provided back to spark-streaming-kinesis-asl and I submit the spark job with the spark-streaming-kinesis-asl jar file sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar I still end up with the following error... 
Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) Has anyone else run into this issue? On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have having trouble building a fat jar file through sbt-assembly. [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard' [warn] Merging
Re: Cannot saveAsParquetFile from a RDD of case class
More info on why toDF is required: http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13 On Tue, Apr 14, 2015 at 6:55 AM, pishen tsai pishe...@gmail.com wrote: I've changed it to import sqlContext.implicits._ but it still doesn't work. (I've updated the gist) BTW, using .toDF() do work, thanks for this information. Regards, pishen 2015-04-14 20:35 GMT+08:00 Todd Nist tsind...@gmail.com: I think docs are correct. If you follow the example from the docs and add this import shown below, I believe you will get what your looking for: // This is used to implicitly convert an RDD to a DataFrame.import sqlContext.implicits._ You could also simply take your rdd and do the following: logs.toDF.saveAsParquetFile(s3n://xxx/xxx) -Todd On Tue, Apr 14, 2015 at 3:50 AM, pishen tsai pishe...@gmail.com wrote: OK, it do work. Maybe it will be better to update this usage in the official Spark SQL tutorial: http://spark.apache.org/docs/latest/sql-programming-guide.html Thanks, pishen 2015-04-14 15:30 GMT+08:00 fightf...@163.com fightf...@163.com: Hi,there If you want to use the saveAsParquetFile, you may want to use val log_df = sqlContext.createDataFrame(logs) And then you can issue log_df.saveAsParquetFile (path) Best, Sun. -- fightf...@163.com *From:* pishen pishe...@gmail.com *Date:* 2015-04-14 15:18 *To:* user user@spark.apache.org *Subject:* Cannot saveAsParquetFile from a RDD of case class Hello, I tried to follow the tutorial of Spark SQL, but is not able to saveAsParquetFile from a RDD of case class. Here is my Main.scala and build.sbt https://gist.github.com/pishen/939cad3da612ec03249f At line 34, compiler said that value saveAsParquetFile is not a member of org.apache.spark.rdd.RDD[core.Log] Any suggestion on how to solve this? Thanks, pishen -- View this message in context: Cannot saveAsParquetFile from a RDD of case class http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-saveAsParquetFile-from-a-RDD-of-case-class-tp22488.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.