Re: Spark Streaming - Custom ReceiverInputDStream ( Custom Source) In java

2016-01-25 Thread Tathagata Das
See how other Java wrapper classes use JavaSparkContext.fakeClassTag. For example: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaMapWithStateDStream.scala On Fri, Jan 22, 2016 at 2:00 AM, Nagu Kothapalli wrote:

Re: Spark 2.0+ Structured Streaming

2016-04-28 Thread Tathagata Das
Hello Benjamin, Have you taken a look at the slides of my talk at Strata San Jose - http://www.slideshare.net/databricks/taking-spark-streaming-to-the-next-level-with-datasets-and-dataframes Unfortunately there is no video, as Strata does not upload videos for everyone. I presented the same talk

Re: Structured Streaming Parquet Sink

2016-07-30 Thread Tathagata Das
Correction, the two options are: - writeStream.format("parquet").option("path", "...").start() - writeStream.parquet("...").start() There is no start() with a path parameter. On Jul 30, 2016 11:22 AM, "Jacek Laskowski" wrote: > Hi Arun, > > > As per documentation, parquet is the only
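A minimal sketch of the first form with a checkpoint location added (the input DataFrame and paths are placeholders, not from the thread):

val query = eventsDF.writeStream
  .format("parquet")
  .option("path", "/data/out")
  .option("checkpointLocation", "/data/checkpoints/parquet-sink")
  .start()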

Re: Structured Streaming Parquet Sink

2016-07-31 Thread Tathagata Das
scala> val query = > > > streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("complete").start() > > java.lang.IllegalArgumentException: Data source parquet

Re: PySpark 2.0 Structured Streaming Question

2016-07-20 Thread Tathagata Das
foreachWriter is not currently available in Python. We don't have a clear plan yet on when foreachWriter will be available in Python. On Wed, Jul 20, 2016 at 1:22 PM, A.W. Covert III wrote: > Hi All, > > I've been digging into spark 2.0, I have some streaming jobs running

Re: Spark Streaming - Direct Approach

2016-07-11 Thread Tathagata Das
Aah, the docs have not been updated. They are totally in production in many places. Others should chime in as well. On Mon, Jul 11, 2016 at 1:43 PM, Mail.com wrote: > Hi All, > > Can someone please confirm if streaming direct approach for reading Kafka > is still

Re: How to handle update/deletion in Structured Streaming?

2016-07-04 Thread Tathagata Das
Input datasets, which represent an input data stream, only support appending of new rows, as the stream is modeled as an unbounded table where new data in the stream are new rows being appended to the table. For transformed datasets generated from the input dataset, rows can be updated and removed

Re: Structured Streaming Comparison to AMPS

2016-07-07 Thread Tathagata Das
We will look into streaming-streaming joins in a future release of Spark, though no promises on any timeline yet. We are currently fighting to get Spark 2.0 out of the door. There isn't a JIRA for this right now. However, you can track the Structured Streaming Epic JIRA to track what's going on. I try

Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Tathagata Das
Broadcasts are not saved in checkpoints, so you have to save the broadcast data externally yourself, and recover it before restarting the stream from checkpoints. On Tue, Feb 7, 2017 at 3:55 PM, Amit Sela wrote: > I know this approach, only thing is, it relies on the transformation being
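One common workaround is a lazily initialized singleton that re-creates the broadcast after recovery from a checkpoint; this is only a sketch, and BlacklistBroadcast/loadBlacklist are hypothetical names:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object BlacklistBroadcast {
  @volatile private var instance: Broadcast[Set[String]] = _

  def getInstance(sc: SparkContext): Broadcast[Set[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          // re-load the data from external storage and broadcast it again
          instance = sc.broadcast(loadBlacklist())
        }
      }
    }
    instance
  }

  // placeholder for loading the saved data from an external store
  private def loadBlacklist(): Set[String] = Set("a", "b")
}

// used inside an output operation, so it also works after restart from a checkpoint:
// stateDStream.foreachRDD { rdd =>
//   val blacklist = BlacklistBroadcast.getInstance(rdd.sparkContext)
//   ...
// }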

Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Tathagata Das
Does restarting after a few minutes solve the problem? Could be a transient issue that lasts long enough for spark task-level retries to all fail. On Tue, Feb 7, 2017 at 4:34 PM, Srikanth wrote: > Hello, > > I had a spark streaming app that reads from kafka running for a

Re: mapWithState question

2017-01-30 Thread Tathagata Das
> mapWithState function. Just wanted to check if this is a bad pattern in any > way. > > Thank you. > > > > > > On Sat, Jan 28, 2017 at 9:14 AM, shyla deshpande <deshpandesh...@gmail.com > > wrote: > >> Thats a great idea. I will try that. Thanks. >

Re: mapWithState question

2017-01-28 Thread Tathagata Das
Use 1 state object for each user. Union both streams into a single DStream, and apply mapWithState on it to update the user state. On Sat, Jan 28, 2017 at 12:30 AM, shyla deshpande wrote: > Can multiple DStreams manipulate a state? I have a stream that gives me > total
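A rough sketch of that structure; Event, UserState, and the update logic are assumptions, not code from the thread:

import org.apache.spark.streaming.{State, StateSpec}

case class Event(userId: String, amount: Long)
case class UserState(total: Long = 0L)

// stream1 and stream2 are both DStream[Event]; key them by user and union them
val combined = stream1.union(stream2).map(e => (e.userId, e))

val spec = StateSpec.function(
  (userId: String, event: Option[Event], state: State[UserState]) => {
    val updated = UserState(state.getOption.getOrElse(UserState()).total +
      event.map(_.amount).getOrElse(0L))
    state.update(updated)
    (userId, updated.total)
  })

val userTotals = combined.mapWithState(spec)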

Re: question on spark streaming based on event time

2017-01-29 Thread Tathagata Das
Spark Streaming (DStreams) wasn't designed with event time in mind. Instead, we designed Structured Streaming to naturally deal with event time. You should check that out. Here are the pointers. - Programming guide -

Re: Spark standalone cluster on EC2 error .. Checkpoint..

2017-02-17 Thread Tathagata Das
Seems like an issue with the HDFS you are using for checkpointing. It's not able to write data properly. On Thu, Feb 16, 2017 at 2:40 PM, shyla deshpande wrote: > Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): > File >

Re: Counting things in Spark Structured Streaming

2017-02-09 Thread Tathagata Das
Probably something like this.

dataset
  .filter { userData =>
    // look up the threshold date based on the record details
    val dateThreshold = lookupThreshold(userData)
    userData.date > dateThreshold  // compare
  }
  .groupBy()
  .count()

This would

Re: Spark streaming 2, giving error ClassNotFoundException: scala.collection.GenTraversableOnce$class

2016-08-19 Thread Tathagata Das
You seem to be combining Scala 2.10 and 2.11 libraries - your sbt project is 2.11, whereas you are trying to pull in spark-streaming-kafka-assembly_2.10-1.6.1.jar. On Fri, Aug 19, 2016 at 11:24 AM, Mich Talebzadeh wrote: > Hi, > > My spark streaming app with 1.6.1
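In sbt terms, the fix is to keep every Spark artifact on the project's Scala binary version; a hedged sketch (versions are illustrative):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  // %% resolves to the _2.11 artifact, matching scalaVersion above
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
)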

Re: Hbase Connection not seraializible in Spark -> foreachrdd

2016-09-21 Thread Tathagata Das
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd On Wed, Sep 21, 2016 at 4:26 PM, ayan guha wrote: > Connection object is not serialisable. You need to implement a getorcreate > function which would run on each
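The linked design pattern, roughly; ConnectionPool and send() are hypothetical helpers around the HBase client:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // one connection per partition, created lazily on the executor and reused
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => send(connection, record))
    ConnectionPool.returnConnection(connection)  // return to the pool for reuse
  }
}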

Re: Structured Streaming with Cassandra, Is it supported??

2016-11-07 Thread Tathagata Das
Spark 2.0 supports writing out to files, as well as you can do custom foreach code. We havent yet officially released Sink API for custom connector to be implemented, but hopefully we will be able to do it soon. That said, I will not rule out possibility of connectors written using internal,

Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-07 Thread Tathagata Das
For WAL in Spark to work with HDFS, the HDFS version you are running must support file appends. Contact your HDFS package/installation provider to figure out whether this is supported by your HDFS installation. On Mon, Nov 7, 2016 at 2:04 PM, Arijit wrote: > Hello All, > > >

Re: [Spark Streaming] How to do join two messages in spark streaming(Probabaly messasges are in differnet RDD) ?

2016-12-06 Thread Tathagata Das
This sounds like something you can solve with a stateful operator; check out mapWithState. If both messages can be keyed with a common key, then you can define a keyed state. The state will have a field for the first message. When you see the first message for a key, fill the first field with
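A sketch of what that keyed state could look like; MessageA/MessageB and the pairing logic are assumptions:

import org.apache.spark.streaming.{State, StateSpec}

case class MessageA(id: String, payload: String)
case class MessageB(id: String, payload: String)
case class Pending(first: Option[MessageA] = None, second: Option[MessageB] = None)

val spec = StateSpec.function(
  (key: String, msg: Option[Either[MessageA, MessageB]], state: State[Pending]) => {
    val current = state.getOption.getOrElse(Pending())
    val next = msg match {
      case Some(Left(a))  => current.copy(first = Some(a))
      case Some(Right(b)) => current.copy(second = Some(b))
      case None           => current
    }
    state.update(next)
    // emit the joined pair only once both messages have been seen for this key
    for (a <- next.first; b <- next.second) yield (key, a, b)
  })

// keyedMessages: DStream[(String, Either[MessageA, MessageB])]
val joined = keyedMessages.mapWithState(spec)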

Re: Spark SQL DataFrame to Kafka Topic

2017-01-13 Thread Tathagata Das
Structured Streaming has a foreach sink, where you can essentially do what you want with your data. It's easy to create a Kafka producer, and write the data out to Kafka. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach On Fri, Jan 13, 2017 at 8:28 AM,
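A rough ForeachWriter sketch along those lines (Spark 2.0/2.1 era, before the built-in Kafka sink); the broker, topic, and value serialization are assumptions:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}

class KafkaForeachWriter(brokers: String, topic: String) extends ForeachWriter[Row] {
  @transient private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true
  }

  override def process(row: Row): Unit =
    producer.send(new ProducerRecord(topic, row.mkString(",")))  // naive CSV encoding of the row

  override def close(errorOrNull: Throwable): Unit =
    if (producer != null) producer.close()
}

// df.writeStream.foreach(new KafkaForeachWriter("host1:9092", "out-topic")).start()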

Re: Spark-shell doesn't see changes coming from Kafka topic

2016-12-01 Thread Tathagata Das
Can you confirm the following? 1. Are you sending new data to the Kafka topic AFTER starting the streaming query? Since you have specified `startingOffsets` as `latest`, data needs to be sent to the topic after the query starts for the query to receive it. 2. Are you able to read kafka data using Kafka's

Re: [structured streaming] How to remove outdated data when use Window Operations

2016-12-01 Thread Tathagata Das
That feature is coming in 2.1.0. We have added watermarking, which will track the event time of the data and accordingly close old windows, output their corresponding aggregates and then drop their corresponding state. But in that case, you will have to use append mode, and aggregated data of a
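A minimal sketch of what this looks like with watermarking and append mode; column names, thresholds, and paths are illustrative:

import org.apache.spark.sql.functions.window
// assumes import spark.implicits._ for the $"..." syntax

val windowedCounts = events
  .withWatermark("eventTime", "10 minutes")              // track event time; old state gets dropped
  .groupBy(window($"eventTime", "5 minutes"), $"word")
  .count()

windowedCounts.writeStream
  .outputMode("append")  // a window's aggregate is emitted only after the watermark passes it
  .format("parquet")
  .option("path", "/data/out")
  .option("checkpointLocation", "/data/chk")
  .start()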

Re: [structured streaming] How to remove outdated data when use Window Operations

2016-12-01 Thread Tathagata Das
In the meantime, if you are interested, you can read the design doc in the corresponding JIRA - https://issues.apache.org/jira/browse/SPARK-18124 On Thu, Dec 1, 2016 at 12:53 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > That feature is coming in 2.1.0. We have added wat

Re: streaming performance

2016-12-22 Thread Tathagata Das
From what I understand looking at the code in stackoverflow, I think you are "simulating" the streaming version of your calculation incorrectly. You are repeatedly unioning batch dataframes to simulate streaming and then applying aggregation on the unioned DF. That is not going to compute

Re: Spark streaming + kafka error with json library

2017-03-29 Thread Tathagata Das
Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly) On Wed, Mar 29, 2017 at 9:59 AM, Srikanth wrote: > Hello, > > I'm trying to use "org.json4s" % "json4s-native" library in a spark > streaming + kafka direct app. > When I use the latest version of the
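In sbt terms, roughly; the Spark and json4s versions below are illustrative, not from the thread:

libraryDependencies ++= Seq(
  // the plain artifact, not the -assembly one
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0",
  // pick a json4s version compatible with the one Spark itself ships
  "org.json4s" %% "json4s-native" % "3.2.11"
)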

Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread Tathagata Das
g DStreams). I know many people using this setting. So your > explanation will help a lot of people. > > Thanks > > On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das <t...@databricks.com> > wrote: > >> That config I not safe. Please do not use it. >> >&

Re: Why do we ever run out of memory in Spark Structured Streaming?

2017-04-04 Thread Tathagata Das
Are you referring to the memory usage of stateful operations like aggregations, or the new mapGroupsWithState? The current implementation of the internal state store (that maintains the stateful aggregates) is such that it keeps all the data in memory of the executor. It does use HDFS-compatible

Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Tathagata Das
The trigger interval is optionally specified in the writeStream options before start.

val windowedCounts = words
  .groupBy(window($"timestamp", "24 hours", "24 hours"), $"word")
  .count()
  .writeStream
  .trigger(ProcessingTime("10 seconds"))  // optional
  .format("memory")
  .queryName("tableName")

Re: Is checkpointing in Spark Streaming Synchronous or Asynchronous ?

2017-04-10 Thread Tathagata Das
As of now (Spark 2.2), Structured Streaming checkpoints the state data synchronously in every trigger. But the checkpointing is incremental, so it won't be writing all your state every time. And we will be making this asynchronous soon. On Fri, Apr 7, 2017 at 3:19 AM, kant kodali

Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Tathagata Das
Here are a couple of ideas. 1. You can set up a Structured Streaming query to update an in-memory table. Look at the memory sink in the programming guide - http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks So you can query the latest table using a specified
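A minimal sketch of idea 1; the query and table names are assumptions:

val refreshQuery = lookupStreamDF.writeStream
  .format("memory")
  .queryName("latest_lookup")   // registers an in-memory table with this name
  .outputMode("append")
  .start()

// elsewhere, read the most recent contents of that table:
val current = spark.sql("select * from latest_lookup")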

Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread Tathagata Das
That config is not safe. Please do not use it. On Mar 10, 2017 10:03 AM, "shyla deshpande" wrote: > I have a spark streaming application which processes 3 kafka streams and > has 5 output operations. > > Not sure what should be the setting for

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
You can do something like this.

def startQuery(): StreamingQuery = {
  // create your streaming dataframes
  // start the query with the same checkpoint directory
}

// handle to the active query
var activeQuery: StreamingQuery = null

while (!stopped) {
  if (activeQuery == null) { // if

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
wrote: > Thanks tathagata das actually I'm planning to something like this > > activeQuery.stop() > > //unpersist and persist cached data frame > > df.unpersist() > > //read the updated data //data size of df is around 100gb > > df.persist() > > activeQuery =

Re: Spark 2.2 streaming with append mode: empty output

2017-08-14 Thread Tathagata Das
In append mode, the aggregation outputs a row only when the watermark has been crossed and the corresponding aggregate is *final*, that is, will not be updated any more. See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking On Mon,

Re: Does mapWithState need checkpointing to be specified in Spark Streaming?

2017-07-13 Thread Tathagata Das
Yes. It does. On that note, Spark 2.2 (released a couple of days ago) adds mapGroupsWithState in Structured Streaming. That is like mapWithState on steroids. Just saying. :) On Thu, Jul 13, 2017 at 1:01 PM, SRK wrote: > Hi, > > Do we need to specify checkpointing for
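A small sketch of mapGroupsWithState in Spark 2.2; Event, RunningCount, and the update logic are assumptions:

import org.apache.spark.sql.streaming.GroupState

case class Event(user: String, value: Long)
case class RunningCount(count: Long)

// events: Dataset[Event]; assumes import spark.implicits._ for the encoders
val counts = events
  .groupByKey(_.user)
  .mapGroupsWithState[RunningCount, (String, Long)] {
    (user: String, rows: Iterator[Event], state: GroupState[RunningCount]) =>
      val previous = state.getOption.getOrElse(RunningCount(0))
      val updated = RunningCount(previous.count + rows.size)
      state.update(updated)
      (user, updated.count)
  }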

Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Tathagata Das
Hello Priyank Writing something purely in Scala/Java would be the most efficient. Even if we expose Python APIs that allow writing custom sinks in pure Python, it won't be as efficient as a Scala/Java foreach, as the data would have to go through the JVM / PVM boundary, which has significant overheads. So

Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Tathagata Das
unfortunately I am constrained on scala resources. Have you come > across other use cases where people have resided to such python-scala > hybrid approach? > > Regards, > Priyank > > > > On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das < > tathagata.das1...@gmail.com> wr

Re: Structured Streaming Questions

2017-06-28 Thread Tathagata Das
Answers inline. On Wed, Jun 28, 2017 at 10:27 AM, Revin Chalil wrote: > I am using Structured Streaming with Spark 2.1 and have some basic > questions. > > > > · Is there a way to automatically refresh the Hive Partitions > when using Parquet Sink with Partition?

Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
It's best to use DataFrames. You can read from Kafka as streaming or as batch. More details here. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
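A minimal sketch from that page; the broker and topic names are placeholders:

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")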

Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
believe is not > production ready. > > > On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das <tathagata.das1...@gmail.com > > wrote: > >> Its best to use DataFrames. You can read from as streaming or as batch. >> More details here. >> >> https://spark.apache.org/do

Re: [Structured Streaming] Recommended way of joining streams

2017-08-09 Thread Tathagata Das
Writing streams into some sink (preferably fault-tolerant, exactly once sink, see docs) and then joining is definitely a possible way. But you will likely incur higher latency. If you want lower latency, then stream-stream joins is the best approach, which we are working on right now. Spark 2.3 is

Re: Multiple queries on same stream

2017-08-09 Thread Tathagata Das
It's important to note that running multiple streaming queries, as of today, would read the input data that many times. So there is a trade-off between the two approaches. So even though scenario 1 won't get great catalyst optimization, it may be more efficient overall in terms of resource

Re: Reusing dataframes for streaming (spark 1.6)

2017-08-09 Thread Tathagata Das
There is a DStream.transform() that does exactly this. On Tue, Aug 8, 2017 at 7:55 PM, Ashwin Raju wrote: > Hi, > > We've built a batch application on Spark 1.6.1. I'm looking into how to > run the same code as a streaming (DStream based) application. This is using > pyspark.

Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-27 Thread Tathagata Das
rained on scala resources. Have you come >> across other use cases where people have resided to such python-scala >> hybrid approach? >> >> Regards, >> Priyank >> >> >> >> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das < >> tathagata.das1...@gma

Re: [Spark Streaming] - Killing application from within code

2017-05-03 Thread Tathagata Das
There isn't a clean programmatic way to kill the application running in the driver from the executor. You will have to set up an additional RPC mechanism to explicitly send a signal from the executors to the application/driver to quit. On Wed, May 3, 2017 at 8:44 AM, Sidney Feiner

Re: Refreshing a persisted RDD

2017-05-03 Thread Tathagata Das
If you want to always get the latest data in files, it's best to always recreate the DataFrame. On Wed, May 3, 2017 at 7:30 AM, JayeshLalwani wrote: > We have a Structured Streaming application that gets accounts from Kafka > into > a streaming data frame. We have

Re: Refreshing a persisted RDD

2017-05-03 Thread Tathagata Das
ewly created blacklist? Or > will it continue to hold the reference to the old dataframe? What if we had > done RDD operations instead of using Spark SQL to join the dataframes? > > > > *From: *Tathagata Das <tathagata.das1...@gmail.com> > *Date: *Wednesday, May 3, 2

Re: checkpointing without streaming?

2017-05-17 Thread Tathagata Das
Why not just save the RDD to a proper file? Text file, sequence file - many options. Then it's standard to read it back in a different program. On Wed, May 17, 2017 at 12:01 AM, neelesh.sa wrote: > Is it possible to checkpoint a RDD in one run of my application and

Re: KTable like functionality in structured streaming

2017-05-16 Thread Tathagata Das
Dataframes have the combined functionalities of both KTable and KStreams. So I don't quite understand what you mean by querying a KTable. If you mean interactively querying a table, then you can put an aggregation streaming query into the memory format sink with complete output mode to have interactive

Re: what is the difference between json format vs kafka format?

2017-05-13 Thread Tathagata Das
tRows is of type DataSet that I get from loading from Kafka > > val foo = ds.select("*").count() > val query = foo.writeStream.outputMode("complete").format("console").start(); > query.awaitTermination() > > I am just trying to parse Json messages from Ka

Re: what is the difference between json format vs kafka format?

2017-05-13 Thread Tathagata Das
I understand the confusion. The "json" format is for JSON-encoded files being written in a directory. For Kafka, use the "kafka" format. Then, to decode the binary data as JSON, you can use the function "from_json" (Spark 2.1 and above). Here is our blog post on this.
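A small sketch of that decoding path (Spark 2.1+); the schema, broker, and topic are assumptions:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
// assumes import spark.implicits._ for the $"..." syntax

val schema = new StructType().add("userId", StringType).add("amount", DoubleType)

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")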

Re: Convert DStream into Streaming Dataframe

2017-05-12 Thread Tathagata Das
Unfortunately, no. DStreams and streaming DataFrames are so different in their abstractions and implementations that there is no way to convert them. On Fri, May 12, 2017 at 9:49 AM, Tejinder Aulakh wrote: > Hi, > > Is there any way to convert a DStream to a streaming

Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

2017-06-19 Thread Tathagata Das
That is not the right way to use watermark + append output mode. The `withWatermark` must be before the aggregation. Something like this.

df.withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "30 seconds"))
  .agg(...)

Read more here -

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-22 Thread Tathagata Das
>> >> visitorSet1 ++visitorSet2.toTraversable >> >> >> visitorSet1 --visitorSet2.toTraversable >> >> On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das < >> tathagata.das1...@gmail.com> wrote: >> >>> Yes, and in general any mutable data st

Re: Spark Streaming reduceByKeyAndWindow with inverse function seems to iterate over all the keys in the window even though they are not present in the current batch

2017-06-26 Thread Tathagata Das
Unfortunately, the way reduceByKeyAndWindow is implemented, it does iterate through all the counts. To have something more efficient, you may have to implement your own windowing logic using mapWithState. Something like eventDStream.flatMap { event => // find the windows each event maps to, and

Re: checkpointing without streaming?

2017-05-18 Thread Tathagata Das
; checkpointing along with saving the RDD to a text file, the data gets > stored twice on the disk. That is why I was looking for a way to read the > checkpointed data in a different program. > > On Wed, May 17, 2017 at 12:59 PM, Tathagata Das < > tathagata.das1...@gmail.com> w

Re: statefulStreaming checkpointing too often

2017-06-02 Thread Tathagata Das
There are two kinds of checkpointing going on here - metadata and data. The 100 seconds that you have configured is the data checkpointing interval (expensive, large data) where the RDD data is being written to HDFS. The 10 second one is the metadata checkpoint (cheap, small data) where the metadata of the
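A small sketch of where the two intervals come from (values illustrative): the metadata checkpoint follows the batch interval, while the data (RDD) checkpoint interval is set on the stateful DStream:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(10))   // metadata checkpointed roughly once per batch
ssc.checkpoint("hdfs:///checkpoints/my-app")

// keyedStream and updateFunc are placeholders for the stateful part of the app
val stateStream = keyedStream.updateStateByKey(updateFunc)
stateStream.checkpoint(Seconds(100))                // RDD data checkpointed every 100 seconds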

Re: Is Structured streaming ready for production usage

2017-06-08 Thread Tathagata Das
YES. At Databricks, our customers have already been using Structured Streaming and in the last month alone processed over 3 trillion records. https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html On Thu, Jun 8, 2017 at 3:03 PM, SRK

Re: Spark Streaming Job Stuck

2017-06-06 Thread Tathagata Das
http://spark.apache.org/docs/latest/streaming-programming-guide.html#points-to-remember-1 Hope this helps. On Mon, Jun 5, 2017 at 2:51 PM, Jain, Nishit wrote: > I have a very simple spark streaming job running locally in standalone > mode. There is a customer receiver

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
This is the expected behavior. There are some confusing corner cases. If you are starting to play with Spark Streaming, I highly recommend learning Structured Streaming instead. On Mon, Jun 5, 2017 at 11:16 AM,

Re: [Spark Structured Streaming] Exception while using watermark with type of timestamp

2017-06-06 Thread Tathagata Das
Cast the timestamp column to a timestamp type. E.g. "cast timestamp as timestamp". A watermark can be defined only on columns that are of type timestamp. On Jun 6, 2017 3:06 AM, "Biplob Biswas" wrote: > Hi, > > I am playing around with Spark structured streaming and we have

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
exactly once guarantee on input is not > guaranteed. is it? > > On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das <tathagata.das1...@gmail.com > > wrote: > >> This is the expected behavior. There are some confusing corner cases. >> If you are starting to play with Spark Streami

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-06 Thread Tathagata Das
This may be because HashSet is a mutable data structure, and it seems you are actually mutating it in "set1 ++set2". I suggest creating a new HashSet in the function (and adding both sets into it), rather than mutating one of them. On Tue, Jun 6, 2017 at 11:30 AM, SRK

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-06 Thread Tathagata Das
HashSet? > > On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <tathagata.das1...@gmail.com > > wrote: > >> This may be because of HashSet is a mutable data structure, and it seems >> you are actually mutating it in "set1 ++set2". I suggest creating a new >>

Re: unable to find how to integrate SparkSession with a Custom Receiver.

2017-05-04 Thread Tathagata Das
Structured Streaming is not designed to integrate with receivers. The sources in Structured Streaming are designed for providing stronger fault-tolerance guarantees by precisely tracking records by their offsets (e.g. Kafka offsets). This is different from the Receiver APIs which did not require

Re: map/foreachRDD equivalent for pyspark Structured Streaming

2017-05-03 Thread Tathagata Das
You can apply any kind of aggregation on windows. There are some built-in aggregations (e.g. sum and count) as well as an API for user-defined aggregations (Scala/Java) that works with both batch and streaming DFs. See the programming guide if you haven't seen it already - windowing

Re: Structured Streaming + initialState

2017-05-05 Thread Tathagata Das
Can you explain how your initial state is stored? Is it a file, or is it in a database? If it's in a database, then when you initialize the GroupState, you can fetch it from the database. On Fri, May 5, 2017 at 7:35 AM, Patrick McGloin wrote: > Hi all, > > With Spark

Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread Tathagata Das
Should release by the end of this month. On Fri, May 19, 2017 at 4:07 PM, kant kodali wrote: > Hi Patrick, > > I am using 2.1.1 and I tried the above code you sent and I get > > "java.lang.UnsupportedOperationException: Data source kafka does not > support streamed writing"

Re: Should I use Dstream or Structured Stream to transfer data from source to sink and then back from sink to source?

2017-09-14 Thread Tathagata Das
Are you sure the code is correct? A Dataset does not have a method "trigger". Rather, I believe the correct code should be StreamingQuery query = resultDataSet.writeStream.trigger(ProcessingTime(1000)).format("kafka").start(); You can do all the same things you can do with Structured Streaming

Re: Why do checkpoints work the way they do?

2017-08-29 Thread Tathagata Das
Hello, This is an unfortunate design on my part when I was building DStreams :) Fortunately, we learnt from our mistakes and built Structured Streaming the correct way. Checkpointing in Structured Streaming stores only the progress information (offsets, etc.), and the user can change their

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
rectly, your solution wouldn't return the > exact solution, since it also groups by on destination. I would say the > easiest solution would be to use flatMapGroupsWithState, where you: > .groupByKey(_.train) > > and keep in state the row with the maximum time. > > On T

Re: Time window on Processing Time

2017-08-29 Thread Tathagata Das
Yes, it can be! There is a SQL function called current_timestamp(), which is self-explanatory. So I believe you should be able to do something like

import org.apache.spark.sql.functions._

ds.withColumn("processingTime", current_timestamp())
  .groupBy(window("processingTime", "1 minute"))

Re: Python UDF to convert timestamps (performance question)

2017-08-30 Thread Tathagata Das
1. Generally, adding columns, etc. will not affect performance because the Spark's optimizer will automatically figure out columns that are not needed and eliminate in the optimization step. So that should never be a concern. 2. Again, this is generally not a concern as the optimizer will take

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread Tathagata Das
y? >> >> I wonder between *Nested query* vs *groupByKey/**mapGroupsWithState* >> which approach is more efficient to solve this particular problem ? >> >> Thanks! >> >> >> >> >> >> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <

Re: Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread Tathagata Das
Why not set the watermark to be looser, one that works across all partitions? The main usage of watermark is to drop state. If you loosen the watermark threshold (e.g. from 1 hour to 10 hours), then you will keep more state with older data, but you are guaranteed that you will not drop important

Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
Responses inline. On Thu, Aug 24, 2017 at 7:16 PM, cbowden wrote: > 1. would it not be more natural to write processed to kafka and sink > processed from kafka to s3? > I am sorry, I don't fully understand this question. Could you please elaborate further, as in, what is

Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
ommend it. As stated above, it is > quite natural to chain processes via kafka. > > On Thu, Aug 24, 2017 at 11:03 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Responses inline. >> >> On Thu, Aug 24, 2017 at 7:16 PM, cbowden <cbcweb...@gmail.co

Re: Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Tathagata Das
When you say "the application remained alive", do you mean the StreamingQuery stayed alive, or the whole process stayed alive? The StreamingQuery should be terminated immediately. And the stream execution threads are all daemon threads, so it should not affect the termination of the application

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
is a streaming based query and in my > case I need to hold state for 24 hours which I forgot to mention in my > previous email. can I do ? > > *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 > hours"), "train", "dest&q

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Say, trainTimesDataset is the streaming Dataset of schema [train: Int, dest: String, time: Timestamp]

Scala: trainTimesDataset.groupBy("train", "dest").max("time")

SQL: "select train, dest, max(time) from trainTimesView group by train, dest" // after calling

Re: Cases when to clear the checkpoint directories.

2017-10-09 Thread Tathagata Das
Any changes in the Java code (to be specific, the generated bytecode) in the functions you pass to Spark (i.e., map function, reduce function, as well as it closure dependencies) counts as "application code change", and will break the recovery from checkpoints. On Sat, Oct 7, 2017 at 11:53 AM,

Re: Writing custom Structured Streaming receiver

2017-11-01 Thread Tathagata Das
Structured Streaming source APIs are not yet public, so there isn't a guide. However, if you are adventurous enough, you can take a look at the source code in Spark. Source API: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala

Re: Can we pass the Calcite streaming sql queries to spark sql?

2017-11-09 Thread Tathagata Das
I don't think so. Calcite's SQL is an extension of standard SQL (keywords like STREAM, etc.) which we don't support; we just support regular SQL, so queries like "SELECT STREAM " will not work. On Thu, Nov 9, 2017 at 11:50 AM, kant kodali wrote: > Can we pass the Calcite

Re: Structured Stream in Spark

2017-10-25 Thread Tathagata Das
Please do not confuse old Spark Streaming (DStreams) with Structured Streaming. Structured Streaming's offset and checkpoint management is far more robust than DStreams. Take a look at my talk - https://spark-summit.org/2017/speakers/tathagata-das/ On Wed, Oct 25, 2017 at 9:29 PM, KhajaAsmath

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Tathagata Das
Just to be clear, these screenshots are about the memory consumption of the driver. So this has nothing to do with the streaming aggregation state, which is kept in the memory of the executors, not the driver. On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim wrote: > 1. Could you

Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Glad that it worked out! It's unfortunate that there exist such pitfalls. And there is no easy way to get around it. If you can, let us know how your experience with mapGroupsWithState has been. TD On Fri, Jun 8, 2018 at 1:49 PM, frankdede wrote: > You are exactly right! A few hours ago, I

Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Try to define the watermark on the right column immediately before calling `groupByKey(...).mapGroupsWithState(...)`. You are applying the watermark and then doing a bunch of opaque transformations (a user-defined flatMap that the planner has no visibility into). This prevents the planner from

Re: Reset the offsets, Kafka 0.10 and Spark

2018-06-08 Thread Tathagata Das
Structured Streaming really makes this easy. You can simply specify the option of whether to start the query from earliest or latest. Check out - https://www.slideshare.net/databricks/a-deep-dive-into-structured-streaming -
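A minimal sketch of that option; the broker and topic are placeholders:

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")  // or "latest", or a JSON map of per-partition offsets
  .load()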

Re: [structured-streaming][parquet] readStream files order in Parquet

2018-06-15 Thread Tathagata Das
The files are processed in the order of the file's last-modified timestamp. The path and partitioning scheme are not used for ordering. On Thu, Jun 14, 2018 at 6:59 AM, karthikjay wrote: > My parquet files are first partitioned by environment and then by date > like: > > env=testing/ >

Re: Recommendation of using StreamSinkProvider for a custom KairosDB Sink

2018-06-25 Thread Tathagata Das
This interface is actually unstable. The v2 of the DataSource APIs is being designed right now, which will be public and stable in a release or two. So unfortunately there is no stable interface right now that I can officially recommend. That said, you could always use the ForeachWriter interface

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
-streaming-queries On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das wrote: > Structured Streaming does not maintain a queue of batch like DStream. > DStreams used to cut off batches at a fixed interval and put in a queue, > and a different thread processed queued batches. In contrast, S

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
Structured Streaming does not maintain a queue of batches like DStreams. DStreams used to cut off batches at a fixed interval and put them in a queue, and a different thread processed queued batches. In contrast, Structured Streaming simply cuts off and immediately processes a batch after the previous

Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread Tathagata Das
Actually, we do not support jdbc sink yet. The blog post was just an example :) I agree it is misleading in hindsight. On Wed, Jun 20, 2018 at 6:09 PM, kant kodali wrote: > Hi All, > > Does Spark Structured Streaming have a JDBC sink or Do I need to use > ForEachWriter? I see the following code

Re: Spark structured streaming time series forecasting

2018-01-09 Thread Tathagata Das
Spark-ts has been under development for a while. So I doubt there is any integration with Structured Streaming. That said, Structured Streaming uses DataFrames and Datasets, and a lot of existing libraries built on Datasets/DataFrames should work directly, especially if they are map-like

Re: Apache Spark - Custom structured streaming data source

2018-01-25 Thread Tathagata Das
Hello Mans, The streaming DataSource APIs are still evolving and are not public yet. Hence there is no official documentation. In fact, there is a new DataSourceV2 API (in Spark 2.3) that we are migrating towards. So at this point of time, it's hard to make any concrete suggestion. You can take a

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

2018-01-12 Thread Tathagata Das
Hello Dan, From your code, it seems like you are setting the timeout timestamp based on the current processing-time / wall-clock-time, while the watermark is being calculated on the event-time ("when" column). The semantics of the EventTimeTimeout is that when the last set timeout timestamp of a

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

2018-01-12 Thread Tathagata Das
thoughts. > > dan > ​ > > > On Fri, Jan 12, 2018 at 4:39 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Hello Dan, >> >> From your code, it seems like you are setting the timeout timestamp based >> on the current processing-time /

Re: mapGroupsWithState in Python

2018-01-31 Thread Tathagata Das
Hello Ayan, From what I understand, mapGroupsWithState (probably the more general flatMapGroupsWithState) is the best way forward (not available in Python). However, you need to figure out your desired semantics of when you want to output the deduplicated data from the streaming query. For

Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread Tathagata Das
Could you give the full stack trace of the exception? Also, can you do `dataframe2.explain(true)` and show us the plan output? On Wed, Jan 31, 2018 at 3:35 PM, M Singh wrote: > Hi Folks: > > I have to add a column to a structured *streaming* dataframe but when I
