Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-27 Thread Tathagata Das
rained on scala resources. Have you come >> across other use cases where people have resorted to such a python-scala >> hybrid approach? >> >> Regards, >> Priyank >> >> >> >> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das < >> tathagata.das1...@gma

Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Tathagata Das
unfortunately I am constrained on scala resources. Have you come > across other use cases where people have resorted to such a python-scala > hybrid approach? > > Regards, > Priyank > > > > On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das < > tathagata.das1...@gmail.com> wr

Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Tathagata Das
Hello Priyank, Writing something purely in Scala/Java would be the most efficient. Even if we expose Python APIs that allow writing custom sinks in pure Python, it won't be as efficient as the Scala/Java foreach, as the data would have to go through the JVM / PVM boundary, which has significant overheads. So

Re: Does mapWithState need checkpointing to be specified in Spark Streaming?

2017-07-13 Thread Tathagata Das
Yes. It does. On that note, Spark 2.2 (released a couple of days ago) adds mapGroupsWithState in Structured Streaming. That is like mapWithState on steroids. Just saying. :) On Thu, Jul 13, 2017 at 1:01 PM, SRK wrote: > Hi, > > Do we need to specify checkpointing for

Re: Structured Streaming Questions

2017-06-28 Thread Tathagata Das
Answers inline. On Wed, Jun 28, 2017 at 10:27 AM, Revin Chalil wrote: > I am using Structured Streaming with Spark 2.1 and have some basic > questions. > > > > · Is there a way to automatically refresh the Hive Partitions > when using Parquet Sink with Partition?

Re: Spark Streaming reduceByKeyAndWindow with inverse function seems to iterate over all the keys in the window even though they are not present in the current batch

2017-06-26 Thread Tathagata Das
Unfortunately the way reduceByKeyAndWindow is implemented, it does iterate through all the counts. To have something more efficient, you may have to implement your own windowing logic using mapWithState. Something like eventDStream.flatMap { event => // find the windows each event maps to, and
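A minimal sketch of that idea, assuming events arrive as "key,eventTimeMillis" lines over a socket and we want per-key counts in 5-minute tumbling windows; the input source, field layout, and checkpoint path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}

object EventWindowCounts {
  case class Event(key: String, eventTime: Long)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EventWindowCounts")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/window-state-checkpoint") // mapWithState requires checkpointing; hypothetical path

    // Hypothetical input: "key,eventTimeMillis" lines over a socket.
    val events = ssc.socketTextStream("localhost", 9999).map { line =>
      val Array(key, ts) = line.split(",")
      Event(key, ts.toLong)
    }

    // Map each event to the tumbling window it falls into, keyed by (key, windowStart).
    val windowMs = Minutes(5).milliseconds
    val keyedByWindow = events.map { e =>
      val windowStart = e.eventTime - (e.eventTime % windowMs)
      ((e.key, windowStart), 1L)
    }

    // Accumulate counts per (key, window) in state. Only keys present in the current
    // batch are visited, unlike reduceByKeyAndWindow with an inverse function.
    val updateCount = (key: (String, Long), one: Option[Long], state: State[Long]) => {
      val newCount = state.getOption.getOrElse(0L) + one.getOrElse(0L)
      state.update(newCount)
      (key, newCount)
    }

    val windowedCounts = keyedByWindow.mapWithState(
      StateSpec.function(updateCount).timeout(Minutes(60))) // evict idle windows eventually

    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}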

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-22 Thread Tathagata Das
>> >> visitorSet1 ++visitorSet2.toTraversable >> >> >> visitorSet1 --visitorSet2.toTraversable >> >> On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das < >> tathagata.das1...@gmail.com> wrote: >> >>> Yes, and in general any mutable data st

Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

2017-06-19 Thread Tathagata Das
That is not the right way to use watermark + append output mode. The `withWatermark` must be before the aggregation. Something like this. df.withWatermark("timestamp", "1 hour") .groupBy(window("timestamp", "30 seconds")) .agg(...) Read more here -
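A hedged sketch of the corrected shape, using the built-in rate source (Spark 2.2+) so it is self-contained; the output and checkpoint paths are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder.appName("WatermarkAppend").getOrCreate()
import spark.implicits._

// The "rate" source already provides a TimestampType column named "timestamp".
val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

// withWatermark is declared on the event-time column *before* the aggregation,
// so append mode can emit a window once the watermark passes its end.
val counts = events
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "30 seconds"))
  .count()

val query = counts.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/tmp/windowed-counts")              // hypothetical paths
  .option("checkpointLocation", "/tmp/windowed-counts-chk")
  .start()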

Re: Is Structured streaming ready for production usage

2017-06-08 Thread Tathagata Das
YES. At Databricks, our customers have already been using Structured Streaming and in the last month alone processed over 3 trillion records. https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html On Thu, Jun 8, 2017 at 3:03 PM, SRK

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-06 Thread Tathagata Das
HashSet? > > On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <tathagata.das1...@gmail.com > > wrote: > >> This may be because of HashSet is a mutable data structure, and it seems >> you are actually mutating it in "set1 ++set2". I suggest creating a new >>

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-06 Thread Tathagata Das
This may be because HashSet is a mutable data structure, and it seems you are actually mutating it in "set1 ++set2". I suggest creating a new HashSet in the function (and adding both sets into it), rather than mutating one of them. On Tue, Jun 6, 2017 at 11:30 AM, SRK
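A small sketch of the suggested fix, assuming visitor IDs are being merged per key inside reduceByKeyAndWindow; a fresh set is built instead of mutating either input in place:

import scala.collection.mutable

// Merge function that never mutates its inputs: both incoming sets are copied into a new one.
val mergeVisitors = (set1: mutable.HashSet[String], set2: mutable.HashSet[String]) => {
  val merged = new mutable.HashSet[String]()
  merged ++= set1
  merged ++= set2
  merged
}

// Hypothetical usage on a DStream[(String, mutable.HashSet[String])] named visitorsByKey:
// visitorsByKey.reduceByKeyAndWindow(mergeVisitors, Minutes(10), Seconds(30))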

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
exactly once guarantee on input is not > guaranteed. is it? > > On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das <tathagata.das1...@gmail.com > > wrote: > >> This is the expected behavior. There are some confusing corner cases. >> If you are starting to play with Spark Streami

Re: [Spark Structured Streaming] Exception while using watermark with type of timestamp

2017-06-06 Thread Tathagata Das
Cast the timestamp column to a timestamp type. E.g. "cast timestamp as timestamp" Watermarks can be defined only on columns that are of type timestamp. On Jun 6, 2017 3:06 AM, "Biplob Biswas" wrote: > Hi, > > I am playing around with Spark structured streaming and we have
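A sketch of that cast, assuming a streaming DataFrame df whose "timestamp" column arrived as a string or epoch value; the column name and watermark delay are illustrative:

import org.apache.spark.sql.functions.col

// Watermarks can only be defined on TimestampType columns, so cast first.
val withEventTime = df.withColumn("eventTime", col("timestamp").cast("timestamp"))
val withWatermark = withEventTime.withWatermark("eventTime", "10 minutes")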

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
This is the expected behavior. There are some confusing corner cases. If you are starting to play with Spark Streaming, I highly recommend learning Structured Streaming instead. On Mon, Jun 5, 2017 at 11:16 AM,

Re: Spark Streaming Job Stuck

2017-06-06 Thread Tathagata Das
http://spark.apache.org/docs/latest/streaming-programming-guide.html#points-to-remember-1 Hope this helps. On Mon, Jun 5, 2017 at 2:51 PM, Jain, Nishit wrote: > I have a very simple spark streaming job running locally in standalone > mode. There is a custom receiver

Re: statefulStreaming checkpointing too often

2017-06-02 Thread Tathagata Das
There are two kinds of checkpointing going on here - metadata and data. The 100-second interval that you have configured is the data checkpointing (expensive, large data), where the RDD data is being written to HDFS. The 10-second one is the metadata checkpoint (cheap, small data), where the metadata of the

Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread Tathagata Das
Should release by the end of this month. On Fri, May 19, 2017 at 4:07 PM, kant kodali wrote: > Hi Patrick, > > I am using 2.1.1 and I tried the above code you sent and I get > > "java.lang.UnsupportedOperationException: Data source kafka does not > support streamed writing"

Re: checkpointing without streaming?

2017-05-18 Thread Tathagata Das
; checkpointing along with saving the RDD to a text file, the data gets > stored twice on the disk. That is why I was looking for a way to read the > checkpointed data in a different program. > > On Wed, May 17, 2017 at 12:59 PM, Tathagata Das < > tathagata.das1...@gmail.com> w

Re: checkpointing without streaming?

2017-05-17 Thread Tathagata Das
Why not just save the RDD to a proper file? Text file, sequence file, many options. Then it's standard to read it back in a different program. On Wed, May 17, 2017 at 12:01 AM, neelesh.sa wrote: > Is it possible to checkpoint a RDD in one run of my application and
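A sketch of that approach in spark-shell style (sc is the SparkContext); the element type and paths are placeholders:

// In the first application: write the RDD out as an object file (text or sequence files work too).
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.saveAsObjectFile("hdfs:///data/pairs")

// In a different application: read it straight back, no checkpoint machinery involved.
val restored = sc.objectFile[(String, Int)]("hdfs:///data/pairs")
restored.collect().foreach(println)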

Re: KTable like functionality in structured streaming

2017-05-16 Thread Tathagata Das
Dataframes have the combined functionalities of both KTable and KStreams, so I don't quite understand what you mean by querying a KTable. If you meant interactively querying a table, then you can write an aggregation streaming query to the memory sink with complete output mode to have interactive

Re: what is the difference between json format vs kafka format?

2017-05-13 Thread Tathagata Das
tRows is of type DataSet that I get from loading from Kafka > > val foo = ds.select("*").count() > val query = foo.writeStream.outputMode("complete").format("console").sta > rt(); > query.awaitTermination() > > I am just trying to parse Json messages from Ka

Re: what is the difference between json format vs kafka format?

2017-05-13 Thread Tathagata Das
I understand the confusion. The "json" format is for JSON-encoded files being written in a directory. For Kafka, use the "kafka" format. Then, to decode the binary data as JSON, you can use the function "from_json" (Spark 2.1 and above). Here is our blog post on this.
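A sketch of that flow, assuming the Kafka messages are JSON objects with "user" and "count" fields; the broker address, topic, and schema are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder.appName("KafkaJson").getOrCreate()
import spark.implicits._

val schema = new StructType().add("user", StringType).add("count", LongType)

val parsed = spark.readStream
  .format("kafka")                                       // "kafka" source, not "json"
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")           // the Kafka value arrives as binary
  .select(from_json($"json", schema).as("data"))         // decode the JSON payload (Spark 2.1+)
  .select("data.*")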

Re: Convert DStream into Streaming Dataframe

2017-05-12 Thread Tathagata Das
Unfortunately, no. DStreams and streaming DataFrames are so different in their abstractions and implementations that there is no way to convert them. On Fri, May 12, 2017 at 9:49 AM, Tejinder Aulakh wrote: > Hi, > > Is there any way to convert a DStream to a streaming

Re: Structured Streaming + initialState

2017-05-05 Thread Tathagata Das
Can you explain how your initial state is stored? Is it in a file, or in a database? If it's in a database, then when initializing the GroupState, you can fetch it from the database. On Fri, May 5, 2017 at 7:35 AM, Patrick McGloin wrote: > Hi all, > > With Spark

Re: unable to find how to integrate SparkSession with a Custom Receiver.

2017-05-04 Thread Tathagata Das
Structured Streaming is not designed to integrate with receivers. The sources in Structured Streaming are designed for providing stronger fault-tolerance guarantees by precisely tracking records by their offsets (e.g. Kafka offsets). This is different from the Receiver APIs which did not require

Re: Refreshing a persisted RDD

2017-05-03 Thread Tathagata Das
ewly created blacklist? Or > will it continue to hold the reference to the old dataframe? What if we had > done RDD operations instead of using Spark SQL to join the dataframes? > > > > *From: *Tathagata Das <tathagata.das1...@gmail.com> > *Date: *Wednesday, May 3, 2

Re: Refreshing a persisted RDD

2017-05-03 Thread Tathagata Das
If you want to always get the latest data in files, it's best to always recreate the DataFrame. On Wed, May 3, 2017 at 7:30 AM, JayeshLalwani wrote: > We have a Structured Streaming application that gets accounts from Kafka > into > a streaming data frame. We have

Re: [Spark Streaming] - Killing application from within code

2017-05-03 Thread Tathagata Das
There isn't a clean programmatic way to kill the application running in the driver from the executor. You will have to set up an additional RPC mechanism to explicitly send a signal from the executors to the application/driver to quit. On Wed, May 3, 2017 at 8:44 AM, Sidney Feiner

Re: map/foreachRDD equivalent for pyspark Structured Streaming

2017-05-03 Thread Tathagata Das
You can apply any kind of aggregation on windows. There are some built-in aggregations (e.g. sum and count), and there is an API for user-defined aggregations (Scala/Java) that works with both batch and streaming DFs. See the programming guide if you haven't seen it already - windowing

Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Tathagata Das
Here are a couple of ideas. 1. You can set up a Structured Streaming query to update an in-memory table. Look at the memory sink in the programming guide - http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks So you can query the latest table using a specified
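A sketch of the first idea (memory sink + complete mode); streamingDF, the grouping column, and the table name are placeholders:

// Keep the latest aggregate queryable as an in-memory table that refreshes every trigger.
val query = streamingDF
  .groupBy("accountId")
  .count()
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("latest_counts")   // registers a temp table with this name
  .start()

// Elsewhere in the same SparkSession (spark), read the current snapshot whenever needed:
spark.sql("SELECT * FROM latest_counts").show()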

Re: Is checkpointing in Spark Streaming Synchronous or Asynchronous ?

2017-04-10 Thread Tathagata Das
As of now (Spark 2.2), Structured Streaming checkpoints the state data synchronously in every trigger. But the checkpointing is incremental, so it won't be writing all your state every time. And we will be making this asynchronous soon. On Fri, Apr 7, 2017 at 3:19 AM, kant kodali

Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Tathagata Das
The trigger interval is optionally specified in the writeStream option before start. val windowedCounts = words.groupBy( window($"timestamp", "24 hours", "24 hours"), $"word" ).count() .writeStream .trigger(ProcessingTime("10 seconds")) // optional .format("memory") .queryName("tableName")

Re: Why do we ever run out of memory in Spark Structured Streaming?

2017-04-04 Thread Tathagata Das
Are you referring to the memory usage of stateful operations like aggregations, or the new mapGroupsWithState? The current implementation of the internal state store (that maintains the stateful aggregates) is such that it keeps all the data in memory of the executor. It does use HDFS-compatible

Re: Spark streaming + kafka error with json library

2017-03-29 Thread Tathagata Das
Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly) On Wed, Mar 29, 2017 at 9:59 AM, Srikanth wrote: > Hello, > > I'm trying to use "org.json4s" % "json4s-native" library in a spark > streaming + kafka direct app. > When I use the latest version of the

Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread Tathagata Das
g DStreams). I know many people using this setting. So your > explanation will help a lot of people. > > Thanks > > On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das <t...@databricks.com> > wrote: > >> That config I not safe. Please do not use it. >> >&

Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread Tathagata Das
That config is not safe. Please do not use it. On Mar 10, 2017 10:03 AM, "shyla deshpande" wrote: > I have a spark streaming application which processes 3 kafka streams and > has 5 output operations. > > Not sure what should be the setting for

Re: Spark standalone cluster on EC2 error .. Checkpoint..

2017-02-17 Thread Tathagata Das
Seems like an issue with the HDFS you are using for checkpointing. It's not able to write data properly. On Thu, Feb 16, 2017 at 2:40 PM, shyla deshpande wrote: > Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): > File >

Re: Counting things in Spark Structured Streaming

2017-02-09 Thread Tathagata Das
Probably something like this. dataset .filter { userData => val dateThreshold = lookupThreshold(userData) // look up the threshold date based on the record details userData.date > dateThreshold // compare } .groupBy() .count() This would

Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Tathagata Das
Broadcasts are not saved in checkpoints, so you have to save them externally yourself and recover them before restarting the stream from checkpoints. On Tue, Feb 7, 2017 at 3:55 PM, Amit Sela wrote: > I know this approach, only thing is, it relies on the transformation being

Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Tathagata Das
Does restarting after a few minutes solve the problem? It could be a transient issue that lasts long enough for Spark task-level retries to all fail. On Tue, Feb 7, 2017 at 4:34 PM, Srikanth wrote: > Hello, > > I had a spark streaming app that reads from kafka running for a

Re: mapWithState question

2017-01-30 Thread Tathagata Das
t; mapWithState function. Just wanted to check if this a bad pattern in any > way. > > Thank you. > > > > > > On Sat, Jan 28, 2017 at 9:14 AM, shyla deshpande <deshpandesh...@gmail.com > > wrote: > >> Thats a great idea. I will try that. Thanks. >

Re: question on spark streaming based on event time

2017-01-29 Thread Tathagata Das
Spark Streaming (DStreams) wasn't designed with event time in mind. Instead, we have designed Structured Streaming to naturally deal with event time. You should check that out. Here are the pointers. - Programming guide -

Re: mapWithState question

2017-01-28 Thread Tathagata Das
1 state object for each user. union both streams into a single DStream, and apply mapWithState on it to update the user state. On Sat, Jan 28, 2017 at 12:30 AM, shyla deshpande wrote: > Can multiple DStreams manipulate a state? I have a stream that gives me > total
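A sketch of that layout; the event and state types, and the two source streams (both assumed to be DStream[(String, UserEvent)] keyed by user id), are hypothetical:

import org.apache.spark.streaming.{State, StateSpec}

case class UserEvent(amount: Long)
case class UserState(runningTotal: Long)

// stream1 and stream2 come from elsewhere in the application (hypothetical).
val merged = stream1.union(stream2)

val updateUser = (userId: String, event: Option[UserEvent], state: State[UserState]) => {
  val prev = state.getOption.getOrElse(UserState(0L))
  val next = UserState(prev.runningTotal + event.map(_.amount).getOrElse(0L))
  state.update(next)   // one state object per user, regardless of which stream the event came from
  (userId, next)
}

val userStates = merged.mapWithState(StateSpec.function(updateUser))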

Re: Spark SQL DataFrame to Kafka Topic

2017-01-13 Thread Tathagata Das
Structured Streaming has a foreach sink, where you can essentially do what you want with your data. It's easy to create a Kafka producer and write the data out to Kafka. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach On Fri, Jan 13, 2017 at 8:28 AM,

Re: streaming performance

2016-12-22 Thread Tathagata Das
From what I understand looking at the code on Stack Overflow, I think you are "simulating" the streaming version of your calculation incorrectly. You are repeatedly unioning batch dataframes to simulate streaming and then applying aggregation on the unioned DF. That is not going to compute

Re: [Spark Streaming] How to do join two messages in spark streaming(Probabaly messasges are in differnet RDD) ?

2016-12-06 Thread Tathagata Das
This sounds like something you can solve with a stateful operator. Check out mapWithState. If both messages can be keyed with a common key, then you can define a keyed state. The state will have a field for the first message. When you see the first message for a key, fill the first field with

Re: Spark-shell doesn't see changes coming from Kafka topic

2016-12-01 Thread Tathagata Das
Can you confirm the following? 1. Are you sending new data to the Kafka topic AFTER starting the streaming query? Since you have specified `startingOffsets` as `latest`, data needs to be sent to the topic after the query starts for the query to receive it. 2. Are you able to read Kafka data using Kafka's

Re: [structured streaming] How to remove outdated data when use Window Operations

2016-12-01 Thread Tathagata Das
In the meantime, if you are interested, you can read the design doc in the corresponding JIRA - https://issues.apache.org/jira/browse/SPARK-18124 On Thu, Dec 1, 2016 at 12:53 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > That feature is coming in 2.1.0. We have added wat

Re: [structured streaming] How to remove outdated data when use Window Operations

2016-12-01 Thread Tathagata Das
That feature is coming in 2.1.0. We have added watermarking, that will track the event time of the data and accordingly close old windows, output its corresponding aggregate and then drop its corresponding state. But in that case, you will have to use append mode, and aggregated data of a

Re: Structured Streaming with Cassandra, Is it supported??

2016-11-07 Thread Tathagata Das
Spark 2.0 supports writing out to files, and you can also do custom foreach code. We haven't yet officially released the Sink API for custom connectors to be implemented, but hopefully we will be able to do it soon. That said, I will not rule out the possibility of connectors written using internal,

Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-07 Thread Tathagata Das
For WAL in Spark to work with HDFS, the HDFS version you are running must support file appends. Contact your HDFS package/installation provider to figure out whether this is supported by your HDFS installation. On Mon, Nov 7, 2016 at 2:04 PM, Arijit wrote: > Hello All, > > >

Re: Hbase Connection not seraializible in Spark -> foreachrdd

2016-09-21 Thread Tathagata Das
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd On Wed, Sep 21, 2016 at 4:26 PM, ayan guha wrote: > Connection object is not serialisable. You need to implement a getorcreate > function which would run on each
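The pattern behind that link, sketched here with a hypothetical ConnectionPool helper standing in for HBase (or any other non-serializable client); dstream and the send/release calls are placeholders:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // The connection is created (or reused) on the executor, never shipped from the driver.
    val connection = ConnectionPool.getOrCreate()        // hypothetical lazily initialized, per-JVM pool
    records.foreach(record => connection.send(record))   // hypothetical write call
    ConnectionPool.release(connection)                   // return to the pool instead of closing per record
  }
}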

Re: Spark streaming 2, giving error ClassNotFoundException: scala.collection.GenTraversableOnce$class

2016-08-19 Thread Tathagata Das
You seem to be combining Scala 2.10 and 2.11 libraries - your sbt project is 2.11, whereas you are trying to pull in spark-streaming-kafka-assembly_2.10-1.6.1.jar. On Fri, Aug 19, 2016 at 11:24 AM, Mich Talebzadeh wrote: > Hi, > > My spark streaming app with 1.6.1

Re: Structured Streaming Parquet Sink

2016-07-31 Thread Tathagata Das
scala> val query = > > > streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("complete").start() > > java.lang.IllegalArgumentException: Data source parquet

Re: Structured Streaming Parquet Sink

2016-07-30 Thread Tathagata Das
Correction, the two options are: - writeStream.format("parquet").option("path", "...").start() - writeStream.parquet("...").start() There is no start with a param. On Jul 30, 2016 11:22 AM, "Jacek Laskowski" wrote: > Hi Arun, > > > As per documentation, parquet is the only

Re: PySpark 2.0 Structured Streaming Question

2016-07-20 Thread Tathagata Das
foreachWriter is not currently available in Python. We don't have a clear plan yet on when foreachWriter will be available in Python. On Wed, Jul 20, 2016 at 1:22 PM, A.W. Covert III wrote: > Hi All, > > I've been digging into spark 2.0, I have some streaming jobs running

Re: Spark Streaming - Direct Approach

2016-07-11 Thread Tathagata Das
Aah, the docs have not been updated. They are totally in production in many places. Others should chime in as well. On Mon, Jul 11, 2016 at 1:43 PM, Mail.com wrote: > Hi All, > > Can someone please confirm if streaming direct approach for reading Kafka > is still

Re: Structured Streaming Comparison to AMPS

2016-07-07 Thread Tathagata Das
We will look into stream-stream joins in a future release of Spark, though no promises on any timeline yet. We are currently fighting to get Spark 2.0 out the door. There isn't a JIRA for this right now. However, you can track the Structured Streaming Epic JIRA to track what's going on. I try

Re: How to handle update/deletion in Structured Streaming?

2016-07-04 Thread Tathagata Das
Input datasets which represent an input data stream only support appending of new rows, as the stream is modeled as an unbounded table where new data in the stream are new rows being appended to the table. For transformed datasets generated from the input dataset, rows can be updated and removed

Re: Error while using checkpointing . Spark streaming 1.5.2- DStream checkpointing has been enabled but the DStreams with their functions are not serialisable

2016-06-09 Thread Tathagata Das
myFunction() is probably capturing unexpected things in the closure of the Function you have defined, because myFunction is defined outside. Try defining myFunction inside the Function and see if the problem persists. On Thu, Jun 9, 2016 at 3:57 AM, sandesh deshmane

Re: Timeline for supporting basic operations like groupBy, joins etc on Streaming DataFrames

2016-06-07 Thread Tathagata Das
1. Not all types of joins are supported. Here is the list. - Right outer joins - stream-batch not allowed, batch-stream allowed - Left outer joins - batch-stream not allowed, stream-batch allowed (reverse of Right outer join) - Stream-stream joins are not allowed In the cases of outer joins,

Re: Spark 2.0+ Structured Streaming

2016-04-28 Thread Tathagata Das
Hello Benjamin, Have you taken a look at the slides of my talk at Strata San Jose - http://www.slideshare.net/databricks/taking-spark-streaming-to-the-next-level-with-datasets-and-dataframes Unfortunately there is no video, as Strata does not upload videos for everyone. I presented the same talk

Re: Streaming mapWithState API has NullPointerException

2016-02-23 Thread Tathagata Das
Yes, you should be okay to test your code. :) On Mon, Feb 22, 2016 at 5:57 PM, Aris <arisofala...@gmail.com> wrote: > If I build from git branch origin/branch-1.6 will I be OK to test out my > code? > > Thank you so much TD! > > Aris > > On Mon, Feb 22,

Re: Streaming mapWithState API has NullPointerException

2016-02-22 Thread Tathagata Das
There were a few bugs that were solved with mapWithState recently. Would be available in 1.6.1 (RC to be cut soon). On Mon, Feb 22, 2016 at 5:29 PM, Aris wrote: > Hello Spark community, and especially TD and Spark Streaming folks: > > I am using the new Spark 1.6.0

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Tathagata Das
Shixiong has already opened the PR - https://github.com/apache/spark/pull/11081 On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov wrote: > Let me know if you do need a pull request for this, I can make that happen > (given someone does a vast PR to make sure I'm understanding

Re: Understanding Spark Task failures

2016-01-28 Thread Tathagata Das
That is hard for the system to guarantee, and it is up to the app developer to ensure that this is not the case. For example, if the data in a message is corrupted, unless the app code is robust towards handling such data, the system will fail every time it retries that app code. On Thu, Jan 28, 2016 at

Re: can't find trackStateByKey in 1.6.0 jar?

2016-01-28 Thread Tathagata Das
It was renamed to mapWithState when 1.6.0 was released. :) On Thu, Jan 28, 2016 at 1:51 AM, Sebastian Piu wrote: > I wanted to give the new trackStateByKey method a try, but I'm missing > something very obvious here as I can't see it on the 1.6.0 jar. Is there >

Re: How to setup a long running spark streaming job with continuous window refresh

2016-01-25 Thread Tathagata Das
You can use a 1 minute tumbling window dstream.window(Minutes(1), Minutes(1)).foreachRDD { rdd => // calculate stats per key } On Thu, Jan 21, 2016 at 4:59 AM, Santoshakhilesh < santosh.akhil...@huawei.com> wrote: > Hi, > > I have following scenario in my project; > > 1.I will continue

Re: Spark RDD DAG behaviour understanding in case of checkpointing

2016-01-25 Thread Tathagata Das
First of all, if you are running batches of 15 minutes and you don't need second-level latencies, it might just be easier to run batch jobs in a for loop - you will have greater control over what is going on. And if you are using reduceByKeyAndWindow without the inverseReduceFunction, then Spark

Re: Spark Streaming - Custom ReceiverInputDStream ( Custom Source) In java

2016-01-25 Thread Tathagata Das
See how other Java wrapper classes use JavaSparkContext.fakeClassTag example; https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaMapWithStateDStream.scala On Fri, Jan 22, 2016 at 2:00 AM, Nagu Kothapalli wrote:

Re: Number of CPU cores for a Spark Streaming app in Standalone mode

2016-01-18 Thread Tathagata Das
If you are using receiver-based input streams, then you have to dedicate 1 core to each receiver. If you read only once per minute on each receiver, then consider consolidating the data reading pipeline such that you can use fewer receivers. On Mon, Jan 18, 2016 at 12:13 PM, radoburansky

Re: [KafkaRDD]: rdd.cache() does not seem to work

2016-01-13 Thread Tathagata Das
Can you just simplify the code and run a few counts to see if the cache is being used (later jobs are faster or not). In addition, use the Spark UI to see whether it is cached; see the DAG viz of the job to see whether it is using the cached RDD or not (the DAG will show a green vertex if the RDD is

Re: Spark streaming routing

2016-01-07 Thread Tathagata Das
You cannot guarantee that each key will forever be on the same executor. That is a flawed approach to designing an application if you have to ensure fault tolerance toward executor failures. On Thu, Jan 7, 2016 at 9:34 AM, Lin Zhao wrote: > I have a need to route the

Re: Problems with too many checkpoint files with Spark Streaming

2016-01-06 Thread Tathagata Das
Could you show a sample of the file names? There are multiple things that are using UUIDs, so it would be good to see what the 100s of directories being generated every second are. If you are checkpointing every 400s then there shouldn't be checkpoint directories written every second. They should be

Re: UpdateStateByKey : Partitioning and Shuffle

2016-01-05 Thread Tathagata Das
Both mapWithState and updateStateByKey by default use the HashPartitioner, and hash the key in the key-value DStream on which the state operation is applied. The new data and the state are partitioned with the exact same partitioner, so that the same keys from the new data (from the input DStream) get

Re: Does state survive application restart in StatefulNetworkWordCount?

2016-01-04 Thread Tathagata Das
It does get recovered if you restart from checkpoints. See the example RecoverableNetworkWordCount.scala On Sat, Jan 2, 2016 at 6:22 AM, Rado Buranský wrote: > I am trying to understand how state in Spark Streaming works in general. > If I run this example program twice

Re: Batch together RDDs for Streaming output, without delaying execution of map or transform functions

2016-01-04 Thread Tathagata Das
You could enforce the evaluation of the transformed DStream by putting a dummy output operation on it, and then do the windowing. transformedDStream.foreachRDD { _.count() } // to enforce evaluation of the transformation transformedDStream.window(...).foreachRDD { rdd => ... } On Thu, Dec 31,

Re: how to spark streaming application start working on next batch before completing on previous batch .

2015-12-15 Thread Tathagata Das
Just to be clear, spark.streaming.concurrentJobs is NOT officially supported. There are issues with fault tolerance and data loss if that is set to more than 1. On Tue, Dec 15, 2015 at 9:19 AM, Mukesh Jha wrote: > Try setting spark.streaming.concurrentJobs to

Re: Mixing Long Run Periodic Update Jobs With Streaming Scoring

2015-12-15 Thread Tathagata Das
One general piece of advice I can provide: if you wish to run the batch jobs concurrently with the Spark Streaming jobs, then you should put them in different fair scheduler pools, and prioritize the streaming pool, to minimize the streaming jobs being impacted by the batch jobs. See the Spark docs online

Re: Spark parallelism with mapToPair

2015-12-15 Thread Tathagata Das
Since mapToPair will be called on each record, and the number of records can be in the tens of millions, you probably do not want to run ALL of them in parallel. So think about your strategy here. In general, the parallelism can be controlled by setting the number of partitions in the groupByKey operation. On

Re: State management in spark-streaming

2015-12-15 Thread Tathagata Das
Well, trackStateByKey has been renamed to mapWithState in the upcoming 1.6. And regarding the use case, you can easily implement this with updateStateByKey. See https://github.com/apache/spark/blob/branch-1.5/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

Re: Automatic driver restart does not seem to be working in Spark Standalone

2015-11-25 Thread Tathagata Das
What do you mean by killing the streaming job using UI? Do you mean that you are clicking the "kill" link in the Jobs page in the Spark UI? Also in the application, is the main thread waiting on streamingContext.awaitTermination()? That is designed to catch exceptions in running job and throw it

Re: Initial State

2015-11-22 Thread Tathagata Das
There is a way. Please see the Scala docs. http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions The first version of updateStateByKey has the parameter "initialRDD". On Fri, Nov 20, 2015 at 6:52 PM, Bryan wrote:
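A sketch of that overload, spark-shell style; the seed values and keyedDStream (a DStream[(String, Long)] from elsewhere in the app) are placeholders:

import org.apache.spark.HashPartitioner

// Seed state carried in from a previous run or an external store (values are illustrative).
val initialRDD = ssc.sparkContext.parallelize(Seq(("alice", 10L), ("bob", 3L)))

val updateFunc = (values: Seq[Long], state: Option[Long]) =>
  Some(state.getOrElse(0L) + values.sum)

// The overload taking (updateFunc, partitioner, initialRDD) starts each key from the seeded value.
val stateDStream = keyedDStream.updateStateByKey(
  updateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  initialRDD)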

Re: Streaming Job gives error after changing to version 1.5.2

2015-11-18 Thread Tathagata Das
If possible, could you give us the root cause and solution for future readers of this thread. On Wed, Nov 18, 2015 at 6:37 AM, swetha kasireddy <swethakasire...@gmail.com > wrote: > It works fine after some changes. > > -Thanks, > Swetha > > On Tue, Nov 17, 2015 at 10

Re: Calculating Timeseries Aggregation

2015-11-18 Thread Tathagata Das
aggregation? > > Regards > SM > > For minutes level aggregates I have set up a streaming window say 10 > seconds and storing minutes level aggregates across multiple dimension in > HBase at every window interval. > > On 18-Nov-2015, at 7:45 AM, Tathagata Das <t...@databricks.c

Re: kafka streaminf 1.5.2 - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

2015-11-17 Thread Tathagata Das
Are you creating a fat assembly jar with spark-streaming-kafka included and using that to run your code? If yes, I am not sure why it is not finding it. If not, then make sure that your framework places the spark-streaming-kafka jar in the runtime classpath. On Tue, Nov 17, 2015 at 6:04 PM,

Re: Streaming Job gives error after changing to version 1.5.2

2015-11-17 Thread Tathagata Das
Are you running 1.5.2-compiled jar on a Spark 1.5.2 cluster? On Tue, Nov 17, 2015 at 5:34 PM, swetha wrote: > > > Hi, > > I see java.lang.NoClassDefFoundError after changing the Streaming job > version to 1.5.2. Any idea as to why this is happening? Following are my

Re: Calculating Timeseries Aggregation

2015-11-17 Thread Tathagata Das
For this sort of long-term aggregation you should use a dedicated data storage system, like a database or a key-value store. Spark Streaming would just aggregate and push the necessary data to the data store. TD On Sat, Nov 14, 2015 at 9:32 PM, Sandip Mehta wrote:

Re: Streaming Job gives error after changing to version 1.5.2

2015-11-17 Thread Tathagata Das
th...@iag.com.au> >> wrote: >> >>> If you are running a local context, could it be that you should use: >>> >>> >>> >>> provided >>> >>> >>> >>> ? >>> >>> >>> >&g

Re: dynamic allocation w/ spark streaming on mesos?

2015-11-11 Thread Tathagata Das
The reason the existing dynamic allocation does not work out of the box for Spark Streaming is that the heuristics used for deciding when to scale up/down are not the right ones for micro-batch workloads. It works great for typical batch workloads. However, you can use the underlying developer API

Re: java.lang.ClassNotFoundException: org.apache.spark.streaming.twitter.TwitterReceiver

2015-11-09 Thread Tathagata Das
to > run it from eclipse. > > Is there any problem running the application from eclipse ? > > > > On 9 November 2015 at 12:27, Tathagata Das <t...@databricks.com> wrote: > >> How are you submitting the spark application? >> You are supposed to submit the f

Re: java.lang.ClassNotFoundException: org.apache.spark.streaming.twitter.TwitterReceiver

2015-11-09 Thread Tathagata Das
How are you submitting the Spark application? You are supposed to submit the fat jar of the application that includes the spark-streaming-twitter dependency (and its subdeps) but not spark-streaming and spark-core. On Mon, Nov 9, 2015 at 1:02 AM, أنس الليثي wrote: > I

Re: How to unpersist a DStream in Spark Streaming

2015-11-05 Thread Tathagata Das
Spark Streaming automatically takes care of unpersisting any RDDs generated by a DStream. You can use StreamingContext.remember() to set the minimum persistence duration. Any persisted RDD older than that will be automatically unpersisted. On Thu, Nov 5, 2015 at 9:12 AM, swetha kasireddy
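For example (the duration is illustrative; ssc is the StreamingContext):

import org.apache.spark.streaming.Minutes

// Keep generated RDDs (and their cached data) around for at least 5 minutes
// before Spark Streaming unpersists and forgets them.
ssc.remember(Minutes(5))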

Re: kinesis batches hang after YARN automatic driver restart

2015-11-03 Thread Tathagata Das
The Kinesis integration underneath uses the KCL libraries which takes a minute or so sometimes to spin up the threads and start getting data from Kinesis. That is under normal conditions. In your case, it could be happening that because of your killing and restarting, the restarted KCL may be

Re: Stack overflow error caused by long lineage RDD created after many recursions

2015-10-30 Thread Tathagata Das
You have to run some action after rdd.checkpoint() for the checkpointing to actually occur. Have you done that? On Fri, Oct 30, 2015 at 3:10 PM, Panos Str wrote: > Hi all! > > Here's a part of a Scala recursion that produces a stack overflow after > many > recursions. I've

Re: [Spark Streaming] Connect to Database only once at the start of Streaming job

2015-10-28 Thread Tathagata Das
Yeah, of course. Just create an RDD from jdbc, call cache()/persist(), then force it to be evaluated using something like count(). Once it is cached, you can use it in a StreamingContext. Because of the cache it should not access JDBC any more. On Tue, Oct 27, 2015 at 12:04 PM, diplomatic Guru

Re: [Spark Streaming] Connect to Database only once at the start of Streaming job

2015-10-28 Thread Tathagata Das
However, if your executor dies, then it may reconnect to JDBC to reconstruct the RDD partitions that were lost. To prevent that, you can checkpoint the RDD to an HDFS-like filesystem (using rdd.checkpoint()). Then you are safe; it won't reconnect to JDBC. On Tue, Oct 27, 2015 at 11:17 PM, Tathagata
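Putting the two replies together as a sketch, using the DataFrame JDBC reader of later Spark versions (spark is a SparkSession); connection details and paths are placeholders:

// Load the reference data once from JDBC, cache it, checkpoint it, and force evaluation,
// so that neither later batches nor recovery of lost partitions touch the database again.
val refData = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/app")
  .option("dbtable", "reference_table")
  .load()
  .rdd

spark.sparkContext.setCheckpointDir("hdfs:///checkpoints/refdata")
refData.cache()
refData.checkpoint()   // must be called before the action below
refData.count()        // materializes both the cache and the checkpoint

// refData can now be used inside the streaming job without re-querying JDBC.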

Re: SPARKONHBase checkpointing issue

2015-10-28 Thread Tathagata Das
Yes, the workaround is the same as has been suggested in the JIRA for accumulator and broadcast variables. Basically, make a singleton object which lazily initializes the HBaseContext. Because it is a singleton, it won't get serialized through the checkpoint. After recovering, it will be reinitialized

Re: [Spark Streaming] Why are some uncached RDDs are growing?

2015-10-28 Thread Tathagata Das
UpdateStateByKey automatically caches its RDDs. On Tue, Oct 27, 2015 at 8:05 AM, diplomatic Guru wrote: > > Hello All, > > When I checked my running Stream job on WebUI, I can see that some RDDs > are being listed that were not requested to be cached. What more is that

Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread Tathagata Das
If you just want to control the number of reducers, then setting numPartitions is sufficient. If you want to control the exact partitioning scheme (that is, some scheme other than hash-based), then you need to implement a custom partitioner. It can be used to mitigate data skew, etc., which
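A sketch contrasting the two, spark-shell style, with a hypothetical partitioner that spreads keys by their first character:

import org.apache.spark.Partitioner

// Hypothetical scheme: route keys by their first character, e.g. to break up a known skew pattern.
class FirstCharPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int =
    math.abs(key.toString.headOption.getOrElse(' ').toInt) % numPartitions
}

val pairs = sc.parallelize(Seq(("apple", 1), ("avocado", 2), ("banana", 3)))

// Only controls how many reducers there are:
val counts = pairs.reduceByKey(_ + _, 64)

// Controls where each key lands as well:
val countsCustom = pairs.reduceByKey(new FirstCharPartitioner(64), _ + _)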
