Re: PySpark: slicing issue with dataframes
Yes, it's a bug, please file a JIRA. On Sun, May 3, 2015 at 10:36 AM, Ali Bajwa ali.ba...@gmail.com wrote: Friendly reminder on this one. Just wanted to get a confirmation that this is not by design before I logged a JIRA Thanks! Ali On Tue, Apr 28, 2015 at 9:53 AM, Ali Bajwa ali.ba...@gmail.com wrote: Hi experts, Trying to use the slicing functionality in strings as part of a Spark program (PySpark) I get this error: Code import pandas as pd from pyspark.sql import SQLContext hc = SQLContext(sc) A = pd.DataFrame({'Firstname': ['James', 'Ali', 'Daniel'], 'Lastname': ['Jones', 'Bajwa', 'Day']}) a = hc.createDataFrame(A) print A b = a.select(a.Firstname[:2]) print b.toPandas() c = a.select(a.Lastname[2:]) print c.toPandas() Output: Firstname Lastname 0 JamesJones 1 AliBajwa 2Daniel Day SUBSTR(Firstname, 0, 2) 0 Ja 1 Al 2 Da --- Py4JError Traceback (most recent call last) ipython-input-17-6ee5d7d069ce in module() 10 b = a.select(a.Firstname[:2]) 11 print b.toPandas() --- 12 c = a.select(a.Lastname[2:]) 13 print c.toPandas() /home/jupyter/spark-1.3.1/python/pyspark/sql/dataframe.pyc in substr(self, startPos, length) 1089 raise TypeError(Can not mix the type) 1090 if isinstance(startPos, (int, long)): - 1091 jc = self._jc.substr(startPos, length) 1092 elif isinstance(startPos, Column): 1093 jc = self._jc.substr(startPos._jc, length._jc) /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 302 raise Py4JError( 303 'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'. -- 304 format(target_id, '.', name, value)) 305 else: 306 raise Py4JError( Py4JError: An error occurred while calling o1887.substr. Trace: py4j.Py4JException: Method substr([class java.lang.Integer, class java.lang.Long]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) at py4j.Gateway.invoke(Gateway.java:252) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) Looks like X[:2] works but X[2:] fails with the error above Anyone else have this issue? Clearly I can use substr() to workaround this, but if this is a confirmed bug we should open a JIRA. Thanks, Ali - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
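A minimal workaround sketch in PySpark, assuming the 1.3.x API shown above and an existing SparkContext named sc: call Column.substr() with plain int arguments instead of the open-ended slice. substr() is 1-based, and the length of 100 below is just an illustrative stand-in for "to the end of the string":

import pandas as pd
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`
pdf = pd.DataFrame({'Firstname': ['James', 'Ali', 'Daniel'],
                    'Lastname': ['Jones', 'Bajwa', 'Day']})
df = sqlContext.createDataFrame(pdf)

# Firstname[:2] already works because both slice bounds reach the JVM as plain ints
print(df.select(df.Firstname.substr(1, 2)).toPandas())
# Instead of Lastname[2:], pass an explicit int length large enough to cover the values
print(df.select(df.Lastname.substr(3, 100)).toPandas())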
Re: how to set random seed
The python workers used for each stage may be different, this may not work as expected. You can create a Random object, set the seed, use it to do the shuffle(). r = random.Random() r.seek(my_seed) def f(x): r.shuffle(l) rdd.map(f) On Thu, May 14, 2015 at 6:21 AM, Charles Hayden charles.hay...@atigeo.com wrote: Thanks for the reply. I have not tried it out (I will today and report on my results) but I think what I need to do is to call mapPartitions and pass it a function that sets the seed. I was planning to pass the seed value in the closure. Something like: my_seed = 42 def f(iterator): random.seed(my_seed) yield my_seed rdd.mapPartitions(f) From: ayan guha guha.a...@gmail.com Sent: Thursday, May 14, 2015 2:29 AM To: Charles Hayden Cc: user Subject: Re: how to set random seed Sorry for late reply. Here is what I was thinking import random as r def main(): get SparkContext #Just for fun, lets assume seed is an id filename=bin.dat seed = id(filename) #broadcast it br = sc.broadcast(seed) #set up dummy list lst = [] for i in range(4): x=[] for j in range(4): x.append(j) lst.append(x) print lst base = sc.parallelize(lst) print base.map(randomize).collect() Randomize looks like def randomize(lst): local_seed = br.value r.seed(local_seed) r.shuffle(lst) return lst Let me know if this helps... base = sc.parallelize(lst) print base.map(randomize).collect() On Wed, May 13, 2015 at 11:41 PM, Charles Hayden charles.hay...@atigeo.com wrote: Can you elaborate? Broadcast will distribute the seed, which is only one number. But what construct do I use to plant the seed (call random.seed()) once on each worker? From: ayan guha guha.a...@gmail.com Sent: Tuesday, May 12, 2015 11:17 PM To: Charles Hayden Cc: user Subject: Re: how to set random seed Easiest way is to broadcast it. On 13 May 2015 10:40, Charles Hayden charles.hay...@atigeo.com wrote: In pySpark, I am writing a map with a lambda that calls random.shuffle. For testing, I want to be able to give it a seed, so that successive runs will produce the same shuffle. I am looking for a way to set this same random seed once on each worker. Is there any simple way to do it? -- Best Regards, Ayan Guha - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
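A sketch along the lines of the replies above, assuming an RDD of lists as in the example (note that r.seek(my_seed) in the quoted snippet is presumably a typo for r.seed(my_seed)). Seeding a local Random once per partition keeps reruns deterministic without depending on which Python worker process picks up each task:

import random

my_seed = 42

def shuffle_partition(index, iterator):
    # One Random per partition; mixing in the partition index avoids every
    # partition producing the identical permutation
    r = random.Random(my_seed + index)
    for lst in iterator:
        r.shuffle(lst)
        yield lst

shuffled = rdd.mapPartitionsWithIndex(shuffle_partition)  # `rdd` built with sc.parallelize(lst) as above
print(shuffled.collect())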
Re: number of executors
Did you try --executor-cores param? While you submit the job, do a ps aux | grep spark-submit and see the exact command parameters. Thanks Best Regards On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com wrote: Hi, I have a 5 nodes yarn cluster, I used spark-submit to submit a simple app. spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 I have set the number of executor to 5, but from sparkui I could see only two executors and it ran very slow. What did I miss ? Thanks, Xiaohe
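One thing worth double-checking (an assumption on my part, not something confirmed in this thread): spark-submit treats everything placed after the application jar as arguments to the application itself, so a --num-executors that follows the jar may never reach spark-submit at all. A hedged alternative, sketched here in PySpark, is to request the resources through the equivalent configuration properties so argument ordering is taken out of the picture; the values are illustrative only:

from pyspark import SparkConf, SparkContext

# spark.executor.instances / spark.executor.cores are the properties behind
# --num-executors / --executor-cores on YARN
conf = (SparkConf()
        .setAppName("simple-app")
        .set("spark.executor.instances", "5")
        .set("spark.executor.cores", "4"))
sc = SparkContext(conf=conf)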
Effecient way to fetch all records on a particular node/partition in GraphX
Hi All, I have distributed my RDD across, say, 10 nodes. I want to fetch the data that resides on a particular node, say node 5. How can I achieve this? I have tried the mapPartitionsWithIndex function to filter the data of the corresponding node, but it is pretty expensive. Is there any more efficient way to do this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Effecient-way-to-fetch-all-records-on-a-particular-node-partition-in-GraphX-tp22923.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
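For reference, a minimal PySpark sketch of the mapPartitionsWithIndex approach mentioned above (my_rdd and the partition id are placeholders); it is expensive precisely because it still schedules a task on every partition just to discard most of them, which is what the sc.runJob suggestion in the reply further down this archive avoids:

target_partition = 5

def keep_only(index, iterator):
    # Emit records only from the partition we care about; every other
    # partition still runs a task and yields nothing
    if index == target_partition:
        for record in iterator:
            yield record

subset = my_rdd.mapPartitionsWithIndex(keep_only).collect()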
Re: number of executors
Sorry, them both are assigned task actually. Aggregated Metrics by Executor Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6 MB304.8 MB On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com wrote: bash-4.1$ ps aux | grep SparkSubmit xilan 1704 13.2 1.2 5275520 380244 pts/0 Sl+ 08:39 0:13 /scratch/xilan/jdk1.8.0_45/bin/java -cp /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 --executor-cores 4 xilan 1949 0.0 0.0 103292 800 pts/1S+ 08:40 0:00 grep --color SparkSubmit When look at the sparkui, I see the following: Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB / 28089782host2:49970 ms00063.4 MB / 1810945 So executor 2 is not even assigned a task ? Maybe I have some problems in my setting, but I don't know what could be the possible settings I set wrong or have not set. Thanks, Xiaohe On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try --executor-cores param? While you submit the job, do a ps aux | grep spark-submit and see the exact command parameters. Thanks Best Regards On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com wrote: Hi, I have a 5 nodes yarn cluster, I used spark-submit to submit a simple app. spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 I have set the number of executor to 5, but from sparkui I could see only two executors and it ran very slow. What did I miss ? Thanks, Xiaohe
RE: Spark Streaming and reducing latency
This is the nature of Spark Streaming as a System Architecture: 1. It is a batch processing system architecture (Spark Batch) optimized for Streaming Data 2. In terms of sources of Latency in such System Architecture, bear in mind that besides “batching”, there is also the Central “Driver” function/module, which is essentially a Central Job/Task Manager (ie running on a dedicated node, which doesn’t sit on the Path of the Messages), which even in a Streaming Data scenario, FOR EACH Streaming BATCH schedules tasks (as per the DAG for the streaming job), sends them to the workers, receives the results, then schedules and sends more tasks (as per the DAG for the job) and so on and so forth In terms of Parallel Programming Patterns/Architecture, the above is known as Data Parallel Architecture with Central Job/Task Manager. There are other alternatives for achieving lower latency and in terms of Parallel Programming Patterns they are known as Pipelines or Task Parallel Architecture – essentially every messages streams individually through an assembly line of Tasks. As the tasks can be run on multiple cores of one box or in a distributed environment. Storm for example implements this pattern or you can just put together your own solution From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Sunday, May 17, 2015 4:04 PM To: dgoldenberg Cc: user@spark.apache.org Subject: Re: Spark Streaming and reducing latency With receiver based streaming, you can actually specify spark.streaming.blockInterval which is the interval at which the receiver will fetch data from the source. Default value is 200ms and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with sparkstreaming when your processing time goes beyond your batch duration and you are having a higher data consumption then you will overwhelm the receiver's memory and hence will throw up block not found exceptions. Thanks Best Regards On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With the discretized streams, streaming is done with batch intervals i.e. the consumer has to wait the interval to be able to get at the new items. If one wants to reduce latency it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with this at the streaming API level? If latency is of great concern, is it better to look into streaming from something like Flume where data is pushed to consumers rather than pulled by them? Are there techniques, in that case, to ensure the consumers don't get overwhelmed with new data? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
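For concreteness, the two knobs discussed above (receiver block interval and batch interval) look roughly like this in PySpark; the values are illustrative rather than recommendations, and in Spark 1.x the block interval is given in milliseconds:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("low-latency-sketch")
        .set("spark.streaming.blockInterval", "50"))  # receiver block interval in ms (default 200)
sc = SparkContext(conf=conf)
# 250 ms micro-batches: smaller batches mean lower latency but more scheduling churn
ssc = StreamingContext(sc, 0.25)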
Re: Data partitioning and node tracking in Spark-GraphX
Can you please elaborate the way to fetch the records from a particular partition (node in our case) For example, my RDD is distributed to 10 nodes and i want to fetch the data of one particular node/partition i.e. partition/node with index 5. How can i do this? I have tried mapPartitionswithIndex as well as partitions.foreach functions. However, these are expensive. Does any body know more efficient way ? Thanks in anticipation. On Thu, Apr 16, 2015 at 5:49 PM, Evo Eftimov evo.efti...@isecc.com wrote: Well you can have a two level index structure, still without any need for physical cluster node awareness Level 1 Index is the previously described partitioned [K,V] RDD – this gets you to the value (RDD element) you need on the respective cluster node Level 2 Index – it will be built and reside within the Value of each [K,V] RDD element – so after you retrieve the appropriate Element from the appropriate cluster node based on Level 1 Index, then you query the Value in the element based on Level 2 Index *From:* MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] *Sent:* Thursday, April 16, 2015 4:32 PM *To:* Evo Eftimov *Cc:* user@spark.apache.org *Subject:* Re: Data partitioning and node tracking in Spark-GraphX Thanks a lot for the reply. Indeed it is useful but to be more precise i have 3D data and want to index it using octree. Thus i aim to build a two level indexing mechanism i.e. First at global level i want to partition and send the data to the nodes then at node level i again want to use octree to inded my data at local level. Could you please elaborate the solution in this context ? On Thu, Apr 16, 2015 at 5:23 PM, Evo Eftimov evo.efti...@isecc.com wrote: Well you can use a [Key, Value] RDD and partition it based on hash function on the Key and even a specific number of partitions (and hence cluster nodes). This will a) index the data, b) divide it and send it to multiple nodes. Re your last requirement - in a cluster programming environment/framework your app code should not be bothered on which physical node exactly, a partition resides Regards Evo Eftimov *From:* MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] *Sent:* Thursday, April 16, 2015 4:20 PM *To:* Evo Eftimov *Cc:* user@spark.apache.org *Subject:* Re: Data partitioning and node tracking in Spark-GraphX I want to use Spark functions/APIs to do this task. My basic purpose is to index the data and divide and send it to multiple nodes. Then at the time of accessing i want to reach the right node and data partition. I don't have any clue how to do this. Thanks, On Thu, Apr 16, 2015 at 5:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: How do you intend to fetch the required data - from within Spark or using an app / code / module outside Spark -Original Message- From: mas [mailto:mas.ha...@gmail.com] Sent: Thursday, April 16, 2015 4:08 PM To: user@spark.apache.org Subject: Data partitioning and node tracking in Spark-GraphX I have a big data file, i aim to create index on the data. I want to partition the data based on user defined function in Spark-GraphX (Scala). Further i want to keep track the node on which a particular data partition is send and being processed so i could fetch the required data by accessing the right node and data partition. How can i achieve this? Any help in this regard will be highly appreciated. 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-partitioning-and-node-tracking-in-Spark-GraphX-tp22527.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Muhammad Aamir *CONFIDENTIALITY: This email is intended solely for the person(s) named and may be confidential and/or privileged. If you are not the intended recipient, please delete it, notify me and do not copy, use, or disclose its content.*
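A small sketch of the Level-1 index described above, i.e. a [K, V] RDD partitioned by a hash of the key into a fixed number of partitions (PySpark here; the octree/Level-2 index inside each value is left out, and data_rdd, cell_id and the hash are hypothetical placeholders):

num_partitions = 10

def spatial_key_hash(key):
    # Placeholder hash; a real implementation would hash the octree cell id
    # or another spatial key so co-located points land in the same partition
    return hash(key)

pair_rdd = data_rdd.map(lambda rec: (cell_id(rec), rec))   # cell_id() is a hypothetical keying function
indexed = pair_rdd.partitionBy(num_partitions, spatial_key_hash)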
Re: Forbidded : Error Code: 403
I think you can try this way also: DataFrame df = sqlContext.load(s3n://ACCESS-KEY:SECRET-KEY@bucket-name/file.avro, com.databricks.spark.avro); Thanks Best Regards On Sat, May 16, 2015 at 2:02 AM, Mohammad Tariq donta...@gmail.com wrote: Thanks for the suggestion Steve. I'll try that out. Read the long story last night while struggling with this :). I made sure that I don't have any '/' in my key. On Saturday, May 16, 2015, Steve Loughran ste...@hortonworks.com wrote: On 15 May 2015, at 21:20, Mohammad Tariq donta...@gmail.com wrote: Thank you Ayan and Ted for the prompt response. It isn't working with s3n either. And I am able to download the file. In fact I am able to read the same file using s3 API without any issue. sounds like an S3n config problem. Check your configurations - you can test locally via the hdfs dfs command without even starting spark Oh, and if there is a / in your secret key, you're going to to need to generate new one. Long story -- [image: http://] Tariq, Mohammad about.me/mti [image: http://] http://about.me/mti
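If embedding the keys in the URL is undesirable (or the secret contains a '/'), an alternative sketch is to set them on the Hadoop configuration instead; the property names below are the standard s3n ones, the credentials and bucket are placeholders, and the Avro load still requires the spark-avro package on the classpath as in the reply above:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`

# Configure s3n credentials once, then use a clean URL without embedded keys
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
hadoop_conf.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

df = sqlContext.load("s3n://bucket-name/file.avro", "com.databricks.spark.avro")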
Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application?
Why not just trigger your batch job with that event? If you really need streaming, then you can create a custom receiver and make the receiver sleep till the event has happened. That will obviously run your streaming pipelines without having any data to process. Thanks Best Regards On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote: In my application, I want to start a DStream computation only after an special event has happened (for example, I want to start the receiver only after the reference data has been properly initialized). My question is: it looks like the DStream will be started right after the StreaminContext has been started. Is it possible to delay the start of specific DStream? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: number of executors
bash-4.1$ ps aux | grep SparkSubmit xilan 1704 13.2 1.2 5275520 380244 pts/0 Sl+ 08:39 0:13 /scratch/xilan/jdk1.8.0_45/bin/java -cp /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 --executor-cores 4 xilan 1949 0.0 0.0 103292 800 pts/1S+ 08:40 0:00 grep --color SparkSubmit When look at the sparkui, I see the following: Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB / 28089782host2:49970 ms00063.4 MB / 1810945 So executor 2 is not even assigned a task ? Maybe I have some problems in my setting, but I don't know what could be the possible settings I set wrong or have not set. Thanks, Xiaohe On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try --executor-cores param? While you submit the job, do a ps aux | grep spark-submit and see the exact command parameters. Thanks Best Regards On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com wrote: Hi, I have a 5 nodes yarn cluster, I used spark-submit to submit a simple app. spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 I have set the number of executor to 5, but from sparkui I could see only two executors and it ran very slow. What did I miss ? Thanks, Xiaohe
Trying to understand sc.textFile better
All, I am trying to understand the textFile method deeply, but I think my lack of deep Hadoop knowledge is holding me back here. Let me lay out my understanding and maybe you can correct anything that is incorrect When sc.textFile(path) is called, then defaultMinPartitions is used, which is really just math.min(taskScheduler.defaultParallelism, 2). Let's assume we are using the SparkDeploySchedulerBackend and this is conf.getInt(spark.default.parallelism, math.max(totalCoreCount.get(), 2)) So, now let's say the default is 2, going back to the textFile, this is passed in to HadoopRDD. The true size is determined in getPartitions() using inputFormat.getSplits(jobConf, minPartitions). But, from what I can find, the partitions is merely a hint and is in fact mostly ignored, so you will probably get the total number of blocks. OK, this fits with expectations, however what if the default is not used and you provide a partition size that is larger than the block size. If my research is right and the getSplits call simply ignores this parameter, then wouldn't the provided min end up being ignored and you would still just get the block size? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-understand-sc-textFile-better-tp22924.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
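A quick way to check this empirically (a hedged sketch; exact counts depend on the InputFormat, block size, and file size) is to pass different minPartitions hints and compare getNumPartitions(). If I read the old mapred FileInputFormat.getSplits correctly, the hint feeds into the goal split size, so asking for more partitions than blocks usually yields more, smaller splits, while asking for fewer still leaves you at roughly one partition per block:

path = "hdfs:///tmp/some-large-file.txt"   # hypothetical file spanning several HDFS blocks

default_rdd = sc.textFile(path)
hinted_rdd = sc.textFile(path, minPartitions=64)

print(default_rdd.getNumPartitions())   # typically ~ number of blocks (at least 2)
print(hinted_rdd.getNumPartitions())    # typically >= 64 if the file is big enough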
Re: textFileStream Question
It is based on the file timestamp; you can actually see the finding-new-files logic here: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172 Thanks Best Regards On Fri, May 15, 2015 at 2:25 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: How does textFileStream work behind the scenes? How does Spark Streaming know what files are new and need to be processed? Is it based on time stamp, file name? Thanks, Vadim
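A minimal usage sketch, assuming an existing SparkContext and a monitored directory (the path is a placeholder); per the FileInputDStream code linked above, files are picked up when their modification time falls inside the currently remembered window, so they should be moved or renamed atomically into the directory:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 10)                          # 10-second batches
lines = ssc.textFileStream("hdfs:///tmp/incoming")      # hypothetical directory to watch
lines.count().pprint()                                  # how many new lines arrived each batch
ssc.start()
ssc.awaitTermination()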
InferredSchema Example in Spark-SQL
Hi All, Was trying the Inferred Schema spart example http://spark.apache.org/docs/latest/sql-programming-guide.html#overview I am getting the following compilation error on the function toRD() value toRD is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toRD() [error] Thanks Rajdeep
Re: InferredSchema Example in Spark-SQL
you are missing sqlContext.implicits._ On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua rajdeep@gmail.com wrote: Here are my imports *import* org.apache.spark.SparkContext *import* org.apache.spark.SparkContext._ *import* org.apache.spark.SparkConf *import* org.apache.spark.sql.SQLContext *import* org.apache.spark.sql.SchemaRDD On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua rajdeep@gmail.com wrote: Sorry .. toDF() gives an error [error] /home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24: value toDF is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toDF() On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao hao.ch...@intel.com wrote: Typo? Should be .toDF(), not .toRD() *From:* Ram Sriharsha [mailto:sriharsha@gmail.com] *Sent:* Monday, May 18, 2015 8:31 AM *To:* Rajdeep Dua *Cc:* user *Subject:* Re: InferredSchema Example in Spark-SQL you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case inferring schema from the case class) On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.com wrote: Hi All, Was trying the Inferred Schema spart example http://spark.apache.org/docs/latest/sql-programming-guide.html#overview I am getting the following compilation error on the function toRD() value toRD is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toRD() [error] Thanks Rajdeep
RE: InferredSchema Example in Spark-SQL
Forgot to import the implicit functions/classes? import sqlContext.implicits._ From: Rajdeep Dua [mailto:rajdeep@gmail.com] Sent: Monday, May 18, 2015 8:08 AM To: user@spark.apache.org Subject: InferredSchema Example in Spark-SQL Hi All, Was trying the Inferred Schema spart example http://spark.apache.org/docs/latest/sql-programming-guide.html#overview I am getting the following compilation error on the function toRD() value toRD is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toRD() [error] Thanks Rajdeep
Re: InferredSchema Example in Spark-SQL
you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case inferring schema from the case class) On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.com wrote: Hi All, Was trying the Inferred Schema spart example http://spark.apache.org/docs/latest/sql-programming-guide.html#overview I am getting the following compilation error on the function toRD() value toRD is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toRD() [error] Thanks Rajdeep
RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?
I want to use file stream as input. And I look at SparkStreaming document again, it's saying file stream doesn't need a receiver at all. So I'm wondering if I can control a specific DStream instance. From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Monday, May 18, 2015 12:39 AM To: 'Akhil Das'; Haopu Wang Cc: 'user' Subject: RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application? You can make ANY standard receiver sleep by implementing a custom Message Deserializer class with sleep method inside it. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Sunday, May 17, 2015 4:29 PM To: Haopu Wang Cc: user Subject: Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application? Why not just trigger your batch job with that event? If you really need streaming, then you can create a custom receiver and make the receiver sleep till the event has happened. That will obviously run your streaming pipelines without having any data to process. Thanks Best Regards On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote: In my application, I want to start a DStream computation only after an special event has happened (for example, I want to start the receiver only after the reference data has been properly initialized). My question is: it looks like the DStream will be started right after the StreaminContext has been started. Is it possible to delay the start of specific DStream? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency
Looks like this problem has been mentioned before: http://qnalist.com/questions/5666463/downloads-from-s3-exceedingly-slow-when-running-on-spark-ec2 and a temporary solution is to deploy on a dedicated EMR/S3 configuration. I'll give that one a shot. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22927.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency
Turns out the above thread is unrelated: it was caused by using s3:// instead of s3n://, which I had already avoided in my checkpointDir configuration. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22928.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: InferredSchema Example in Spark-SQL
Typo? Should be .toDF(), not .toRD() From: Ram Sriharsha [mailto:sriharsha@gmail.com] Sent: Monday, May 18, 2015 8:31 AM To: Rajdeep Dua Cc: user Subject: Re: InferredSchema Example in Spark-SQL you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case inferring schema from the case class) On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.commailto:rajdeep@gmail.com wrote: Hi All, Was trying the Inferred Schema spart example http://spark.apache.org/docs/latest/sql-programming-guide.html#overview I am getting the following compilation error on the function toRD() value toRD is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toRD() [error] Thanks Rajdeep
Re: InferredSchema Example in Spark-SQL
Here are my imports *import* org.apache.spark.SparkContext *import* org.apache.spark.SparkContext._ *import* org.apache.spark.SparkConf *import* org.apache.spark.sql.SQLContext *import* org.apache.spark.sql.SchemaRDD On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua rajdeep@gmail.com wrote: Sorry .. toDF() gives an error [error] /home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24: value toDF is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toDF() On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao hao.ch...@intel.com wrote: Typo? Should be .toDF(), not .toRD() *From:* Ram Sriharsha [mailto:sriharsha@gmail.com] *Sent:* Monday, May 18, 2015 8:31 AM *To:* Rajdeep Dua *Cc:* user *Subject:* Re: InferredSchema Example in Spark-SQL you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case inferring schema from the case class) On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.com wrote: Hi All, Was trying the Inferred Schema spart example http://spark.apache.org/docs/latest/sql-programming-guide.html#overview I am getting the following compilation error on the function toRD() value toRD is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toRD() [error] Thanks Rajdeep
Re: InferredSchema Example in Spark-SQL
Sorry .. toDF() gives an error [error] /home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24: value toDF is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toDF() On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao hao.ch...@intel.com wrote: Typo? Should be .toDF(), not .toRD() *From:* Ram Sriharsha [mailto:sriharsha@gmail.com] *Sent:* Monday, May 18, 2015 8:31 AM *To:* Rajdeep Dua *Cc:* user *Subject:* Re: InferredSchema Example in Spark-SQL you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case inferring schema from the case class) On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.com wrote: Hi All, Was trying the Inferred Schema spart example http://spark.apache.org/docs/latest/sql-programming-guide.html#overview I am getting the following compilation error on the function toRD() value toRD is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toRD() [error] Thanks Rajdeep
Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency
BTW: My thread dump of the driver's main thread looks like it is stuck on waiting for Amazon S3 bucket metadata for a long time (which may suggests that I should move checkpointing directory from S3 to HDFS): Thread 1: main (RUNNABLE) java.net.SocketInputStream.socketRead0(Native Method) java.net.SocketInputStream.read(SocketInputStream.java:152) java.net.SocketInputStream.read(SocketInputStream.java:122) sun.security.ssl.InputRecord.readFully(InputRecord.java:442) sun.security.ssl.InputRecord.read(InputRecord.java:480) sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934) sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891) sun.security.ssl.AppInputStream.read(AppInputStream.java:102) org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:223) org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685) org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487) org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179) org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120) org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575) org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172) sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190) org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103) org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source) 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414) org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22926.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
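A sketch of the change contemplated above (the path is a placeholder): pointing the checkpoint directory at HDFS keeps the driver's getPreferredLocations / getFileStatus calls off the S3 REST API entirely:

sc.setCheckpointDir("hdfs:///user/spark/checkpoints")   # instead of an s3n:// URI
some_rdd.checkpoint()                                   # some_rdd is hypothetical; later checkpoints now hit HDFS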
Re: InferredSchema Example in Spark-SQL
You mean toDF() not toRD(). It stands for data frame, if that makes it easier to remember. Simon On 18 May 2015, at 01:07, Rajdeep Dua rajdeep@gmail.com wrote: Hi All, Was trying the Inferred Schema spart example http://spark.apache.org/docs/latest/sql-programming-guide.html#overview I am getting the following compilation error on the function toRD() value toRD is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile(/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), p(1).trim.toInt)).toRD() [error] Thanks Rajdeep
Re: Spark Streaming and reducing latency
With receiver based streaming, you can actually specify spark.streaming.blockInterval which is the interval at which the receiver will fetch data from the source. Default value is 200ms and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with sparkstreaming when your processing time goes beyond your batch duration and you are having a higher data consumption then you will overwhelm the receiver's memory and hence will throw up block not found exceptions. Thanks Best Regards On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With the discretized streams, streaming is done with batch intervals i.e. the consumer has to wait the interval to be able to get at the new items. If one wants to reduce latency it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with this at the streaming API level? If latency is of great concern, is it better to look into streaming from something like Flume where data is pushed to consumers rather than pulled by them? Are there techniques, in that case, to ensure the consumers don't get overwhelmed with new data? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?
You can make ANY standard receiver sleep by implementing a custom Message Deserializer class with sleep method inside it. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Sunday, May 17, 2015 4:29 PM To: Haopu Wang Cc: user Subject: Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application? Why not just trigger your batch job with that event? If you really need streaming, then you can create a custom receiver and make the receiver sleep till the event has happened. That will obviously run your streaming pipelines without having any data to process. Thanks Best Regards On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote: In my application, I want to start a DStream computation only after an special event has happened (for example, I want to start the receiver only after the reference data has been properly initialized). My question is: it looks like the DStream will be started right after the StreaminContext has been started. Is it possible to delay the start of specific DStream? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Effecient way to fetch all records on a particular node/partition in GraphX
If you know the partition IDs, you can launch a job that runs tasks on only those partitions by calling sc.runJob https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1686. For example, we do this in IndexedRDD https://github.com/amplab/spark-indexedrdd/blob/f0c42dcad1f49ce36140f0c1f7d2c3ed61ed373e/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexedRDDLike.scala#L100 to get particular keys without launching a task on every partition. Ankur http://www.ankurdave.com/ On Sun, May 17, 2015 at 8:32 AM, mas mas.ha...@gmail.com wrote: I have distributed my RDD into say 10 nodes. I want to fetch the data that resides on a particular node say node 5. How i can achieve this? I have tried mapPartitionWithIndex function to filter the data of that corresponding node, however it is pretty expensive.
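The PySpark equivalent of that suggestion looks roughly like this (a hedged sketch; my_rdd and the partition id are placeholders). runJob launches tasks only on the listed partitions instead of all of them:

target_partition = 5
# Run the job on partition 5 only and bring its records back to the driver
records = sc.runJob(my_rdd, lambda iterator: list(iterator), partitions=[target_partition])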
Re: textFileStream Question
This is cool. Thanks Akhil. On Sun, May 17, 2015 at 11:25 AM, Akhil Das ak...@sigmoidanalytics.com wrote: With file timestamp, you can actually see the finding new files logic from here https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172 Thanks Best Regards On Fri, May 15, 2015 at 2:25 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: How does textFileStream work behind the scenes? How does Spark Streaming know what files are new and need to be processed? Is it based on time stamp, file name? Thanks, Vadim
Big Data Day LA: FREE Big Data Conference in Los Angeles on June 27, 2015
Please register for the 3rd annual full day ‘Big Data Day LA’ here: - http://bigdatadayla.org • Location: Los Angeles • Date: June 27, 2015 • Completely FREE: Attendance, Food (Breakfast, Lunch Coffee Breaks) and Networking Reception • Vendor neutral • Great lineup of presentations, workshops, panels, and keynotes. • Over 45 talks organized in 5 tracks: Hadoop/Spark, Big Data, Business Use Cases, NoSQL, Data Science • Led by over 40+ volunteers • 800+ expected attendees Register before it sells out! http://bigdatadayla.org Thanks Slim Baltagi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Big-Data-Day-LA-FREE-Big-Data-Conference-in-Los-Angeles-on-June-27-2015-tp22921.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming and reducing latency
I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With the discretized streams, streaming is done with batch intervals i.e. the consumer has to wait the interval to be able to get at the new items. If one wants to reduce latency it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with this at the streaming API level? If latency is of great concern, is it better to look into streaming from something like Flume where data is pushed to consumers rather than pulled by them? Are there techniques, in that case, to ensure the consumers don't get overwhelmed with new data? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org