Re: PySpark: slicing issue with dataframes

2015-05-17 Thread Davies Liu
Yes, it's a bug, please file a JIRA.

On Sun, May 3, 2015 at 10:36 AM, Ali Bajwa ali.ba...@gmail.com wrote:
 Friendly reminder on this one. Just wanted to get a confirmation that this
 is not by design before I logged a JIRA

 Thanks!
 Ali


 On Tue, Apr 28, 2015 at 9:53 AM, Ali Bajwa ali.ba...@gmail.com wrote:

 Hi experts,

 Trying to use the slicing functionality in strings as part of a Spark
 program (PySpark) I get this error:

  Code 

 import pandas as pd
 from pyspark.sql import SQLContext
 hc = SQLContext(sc)
 A = pd.DataFrame({'Firstname': ['James', 'Ali', 'Daniel'], 'Lastname':
 ['Jones', 'Bajwa', 'Day']})
 a = hc.createDataFrame(A)
 print A

 b = a.select(a.Firstname[:2])
 print b.toPandas()
 c = a.select(a.Lastname[2:])
 print c.toPandas()

 Output:

  Firstname Lastname
 0 JamesJones
 1   AliBajwa
 2Daniel  Day
   SUBSTR(Firstname, 0, 2)
 0  Ja
 1  Al
 2  Da


 ---
 Py4JError Traceback (most recent call
 last)
 <ipython-input-17-6ee5d7d069ce> in <module>()
  10 b = a.select(a.Firstname[:2])
  11 print b.toPandas()
 ---> 12 c = a.select(a.Lastname[2:])
  13 print c.toPandas()

 /home/jupyter/spark-1.3.1/python/pyspark/sql/dataframe.pyc in substr(self,
 startPos, length)
1089 raise TypeError("Can not mix the type")
1090 if isinstance(startPos, (int, long)):
 -> 1091 jc = self._jc.substr(startPos, length)
1092 elif isinstance(startPos, Column):
1093 jc = self._jc.substr(startPos._jc, length._jc)


 /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer,
 self.gateway_client,
 --> 538 self.target_id, self.name)
 539
 540 for temp_arg in temp_args:

 /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
 302 raise Py4JError(
 303 'An error occurred while calling {0}{1}{2}.
 Trace:\n{3}\n'.
 --> 304 format(target_id, '.', name, value))
 305 else:
 306 raise Py4JError(

 Py4JError: An error occurred while calling o1887.substr. Trace:
 py4j.Py4JException: Method substr([class java.lang.Integer, class
 java.lang.Long]) does not exist
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
 at py4j.Gateway.invoke(Gateway.java:252)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)

 Looks like X[:2] works but X[2:] fails with the error above
 Anyone else have this issue?

 Clearly I can use substr() to work around this, but if this is a confirmed
 bug we should open a JIRA.
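 For reference, a minimal sketch of that workaround, reusing the DataFrame a
 from the code above (the start position 3 and the length 10 are just
 illustrative values; passing plain ints to substr() avoids the Integer/Long
 mismatch shown in the trace):

 # hypothetical workaround sketch, not a fix for the underlying slicing bug
 c = a.select(a.Lastname.substr(3, 10))  # explicit int start and length
 print c.toPandas()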

 Thanks,
 Ali



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to set random seed

2015-05-17 Thread Davies Liu
The Python workers used for each stage may be different, so this may not
work as expected.

You can create a Random object, set its seed, and use it to do the shuffle():

import random

r = random.Random()
r.seed(my_seed)

def f(x):
    r.shuffle(x)
    return x

rdd.map(f)

On Thu, May 14, 2015 at 6:21 AM, Charles Hayden
charles.hay...@atigeo.com wrote:
 Thanks for the reply.


 I have not tried it out (I will today and report on my results) but I think
 what I need to do is to call mapPartitions and pass it a function that sets
 the seed.  I was planning to pass the seed value in the closure.


 Something like:

 my_seed = 42
 def f(iterator):
     random.seed(my_seed)
     yield my_seed
 rdd.mapPartitions(f)
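 Expanding that idea into a runnable sketch (assuming rdd is an RDD of lists;
 the seed value and the in-place shuffle of each element are illustrative, and
 mapPartitions is only used here to seed the generator once per partition):

 import random

 my_seed = 42

 def shuffle_partition(iterator):
     # seed once per partition, then shuffle each element (a list) in place
     random.seed(my_seed)
     for lst in iterator:
         random.shuffle(lst)
         yield lst

 shuffled = rdd.mapPartitions(shuffle_partition)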


 
 From: ayan guha guha.a...@gmail.com
 Sent: Thursday, May 14, 2015 2:29 AM

 To: Charles Hayden
 Cc: user
 Subject: Re: how to set random seed

 Sorry for late reply.

 Here is what I was thinking

 import random as r

 def main():
     # get SparkContext
     # Just for fun, let's assume seed is an id
     filename = "bin.dat"
     seed = id(filename)
     # broadcast it
     br = sc.broadcast(seed)

     # set up dummy list
     lst = []
     for i in range(4):
         x = []
         for j in range(4):
             x.append(j)
         lst.append(x)
     print lst
     base = sc.parallelize(lst)
     print base.map(randomize).collect()

 randomize looks like:

 def randomize(lst):
     local_seed = br.value
     r.seed(local_seed)
     r.shuffle(lst)
     return lst


 Let me know if this helps...





 On Wed, May 13, 2015 at 11:41 PM, Charles Hayden charles.hay...@atigeo.com
 wrote:

 Can you elaborate? Broadcast will distribute the seed, which is only one
 number.  But what construct do I use to plant the seed (call
 random.seed()) once on each worker?

 
 From: ayan guha guha.a...@gmail.com
 Sent: Tuesday, May 12, 2015 11:17 PM
 To: Charles Hayden
 Cc: user
 Subject: Re: how to set random seed


 Easiest way is to broadcast it.

 On 13 May 2015 10:40, Charles Hayden charles.hay...@atigeo.com wrote:

 In pySpark, I am writing a map with a lambda that calls random.shuffle.
 For testing, I want to be able to give it a seed, so that successive runs
 will produce the same shuffle.
 I am looking for a way to set this same random seed once on each worker.
 Is there any simple way to do it?





 --
 Best Regards,
 Ayan Guha

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: number of executors

2015-05-17 Thread Akhil Das
Did you try --executor-cores param? While you submit the job, do a ps aux |
grep spark-submit and see the exact command parameters.
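For example, a submit command along these lines (the jar, class name and
numbers are the ones from this thread; note that spark-submit options normally
go before the application JAR, otherwise they are passed to the application as
its own arguments):

spark-submit --master yarn \
  --num-executors 5 \
  --executor-cores 4 \
  --class scala.SimpleApp \
  target/scala-2.10/simple-project_2.10-1.0.jar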

Thanks
Best Regards

On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com wrote:

 Hi,

 I have a 5-node YARN cluster. I used spark-submit to submit a simple app.

  spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar
 --class scala.SimpleApp --num-executors 5

 I have set the number of executors to 5, but from the Spark UI I could see only
 two executors, and it ran very slowly. What did I miss?

 Thanks,
 Xiaohe



Efficient way to fetch all records on a particular node/partition in GraphX

2015-05-17 Thread mas
Hi All,

I have distributed my RDD across, say, 10 nodes. I want to fetch the data that
resides on a particular node, say node 5. How can I achieve this?
I have tried the mapPartitionsWithIndex function to filter the data of the
corresponding node, however it is pretty expensive.
Is there any efficient way to do that?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Effecient-way-to-fetch-all-records-on-a-particular-node-partition-in-GraphX-tp22923.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: number of executors

2015-05-17 Thread xiaohe lan
Sorry, both of them are actually assigned tasks.

Aggregated Metrics by Executor
Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput Size
/ RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle Spill
(Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4 MB295.4
MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6 MB304.8
MB

On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When I look at the Spark UI, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB
 / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task? Maybe I have some problems in
 my settings, but I don't know which settings I have set wrong or have not
 set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps aux
 | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5-node YARN cluster. I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executors to 5, but from the Spark UI I could see
 only two executors, and it ran very slowly. What did I miss?

 Thanks,
 Xiaohe






RE: Spark Streaming and reducing latency

2015-05-17 Thread Evo Eftimov
This is the nature of Spark Streaming as a System Architecture:

 

1.   It is a batch processing system architecture (Spark Batch) optimized 
for Streaming Data

2.   In terms of sources of Latency in such System Architecture, bear in 
mind that besides “batching”, there is also the Central “Driver” 
function/module, which is essentially a Central Job/Task Manager (ie running on 
a dedicated node, which doesn’t sit on the Path of the Messages), which even in 
a Streaming Data scenario, FOR EACH Streaming BATCH schedules tasks (as per the 
DAG for the streaming job), sends them to the workers, receives the results, 
then schedules and sends more tasks (as per the DAG for the job) and so on and 
so forth

 

In terms of Parallel Programming Patterns/Architecture, the above is known as 
Data Parallel Architecture with Central Job/Task Manager.

 

There are other alternatives for achieving lower latency; in terms of 
Parallel Programming Patterns they are known as Pipelines or Task Parallel 
Architecture – essentially every message streams individually through an 
assembly line of Tasks, where the tasks can run on multiple cores of one box or 
in a distributed environment. Storm, for example, implements this pattern, or you 
can just put together your own solution.

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:04 PM
To: dgoldenberg
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

With receiver-based streaming, you can actually specify 
spark.streaming.blockInterval, which is the interval at which the data the 
receiver fetches from the source is formed into blocks. The default value is 
200ms, and hence if your batch duration is 1 second, it will produce 5 blocks 
of data. And yes, with Spark Streaming, when your processing time goes beyond 
your batch duration and you have a higher data consumption rate, you will 
overwhelm the receiver's memory and it will throw block-not-found exceptions. 




Thanks

Best Regards

 

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote:

I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming.
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with this at the streaming API level?

If latency is of great concern, is it better to look into streaming from
something like Flume where data is pushed to consumers rather than pulled by
them? Are there techniques, in that case, to ensure the consumers don't get
overwhelmed with new data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Re: Data partitioning and node tracking in Spark-GraphX

2015-05-17 Thread MUHAMMAD AAMIR
Can you please elaborate on the way to fetch the records from a particular
partition (a node in our case)? For example, my RDD is distributed to 10 nodes
and I want to fetch the data of one particular node/partition, i.e. the
partition/node with index 5.
How can I do this?
I have tried the mapPartitionsWithIndex as well as partitions.foreach
functions. However, these are expensive. Does anybody know a more efficient
way?

Thanks in anticipation.


On Thu, Apr 16, 2015 at 5:49 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 Well you can have a two level index structure, still without any need for
 physical cluster node awareness



 Level 1 Index is the previously described partitioned [K,V] RDD – this
 gets you to the value (RDD element) you need on the respective cluster node



 Level 2 Index – it will be built and reside within the Value of each [K,V]
 RDD element – so after you retrieve the appropriate Element from the
 appropriate cluster node based on Level 1 Index, then you query the Value
 in the element based on Level 2 Index



 *From:* MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com]
 *Sent:* Thursday, April 16, 2015 4:32 PM

 *To:* Evo Eftimov
 *Cc:* user@spark.apache.org
 *Subject:* Re: Data partitioning and node tracking in Spark-GraphX



 Thanks a lot for the reply. Indeed it is useful, but to be more precise, I
 have 3D data and want to index it using an octree. Thus I aim to build a two
 level indexing mechanism, i.e. first at the global level I want to partition and
 send the data to the nodes, then at the node level I again want to use an octree to
 index my data at the local level.

 Could you please elaborate the solution in this context ?



 On Thu, Apr 16, 2015 at 5:23 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 Well you can use a [Key, Value] RDD and partition it based on a hash
 function on the Key, and even with a specific number of partitions (and hence
 cluster nodes). This will a) index the data, b) divide it and send it to
 multiple nodes. Re your last requirement - in a cluster programming
 environment/framework your app code should not be concerned with exactly
 which physical node a partition resides on.
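 A minimal sketch of that Level 1 idea (shown in PySpark for brevity; key_of
 and the partition count of 10 are placeholders for your own key function and
 cluster size):

 # keyed RDD, hash partitioned into a fixed number of partitions
 kv = rdd.map(lambda rec: (key_of(rec), rec))   # key_of is a placeholder
 indexed = kv.partitionBy(10)                   # hash partition on the key
 indexed.cache()
 # the hash of a key then tells you which partition holds its records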



 Regards

 Evo Eftimov



 *From:* MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com]
 *Sent:* Thursday, April 16, 2015 4:20 PM
 *To:* Evo Eftimov
 *Cc:* user@spark.apache.org
 *Subject:* Re: Data partitioning and node tracking in Spark-GraphX



 I want to use Spark functions/APIs to do this task. My basic purpose is to
 index the data, divide it, and send it to multiple nodes. Then at the time
 of access I want to reach the right node and data partition. I don't
 have any clue how to do this.

 Thanks,



 On Thu, Apr 16, 2015 at 5:13 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 How do you intend to fetch the required data - from within Spark or using
 an app / code / module outside Spark

 -Original Message-
 From: mas [mailto:mas.ha...@gmail.com]
 Sent: Thursday, April 16, 2015 4:08 PM
 To: user@spark.apache.org
 Subject: Data partitioning and node tracking in Spark-GraphX

 I have a big data file, and I aim to create an index on the data. I want to
 partition the data based on a user-defined function in Spark-GraphX (Scala).
 Further, I want to keep track of the node on which a particular data partition
 is sent and processed, so I could fetch the required data by accessing
 the right node and data partition.
 How can I achieve this?
 Any help in this regard will be highly appreciated.



 --
 View this message in context:

 http://apache-spark-user-list.1001560.n3.nabble.com/Data-partitioning-and-no
 de-tracking-in-Spark-GraphX-tp22527.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org





 --

 Regards,
 Muhammad Aamir


 *CONFIDENTIALITY:This email is intended solely for the person(s) named and
 may be confidential and/or privileged.If you are not the intended
 recipient,please delete it,notify me and do not copy,use,or disclose its
 content.*





 --

 Regards,
 Muhammad Aamir


 *CONFIDENTIALITY:This email is intended solely for the person(s) named and
 may be confidential and/or privileged.If you are not the intended
 recipient,please delete it,notify me and do not copy,use,or disclose its
 content.*




-- 
Regards,
Muhammad Aamir


*CONFIDENTIALITY:This email is intended solely for the person(s) named and
may be confidential and/or privileged.If you are not the intended
recipient,please delete it,notify me and do not copy,use,or disclose its
content.*


Re: Forbidden : Error Code: 403

2015-05-17 Thread Akhil Das
I think you can try this way also:

DataFrame df =
sqlContext.load("s3n://ACCESS-KEY:SECRET-KEY@bucket-name/file.avro",
"com.databricks.spark.avro");
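The PySpark equivalent would look roughly like this (assuming an existing
SQLContext; the bucket, keys and the spark-avro source name are the ones from
this thread, so treat them as placeholders):

# rough PySpark equivalent of the load() call above
df = sqlContext.load("s3n://ACCESS-KEY:SECRET-KEY@bucket-name/file.avro",
                     "com.databricks.spark.avro")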


Thanks
Best Regards

On Sat, May 16, 2015 at 2:02 AM, Mohammad Tariq donta...@gmail.com wrote:

 Thanks for the suggestion Steve. I'll try that out.

 Read the long story last night while struggling with this :). I made sure
 that I don't have any '/' in my key.

 On Saturday, May 16, 2015, Steve Loughran ste...@hortonworks.com wrote:


  On 15 May 2015, at 21:20, Mohammad Tariq donta...@gmail.com wrote:
 
  Thank you Ayan and Ted for the prompt response. It isn't working with
 s3n either.
 
  And I am able to download the file. In fact I am able to read the same
 file using s3 API without any issue.
 


 sounds like an S3n config problem. Check your configurations - you can
 test locally via the hdfs dfs command without even starting spark

  Oh, and if there is a / in your secret key, you're going to need to
 generate a new one. Long story



 --

 [image: http://]
 Tariq, Mohammad
 about.me/mti
 [image: http://]
 http://about.me/mti





Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-17 Thread Akhil Das
Why not just trigger your batch job with that event?

If you really need streaming, then you can create a custom receiver and
make the receiver sleep till the event has happened. That will obviously
run your streaming pipelines without having any data to process.

Thanks
Best Regards

On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote:

 In my application, I want to start a DStream computation only after a
 special event has happened (for example, I want to start the receiver
 only after the reference data has been properly initialized).

 My question is: it looks like the DStream will be started right after
 the StreamingContext has been started. Is it possible to delay the start
 of a specific DStream?

 Thank you very much!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: number of executors

2015-05-17 Thread xiaohe lan
bash-4.1$ ps aux | grep SparkSubmit
xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
/scratch/xilan/jdk1.8.0_45/bin/java -cp
/scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
-Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
--num-executors 5 --executor-cores 4
xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
--color SparkSubmit


When I look at the Spark UI, I see the following:
Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB /
28089782host2:49970 ms00063.4 MB / 1810945

So executor 2 is not even assigned a task? Maybe I have some problems in
my settings, but I don't know which settings I have set wrong or have not
set.


Thanks,
Xiaohe

On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try --executor-cores param? While you submit the job, do a ps aux
 | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5-node YARN cluster. I used spark-submit to submit a simple app.

  spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar
 --class scala.SimpleApp --num-executors 5

 I have set the number of executors to 5, but from the Spark UI I could see only
 two executors, and it ran very slowly. What did I miss?

 Thanks,
 Xiaohe





Trying to understand sc.textFile better

2015-05-17 Thread Justin Pihony
All,
I am trying to understand the textFile method deeply, but I think my
lack of deep Hadoop knowledge is holding me back here. Let me lay out my
understanding and maybe you can correct anything that is incorrect

When sc.textFile(path) is called, defaultMinPartitions is used,
which is really just math.min(taskScheduler.defaultParallelism, 2). Let's
assume we are using the SparkDeploySchedulerBackend, where this is
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(),
2)).
So, now let's say the default is 2. Going back to textFile, this is
passed in to HadoopRDD. The true size is determined in getPartitions() using
inputFormat.getSplits(jobConf, minPartitions). But, from what I can find,
the partitions argument is merely a hint and is in fact mostly ignored, so you
will probably get the total number of blocks.
OK, this fits with expectations. However, what if the default is not used and
you provide a partition size that is larger than the block size? If my
research is right and the getSplits call simply ignores this parameter, then
wouldn't the provided minimum end up being ignored and you would still just get
the block size?
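One quick way to check this empirically (the path and the number 100 are
placeholders) is to compare the partition count with and without an explicit
minPartitions:

# minPartitions is passed through to InputFormat.getSplits() as a hint
rdd_default = sc.textFile("hdfs:///data/big.txt")
rdd_hinted = sc.textFile("hdfs:///data/big.txt", minPartitions=100)
print rdd_default.getNumPartitions(), rdd_hinted.getNumPartitions()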

Thanks,
Justin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-understand-sc-textFile-better-tp22924.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: textFileStream Question

2015-05-17 Thread Akhil Das
It's based on the file timestamp; you can see the logic for finding new files
here:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172
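For completeness, a minimal textFileStream sketch (assuming an existing
SparkContext sc; the directory and the 10-second batch interval are
placeholders); only files that show up in the monitored directory with a new
enough timestamp are picked up:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 10)                        # 10-second batches
lines = ssc.textFileStream("hdfs:///tmp/incoming")    # monitored directory
lines.pprint()
ssc.start()
ssc.awaitTermination()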

Thanks
Best Regards

On Fri, May 15, 2015 at 2:25 AM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 How does textFileStream work behind the scenes? How does Spark Streaming
 know what files are new and need to be processed? Is it based on time
 stamp, file name?

 Thanks,
 Vadim



InferredSchema Example in Spark-SQL

2015-05-17 Thread Rajdeep Dua
Hi All,
Was trying the Inferred Schema Spark example
http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

I am getting the following compilation error on the function toRD()

value toRD is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people =
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
=> Person(p(0), p(1).trim.toInt)).toRD()
[error]

Thanks
Rajdeep


Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Ram Sriharsha
you are missing sqlContext.implicits._

On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua rajdeep@gmail.com wrote:

 Here are my imports

 *import* org.apache.spark.SparkContext

 *import* org.apache.spark.SparkContext._

 *import* org.apache.spark.SparkConf

 *import* org.apache.spark.sql.SQLContext

 *import* org.apache.spark.sql.SchemaRDD

 On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua rajdeep@gmail.com
 wrote:

 Sorry .. toDF() gives an error

 [error]
 /home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24:
 value toDF is not a member of org.apache.spark.rdd.RDD[Person]
 [error] val people =
 sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toDF()


 On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Typo? Should be .toDF(), not .toRD()



 *From:* Ram Sriharsha [mailto:sriharsha@gmail.com]
 *Sent:* Monday, May 18, 2015 8:31 AM
 *To:* Rajdeep Dua
 *Cc:* user
 *Subject:* Re: InferredSchema Example in Spark-SQL



 you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case
 inferring schema from the case class)







 On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.com
 wrote:

  Hi All,

 Was trying the Inferred Schema Spark example

 http://spark.apache.org/docs/latest/sql-programming-guide.html#overview



 I am getting the following compilation error on the function toRD()



 value toRD is not a member of org.apache.spark.rdd.RDD[Person]

 [error] val people =
 sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()

 [error]



 Thanks

 Rajdeep













RE: InferredSchema Example in Spark-SQL

2015-05-17 Thread Cheng, Hao
Forgot to import the implicit functions/classes?

import sqlContext.implicits._

From: Rajdeep Dua [mailto:rajdeep@gmail.com]
Sent: Monday, May 18, 2015 8:08 AM
To: user@spark.apache.org
Subject: InferredSchema Example in Spark-SQL

Hi All,
Was trying the Inferred Schema Spark example
http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

I am getting the following compilation error on the function toRD()

value toRD is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = 
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()
[error]

Thanks
Rajdeep




Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Ram Sriharsha
you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case
inferring schema from the case class)



On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.com wrote:

 Hi All,
 Was trying the Inferred Schema Spark example
 http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

 I am getting the following compilation error on the function toRD()

 value toRD is not a member of org.apache.spark.rdd.RDD[Person]
 [error] val people =
 sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()
 [error]

 Thanks
 Rajdeep






RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-17 Thread Haopu Wang
I want to use a file stream as input. And looking at the Spark Streaming
documentation again, it says a file stream doesn't need a receiver at all.

So I'm wondering if I can control a specific DStream instance.

 



From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:39 AM
To: 'Akhil Das'; Haopu Wang
Cc: 'user'
Subject: RE: [SparkStreaming] Is it possible to delay the start of some
DStream in the application?

 

You can make ANY standard receiver sleep by implementing a custom
Message Deserializer class with sleep method inside it. 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:29 PM
To: Haopu Wang
Cc: user
Subject: Re: [SparkStreaming] Is it possible to delay the start of some
DStream in the application?

 

Why not just trigger your batch job with that event?

 

If you really need streaming, then you can create a custom receiver and
make the receiver sleep till the event has happened. That will obviously
run your streaming pipelines without having any data to process.




Thanks

Best Regards

 

On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote:

In my application, I want to start a DStream computation only after a
special event has happened (for example, I want to start the receiver
only after the reference data has been properly initialized).

My question is: it looks like the DStream will be started right after
the StreamingContext has been started. Is it possible to delay the start
of a specific DStream?

Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Re: Union of checkpointed RDD in Apache Spark has long ( 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Looks like this problem has been mentioned before:

http://qnalist.com/questions/5666463/downloads-from-s3-exceedingly-slow-when-running-on-spark-ec2

and a temporary solution is to deploy on a dedicated EMR/S3 configuration.
I'll give that one a shot.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Union of checkpointed RDD in Apache Spark has long ( 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Turns out the above thread is unrelated: it was caused by using s3:// instead
of s3n://, which I had already avoided in my checkpointDir configuration.
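(For anyone hitting the same thing, the setting in question is just the
checkpoint directory URL; the bucket below is a placeholder.)

# use s3n:// rather than s3:// when checkpointing to S3, or an hdfs:// path
sc.setCheckpointDir("s3n://my-bucket/checkpoints")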



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22928.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: InferredSchema Example in Spark-SQL

2015-05-17 Thread Cheng, Hao
Typo? Should be .toDF(), not .toRD()

From: Ram Sriharsha [mailto:sriharsha@gmail.com]
Sent: Monday, May 18, 2015 8:31 AM
To: Rajdeep Dua
Cc: user
Subject: Re: InferredSchema Example in Spark-SQL

you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case inferring 
schema from the case class)



On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua 
rajdeep@gmail.commailto:rajdeep@gmail.com wrote:
Hi All,
Was trying the Inferred Schema Spark example
http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

I am getting the following compilation error on the function toRD()

value toRD is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = 
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()
[error]

Thanks
Rajdeep





Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Rajdeep Dua
Here are my imports

*import* org.apache.spark.SparkContext

*import* org.apache.spark.SparkContext._

*import* org.apache.spark.SparkConf

*import* org.apache.spark.sql.SQLContext

*import* org.apache.spark.sql.SchemaRDD

On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua rajdeep@gmail.com wrote:

 Sorry .. toDF() gives an error

 [error]
 /home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24:
 value toDF is not a member of org.apache.spark.rdd.RDD[Person]
 [error] val people =
 sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toDF()


 On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Typo? Should be .toDF(), not .toRD()



 *From:* Ram Sriharsha [mailto:sriharsha@gmail.com]
 *Sent:* Monday, May 18, 2015 8:31 AM
 *To:* Rajdeep Dua
 *Cc:* user
 *Subject:* Re: InferredSchema Example in Spark-SQL



 you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case
 inferring schema from the case class)







 On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.com
 wrote:

  Hi All,

 Was trying the Inferred Schema Spark example

 http://spark.apache.org/docs/latest/sql-programming-guide.html#overview



 I am getting the following compilation error on the function toRD()



 value toRD is not a member of org.apache.spark.rdd.RDD[Person]

 [error] val people =
 sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()

 [error]



 Thanks

 Rajdeep












Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Rajdeep Dua
Sorry .. toDF() gives an error

[error]
/home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24:
value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people =
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
=> Person(p(0), p(1).trim.toInt)).toDF()


On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Typo? Should be .toDF(), not .toRD()



 *From:* Ram Sriharsha [mailto:sriharsha@gmail.com]
 *Sent:* Monday, May 18, 2015 8:31 AM
 *To:* Rajdeep Dua
 *Cc:* user
 *Subject:* Re: InferredSchema Example in Spark-SQL



 you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case
 inferring schema from the case class)







 On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua rajdeep@gmail.com
 wrote:

  Hi All,

 Was trying the Inferred Schema Spark example

 http://spark.apache.org/docs/latest/sql-programming-guide.html#overview



 I am getting the following compilation error on the function toRD()



 value toRD is not a member of org.apache.spark.rdd.RDD[Person]

 [error] val people =
 sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()

 [error]



 Thanks

 Rajdeep










Re: Union of checkpointed RDD in Apache Spark has long ( 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
BTW: My thread dump of the driver's main thread looks like it is stuck
waiting for Amazon S3 bucket metadata for a long time (which may suggest
that I should move the checkpointing directory from S3 to HDFS):

Thread 1: main (RUNNABLE) 
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
sun.security.ssl.InputRecord.read(InputRecord.java:480)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:223)
org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172)
sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22926.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Simon Elliston Ball
You mean toDF(), not toRD(). It stands for data frame, if that makes it easier to
remember.

Simon

 On 18 May 2015, at 01:07, Rajdeep Dua rajdeep@gmail.com wrote:
 
 Hi All,
  Was trying the Inferred Schema Spark example
 http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
 
 I am getting the following compilation error on the function toRD()
 
 value toRD is not a member of org.apache.spark.rdd.RDD[Person]
 [error] val people = 
  sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
   => Person(p(0), p(1).trim.toInt)).toRD()
 [error]  
 
 Thanks
 Rajdeep   
 
 
 


Re: Spark Streaming and reducing latency

2015-05-17 Thread Akhil Das
With receiver-based streaming, you can actually specify
spark.streaming.blockInterval, which is the interval at which the data the
receiver fetches from the source is formed into blocks. The default value is
200ms, and hence if your batch duration is 1 second, it will produce 5 blocks
of data. And yes, with Spark Streaming, when your processing time goes beyond
your batch duration and you have a higher data consumption rate, you will
overwhelm the receiver's memory and it will throw block-not-found exceptions.
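As a rough sketch of the two knobs mentioned above (the values are just the
defaults discussed here, not a recommendation; in this Spark version the block
interval is given in milliseconds):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("latency-sketch")
        .set("spark.streaming.blockInterval", "200"))  # interval for forming blocks
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)  # 1-second batch duration -> ~5 blocks per batch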

Thanks
Best Regards

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 I keep hearing the argument that the way Discretized Streams work with
 Spark
 Streaming is a lot more of a batch processing algorithm than true
 streaming.
 For streaming, one would expect a new item, e.g. in a Kafka topic, to be
 available to the streaming consumer immediately.

 With the discretized streams, streaming is done with batch intervals i.e.
 the consumer has to wait the interval to be able to get at the new items.
 If
 one wants to reduce latency it seems the only way to do this would be by
 reducing the batch interval window. However, that may lead to a great deal
 of churn, with many requests going into Kafka out of the consumers,
 potentially with no results whatsoever as there's nothing new in the topic
 at the moment.

 Is there a counter-argument to this reasoning? What are some of the general
 approaches to reduce latency  folks might recommend? Or, perhaps there are
 ways of dealing with this at the streaming API level?

 If latency is of great concern, is it better to look into streaming from
 something like Flume where data is pushed to consumers rather than pulled
 by
 them? Are there techniques, in that case, to ensure the consumers don't get
 overwhelmed with new data?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-17 Thread Evo Eftimov
You can make ANY standard receiver sleep by implementing a custom Message 
Deserializer class with sleep method inside it. 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:29 PM
To: Haopu Wang
Cc: user
Subject: Re: [SparkStreaming] Is it possible to delay the start of some DStream 
in the application?

 

Why not just trigger your batch job with that event?

 

If you really need streaming, then you can create a custom receiver and make 
the receiver sleep till the event has happened. That will obviously run your 
streaming pipelines without having any data to process.




Thanks

Best Regards

 

On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote:

In my application, I want to start a DStream computation only after a
special event has happened (for example, I want to start the receiver
only after the reference data has been properly initialized).

My question is: it looks like the DStream will be started right after
the StreamingContext has been started. Is it possible to delay the start
of a specific DStream?

Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Re: Efficient way to fetch all records on a particular node/partition in GraphX

2015-05-17 Thread Ankur Dave
If you know the partition IDs, you can launch a job that runs tasks on only
those partitions by calling sc.runJob
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1686.
For example, we do this in IndexedRDD
https://github.com/amplab/spark-indexedrdd/blob/f0c42dcad1f49ce36140f0c1f7d2c3ed61ed373e/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexedRDDLike.scala#L100
to get particular keys without launching a task on every partition.
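In PySpark the same trick looks roughly like this (my_rdd and the partition
index 5 are placeholders matching the example in the question; runJob only
launches tasks on the listed partitions):

# collect the elements of partition 5 only, without touching the others
part5 = sc.runJob(my_rdd, lambda it: list(it), partitions=[5])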

Ankur http://www.ankurdave.com/

On Sun, May 17, 2015 at 8:32 AM, mas mas.ha...@gmail.com wrote:

 I have distributed my RDD across, say, 10 nodes. I want to fetch the data that
 resides on a particular node, say node 5. How can I achieve this?
 I have tried the mapPartitionsWithIndex function to filter the data of the
 corresponding node, however it is pretty expensive.



Re: textFileStream Question

2015-05-17 Thread Vadim Bichutskiy
This is cool. Thanks Akhil.


On Sun, May 17, 2015 at 11:25 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 It's based on the file timestamp; you can see the logic for finding new files
 here:
 https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172

 Thanks
 Best Regards

 On Fri, May 15, 2015 at 2:25 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 How does textFileStream work behind the scenes? How does Spark Streaming
 know what files are new and need to be processed? Is it based on time
 stamp, file name?

 Thanks,
 Vadim





Big Data Day LA: FREE Big Data Conference in Los Angeles on June 27, 2015

2015-05-17 Thread Slim Baltagi
Please register for the 3rd annual full day ‘Big Data Day LA’ here: -
http://bigdatadayla.org
•   Location: Los Angeles
•   Date: June 27, 2015 
•   Completely FREE: Attendance, Food (Breakfast, Lunch & Coffee Breaks) and
Networking Reception
•   Vendor neutral
•   Great lineup of presentations, workshops, panels, and keynotes.
•   Over 45 talks organized in 5 tracks: Hadoop/Spark, Big Data, Business 
Use
Cases, NoSQL, Data Science
•   Led by over 40+ volunteers
•   800+ expected attendees
Register before it sells out! http://bigdatadayla.org

Thanks

Slim Baltagi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Big-Data-Day-LA-FREE-Big-Data-Conference-in-Los-Angeles-on-June-27-2015-tp22921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming and reducing latency

2015-05-17 Thread dgoldenberg
I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming. 
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with this at the streaming API level? 

If latency is of great concern, is it better to look into streaming from
something like Flume where data is pushed to consumers rather than pulled by
them? Are there techniques, in that case, to ensure the consumers don't get
overwhelmed with new data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org