Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Tathagata Das
Hello Rachana, Getting exactly-once semantics on files and making it scale to a very large number of files are very hard problems to solve. While Structured Streaming + built-in file sink provides the exactly-once guarantee that DStreams could not, it is definitely limited in other ways (scaling in

Re: how can i write spark addListener metric to kafka

2020-06-09 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis On Tue, Jun 9, 2020 at 4:42 PM a s wrote: > hi Guys, > > I am building a structured streaming app for google analytics data > > i want to capture the
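
The asynchronous API referred to here is StreamingQueryListener. A minimal sketch of forwarding each query's progress report to Kafka (the broker address and metrics topic name are placeholders, and error handling is omitted):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    spark.streams.addListener(new StreamingQueryListener {
      private val producer = new KafkaProducer[String, String](props)

      override def onQueryStarted(event: QueryStartedEvent): Unit = ()

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // progress.json is the same JSON exposed by query.lastProgress
        producer.send(new ProducerRecord("streaming-metrics", event.progress.id.toString, event.progress.json))
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = producer.close()
    })

The listener callbacks run on the driver, so the producer lives there and does not need to be serializable.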

Re: Spark, read from Kafka stream failing AnalysisException

2020-04-05 Thread Tathagata Das
Have you looked at the suggestion made by the error by searching for "Structured Streaming + Kafka Integration Guide" in Google? It should be the first result. The last section in the "Structured Streaming

Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Tathagata Das
Make sure that you are continuously feeding data into the query to trigger the batches; only then are timeouts processed. See the timeout behavior details here - https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState On Wed, Mar 4, 2020 at 2:51 PM
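
A minimal sketch of a stateful query where each group's timeout only fires when a subsequent micro-batch is triggered by new input (the Event case class, grouping key, and one-minute timeout are illustrative assumptions):

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
    import spark.implicits._

    case class Event(id: String, value: Long)

    def updateState(id: String, events: Iterator[Event], state: GroupState[Long]): String = {
      if (state.hasTimedOut) {
        val summary = s"$id timed out with count ${state.getOption.getOrElse(0L)}"
        state.remove()
        summary
      } else {
        val count = state.getOption.getOrElse(0L) + events.size
        state.update(count)
        // The timeout is only evaluated when a later batch runs, i.e. when new data arrives in the query.
        state.setTimeoutDuration("1 minute")
        s"$id count $count"
      }
    }

    val updates = eventStream                       // eventStream: Dataset[Event] from a streaming source
      .groupByKey(_.id)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateState)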

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
ng a simple group by. > > Regards, > > Bryan Jeffrey > > Get Outlook for Android <https://aka.ms/ghei36> > > -- > *From:* Tathagata Das > *Sent:* Friday, February 28, 2020 4:56:07 PM > *To:* Bryan Jeffrey > *Cc:* user > *Subject:*

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
You are deserializing by explicitly specifying the UTC timezone, but when serializing you are not specifying it. Maybe that is the reason? Also, if you can encode it using just a long, then I recommend just saving the value as a long and eliminating some of the serialization overheads. Spark will probably

Re: dropDuplicates and watermark in structured streaming

2020-02-28 Thread Tathagata Das
lec ssmi wrote: > Such as : > df.withWarmark("time","window > size").dropDulplicates("id").withWatermark("time","real > watermark").groupBy(window("time","window size","window > size")).agg(count(&

Re: dropDuplicates and watermark in structured streaming

2020-02-27 Thread Tathagata Das
1. Yes. All times are in event time, not processing time. So you may get 10AM event-time data at 11AM processing time, but it will still be compared against all data within the 9-10AM event times. 2. Show us your code. On Thu, Feb 27, 2020 at 2:30 AM lec ssmi wrote: > Hi: > I'm new to structured

Re: Spark Streaming: Aggregating values across batches

2020-02-27 Thread Tathagata Das
Use Structured Streaming. Its aggregation, by definition, is across batches. On Thu, Feb 27, 2020 at 3:17 PM Something Something < mailinglist...@gmail.com> wrote: > We've a Spark Streaming job that calculates some values in each batch. > What we need to do now is aggregate values across ALL

Re: how can I dynamic parse json in kafka when using Structured Streaming

2019-09-17 Thread Tathagata Das
You can use *from_json* built-in SQL function to parse json. https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column- On Mon, Sep 16, 2019 at 7:39 PM lk_spark wrote: > hi,all : > I'm using Structured
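
A minimal sketch of parsing the Kafka value column with from_json; it assumes the JSON schema is known up front (from_json needs a fixed schema), and the broker, topic, and field names are placeholders:

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("user", StringType)
      .add("ts", TimestampType)
      .add("amount", DoubleType)

    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json(col("json"), schema).as("data"))
      .select("data.*")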

Re: Structured Streaming: How to add a listener for when a batch is complete

2019-09-03 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis On Tue, Sep 3, 2019, 3:26 PM Natalie Ruiz wrote: > Hello all, > > > > I’m a beginner, new to Spark and wanted to know if there was an equivalent > to Spark

Re: Structured Streaming Dataframe Size

2019-08-29 Thread Tathagata Das
select * from **myAggTable* This will give awesome ACID transactional guarantees between reads and writes. Read more on the linked website (full disclosure, I work on that project as well). > Thank you very much for your help! > > > On Tue, Aug 27, 2019, 6:42 PM Tathagata Das

Re: Structured Streaming Dataframe Size

2019-08-27 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts *Note that Structured Streaming does not materialize the entire table*. It > reads the latest available data from the streaming data source, processes > it incrementally to update the result, and then

Announcing Delta Lake 0.3.0

2019-08-01 Thread Tathagata Das
Hello everyone, We are excited to announce the availability of Delta Lake 0.3.0 which introduces new programmatic APIs for manipulating and managing data in Delta Lake tables. Here are the main features: - Scala/Java APIs for DML commands - You can now modify data in Delta Lake
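
A short sketch of what the new DML APIs look like, assuming a Delta table at a placeholder path and a hypothetical updatesDF with matching ids:

    import io.delta.tables.DeltaTable

    val deltaTable = DeltaTable.forPath(spark, "/delta/events")

    // Delete rows matching a predicate
    deltaTable.delete("date < '2019-01-01'")

    // Upsert (merge) new data into the table
    deltaTable.as("t")
      .merge(updatesDF.as("u"), "t.id = u.id")
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()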

Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread Tathagata Das
@ayan guha @Gourav Sengupta Delta Lake OSS currently does not support defining tables in the Hive metastore using DDL commands. We are hoping to add the necessary compatibility fixes in Apache Spark to make Delta Lake work with tables and DDL commands. So we will support them in a future release.

Re: How to execute non-timestamp-based aggregations in spark structured streaming?

2019-04-22 Thread Tathagata Das
ow_number() > over (partition by Origin order by OnTimeDepPct desc) OnTimeDepRank,* > from flights > > This will *not* work in *structured streaming* : The culprit is: > > partition by Origin > > The requirement is to use a timestamp-typed field such as > > partitio

Re: Iterator of KeyValueGroupedDataset.flatMapGroupsWithState function

2018-10-31 Thread Tathagata Das
It is okay to collect the iterator. That will not break Spark. However, collecting it requires memory in the executor, so you may cause OOMs if a group has a LOT of new data. On Wed, Oct 31, 2018 at 3:44 AM Antonio Murgia - antonio.murg...@studio.unibo.it wrote: > Hi all, > > I'm currently

Re: [Structured Streaming] Two watermarks and StreamingQueryListener

2018-08-10 Thread Tathagata Das
Structured Streaming internally maintains one global watermark by taking a min of the two watermarks. That's why only one gets reported. In Spark 2.4, there will be the option of choosing max instead of min. Just curious, why do you have two watermarks? What's the query like? TD On Thu, Aug 9, 2018

Re: How to read json data from kafka and store to hdfs with spark structued streaming?

2018-07-26 Thread Tathagata Das
Are you writing multiple streaming query output to the same location? If so, I can see this error occurring. Multiple streaming queries writing to the same directory is not supported. On Tue, Jul 24, 2018 at 3:38 PM, dddaaa wrote: > I'm trying to read json messages from kafka and store them in

Re: Exceptions with simplest Structured Streaming example

2018-07-26 Thread Tathagata Das
Unfortunately, your output is not visible in the email that we see. Was it an image that somehow got removed? Maybe best to copy the output text (i.e. the error message) into the email. On Thu, Jul 26, 2018 at 5:41 AM, Jonathan Apple wrote: > Hello, > > There is a streaming Word Count example at

Re: Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-17 Thread Tathagata Das
Yes. Yes you can. On Tue, Jul 17, 2018 at 11:42 AM, Sathi Chowdhury wrote: > Hi, > My question is about ability to integrate spark streaming with multiple > clusters.Is it a supported use case. An example of that is that two topics > owned by different group and they have their own kakka infra

Re: [Structured Streaming] Custom StateStoreProvider

2018-07-10 Thread Tathagata Das
Note that this is not public API yet. Hence this is not very documented. So use it at your own risk :) On Tue, Jul 10, 2018 at 11:04 AM, subramgr wrote: > Hi, > > This looks very daunting *trait* is there some blog post or some articles > which explains on how to implement this *trait* > >

Re: [Structured Streaming] Reading Checkpoint data

2018-07-09 Thread Tathagata Das
Only the stream metadata (e.g., stream id, offsets) is stored as JSON. The stream state data is stored in an internal binary format. On Mon, Jul 9, 2018 at 4:07 PM, subramgr wrote: > Hi, > > I read somewhere that with Structured Streaming all the checkpoint data is > more readable (Json) like.

Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-05 Thread Tathagata Das
Hey all, In Spark 2.4.0, there will be a new feature called *foreachBatch* which will expose the output rows of every micro-batch as a dataframe, on which you apply a user-defined function. With that, you can reuse existing batch sources for writing results as well as write results to multiple
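
A minimal sketch of foreachBatch fanning one micro-batch result out to two sinks (output paths and checkpoint location are placeholders):

    import org.apache.spark.sql.DataFrame

    val query = streamingDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.persist()
        batchDF.write.mode("append").parquet("/data/parquet-out")   // sink 1
        batchDF.write.mode("append").json("/data/json-out")         // sink 2
        batchDF.unpersist()
      }
      .option("checkpointLocation", "/chk/multi-sink")
      .start()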

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-28 Thread Tathagata Das
; Its all documented - https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > Str

Re: How to reduceByKeyAndWindow in Structured Streaming?

2018-06-28 Thread Tathagata Das
The fundamental conceptual difference between the windowing in DStreams vs Structured Streaming is that DStreams used the arrival time of the record in Spark (aka processing time) while Structured Streaming uses event time. If you want to exactly replicate DStream's processing time windows in

Re: Recommendation of using StreamSinkProvider for a custom KairosDB Sink

2018-06-25 Thread Tathagata Das
This interface is actually unstable. The DataSource V2 API is being designed right now and will be public and stable in a release or two. So unfortunately there is no stable interface right now that I can officially recommend. That said, you could always use the ForeachWriter interface

Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread Tathagata Das
Actually, we do not support jdbc sink yet. The blog post was just an example :) I agree it is misleading in hindsight. On Wed, Jun 20, 2018 at 6:09 PM, kant kodali wrote: > Hi All, > > Does Spark Structured Streaming have a JDBC sink or Do I need to use > ForEachWriter? I see the following code

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das wrote: > Structured Streaming does not maintain a queue of batch like DStream. > DStreams used to cut off batches at a fixed interval and put in a queue, > and a different thread processed queued batches. In contrast, S

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
Structured Streaming does not maintain a queue of batches like DStreams. DStreams used to cut off batches at a fixed interval and put them in a queue, and a different thread processed the queued batches. In contrast, Structured Streaming simply cuts off and immediately processes a batch after the previous

Re: [structured-streaming][parquet] readStream files order in Parquet

2018-06-15 Thread Tathagata Das
The files are processed in the order of their last-modified timestamps. The path and partitioning scheme are not used for ordering. On Thu, Jun 14, 2018 at 6:59 AM, karthikjay wrote: > My parquet files are first partitioned by environment and then by date > like: > > env=testing/ >

Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Glad that it worked out! It's unfortunate that there exist such pitfalls. And there is no easy way to get around it. If you can, let us know how your experience with mapGroupsWithState has been. TD On Fri, Jun 8, 2018 at 1:49 PM, frankdede wrote: > You are exactly right! A few hours ago, I

Re: Reset the offsets, Kafka 0.10 and Spark

2018-06-08 Thread Tathagata Das
Structured Streaming really makes this easy. You can simply specify the option of whether to start the query from the earliest or the latest offsets. Check out - https://www.slideshare.net/databricks/a-deep-dive-into-structured-streaming -
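
A minimal sketch of choosing where the query starts reading (broker and topic are placeholders):

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", "earliest")   // "latest" (the default) starts from the end of the topic
      .load()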

Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Try to define the watermark on the right column immediately before calling `groupByKey(...).mapGroupsWithState(...)`. You are applying the watermark and then doing a bunch of opaque transformations (a user-defined flatMap that the planner has no visibility into). This prevents the planner from
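
A minimal sketch of the suggested ordering, with the watermark defined directly on the event-time column and the stateful operator applied immediately after (the Event class and updateFunc are illustrative assumptions):

    import org.apache.spark.sql.streaming.GroupStateTimeout

    case class Event(key: String, eventTime: java.sql.Timestamp, value: Long)

    val result = events                              // events: Dataset[Event] from a streaming source
      .withWatermark("eventTime", "10 minutes")      // watermark on the event-time column first
      .groupByKey(_.key)                             // then group and apply the stateful function directly
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateFunc)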

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Tathagata Das
Just to be clear, these screenshots are about the memory consumption of the driver. So this has nothing to do with streaming aggregation state, which is kept in the memory of the executors, not the driver. On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim wrote: > 1. Could you

Re: can we use mapGroupsWithState in raw sql?

2018-04-16 Thread Tathagata Das
Unfortunately no. Honestly it does not make sense as for type-aware operations like map, mapGroups, etc., you have to provide an actual JVM function. That does not fit in with the SQL language structure. On Mon, Apr 16, 2018 at 7:34 PM, kant kodali wrote: > Hi All, > > can

Re: Structured Streaming on Kubernetes

2018-04-13 Thread Tathagata Das
Structured streaming is stable in production! At Databricks, we and our customers collectively process almost 100s of billions of records per day using SS. However, we are not using kubernetes :) Though I don't think it will matter too much as long as kubes are correctly provisioned+configured

Re: Does partition by and order by works only in stateful case?

2018-04-12 Thread Tathagata Das
The traditional SQL windows with `over` is not supported in streaming. Only time-based windows, that is, `window("timestamp", "10 minutes")` is supported in streaming. On Thu, Apr 12, 2018 at 7:34 PM, kant kodali wrote: > Hi All, > > Does partition by and order by works only
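
A minimal sketch of the supported time-based window in streaming (column names and durations are placeholders):

    import org.apache.spark.sql.functions.{col, window}

    val counts = events
      .withWatermark("timestamp", "20 minutes")
      .groupBy(window(col("timestamp"), "10 minutes"), col("key"))
      .count()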

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
es must be executed with writeStream.start() > > > But what i need to do in this step is only transforming json string data > to Dataset . How to fix it? > > Thanks! > > > Regard, > Junfeng Chen > > On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das < > tathagata.das1.

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
It's not very surprising that doing this sort of RDD to DF conversion inside DStream.foreachRDD has weird corner cases like this. In fact, you are going to have additional problems with partial parquet files (when there are failures) in this approach. I strongly suggest that you use Structured

Re: Does structured streaming support Spark Kafka Direct?

2018-04-12 Thread Tathagata Das
The parallelism is the same for Structured Streaming. In fact, the Kafka Structured Streaming source is based on the same principle as DStream's Kafka Direct, hence it has very similar behavior. On Tue, Apr 10, 2018 at 11:03 PM, SRK wrote: > hi, > > We have code based on

Re: Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread Tathagata Das
Structured Streaming AUTOMATICALLY saves the offsets in a checkpoint directory that you provide. And when you start the query again with the same directory it will just pick up where it left off.
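
A minimal sketch of providing that checkpoint directory (paths are placeholders); restarting the application with the same checkpointLocation resumes from the saved offsets:

    val query = parsedDF.writeStream
      .format("parquet")
      .option("path", "/data/output")
      .option("checkpointLocation", "/chk/my-query")   // offsets and state are saved here
      .start()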

Re: [Structured Streaming] Application Updates in Production

2018-03-22 Thread Tathagata Das
new app will not pick up from the old > checkpoints, one would need to keep the old app and the new app running > until new app catches up on data processing with the old app. > > > ----- Original message - > From: Tathagata Das <tathagata.das1...@gmail.com> > To: Priy

Re: [Structured Streaming] Application Updates in Production

2018-03-21 Thread Tathagata Das
Why do you want to start the new code in parallel to the old one? Why not stop the old one, and then start the new one? Structured Streaming ensures that all checkpoint information (offsets and state) are future-compatible (as long as state schema is unchanged), hence new code should be able to

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Tathagata Das
-with-tathagata-das On Thu, Mar 15, 2018 at 8:37 AM, Bowden, Chris <chris.bow...@microfocus.com> wrote: > You need to tell Spark about the structure of the data, it doesn't know > ahead of time if you put avro, json, protobuf, etc. in kafka for the > message format. If the messa

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Tathagata Das
Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html This is true stream-stream join which will automatically buffer delayed data and appropriately join stuff with SQL join semantics. Please check it out :) TD On Wed, Mar 14, 2018 at 12:07

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-12 Thread Tathagata Das
rspective, I believe just spitting out nulls for every > trigger until there is a match and when there is match spitting out the > joined rows should suffice isn't it? > > Sorry if my thoughts are too naive! > > > > > > > > > > > On Thu, Mar 8, 2018

Re: Upgrades of streaming jobs

2018-03-09 Thread Tathagata Das
Yes, all checkpoints are forward compatible. However, you do need to restart the query if you want to update the code of the query. This downtime can be in less than a second (if you just restart the query without stopping the application/Spark driver) or 10s of seconds (if you have to stop the

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-08 Thread Tathagata Das
A tickets related to full >>>> outer joins and update mode for maybe say Spark 2.3. I wonder how hard is >>>> it two implement both of these? It turns out the update mode and full outer >>>> join is very useful and required in my case, therefore, I'm just asking.

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-07 Thread Tathagata Das
ust many this line: > >> CachedKafkaConsumer: CachedKafkaConsumer is not running in >> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are >> interrupted because of KAFKA-1894. > > > > Regard, > Junfeng Chen > > On Wed, Mar 7, 2018 at 3

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread Tathagata Das
ests compile > ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr > -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn > > > On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Hey, >> >&g

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Tathagata Das
Which version of Spark are you using? And can you give us the full stack trace of the exception? On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen wrote: > I am trying to read kafka and save the data as parquet file on hdfs > according to this

Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-03-02 Thread Tathagata Das
sful. This way we get at least > once semantic and partial file write issue. > > Thoughts ? > > > Sunil Parmar > > On Wed, Feb 28, 2018 at 1:59 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> There is no good way to save to parquet witho

Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-28 Thread Tathagata Das
I made a JIRA for it - https://issues.apache.org/jira/browse/SPARK-23539 Unfortunately it is blocked by Kafka version upgrade, which has a few nasty issues related to Kafka bugs - https://issues.apache.org/jira/browse/SPARK-18057 On Wed, Feb 28, 2018 at 3:17 PM, karthikus

Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-02-28 Thread Tathagata Das
There is no good way to save to parquet without causing downstream consistency issues. You could use foreachRDD to get each RDD, convert it to DataFrame/Dataset, and write out as parquet files. But you will later run into issues with partial files caused by failures, etc. On Wed, Feb 28, 2018 at

Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread Tathagata Das
Let me answer the original question directly, that is, how do we determine that an event is late. We simply track the maximum event time the engine has seen in the data it has processed till now. And any data that has event time less than the max is basically "late" (as it is out-of-order). Now,
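
A small worked example of that rule, assuming a 10-minute watermark delay on a hypothetical eventTime column: if the maximum event time seen so far is 10:20, the watermark is 10:10, so a record that arrives now with event time 10:05 is considered late and may be dropped from stateful aggregations.

    import org.apache.spark.sql.functions.{col, window}

    val windowedCounts = events
      .withWatermark("eventTime", "10 minutes")              // watermark = max(eventTime seen) - 10 minutes
      .groupBy(window(col("eventTime"), "5 minutes"))
      .count()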

Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-27 Thread Tathagata Das
Unfortunately, exposing Kafka headers is not yet supported in Structured Streaming. The community is more than welcome to add support for it :) On Tue, Feb 27, 2018 at 2:51 PM, Karthik Jayaraman wrote: > Hi all, > > I am using Spark 2.2.1 Structured Streaming to read

Re: Trigger.ProcessingTime("10 seconds") & Trigger.Continuous(10.seconds)

2018-02-25 Thread Tathagata Das
The continuous one is our new low latency continuous processing engine in Structured Streaming (to be released in 2.3). Here is the pre-release doc - https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc5-docs/_site/structured-streaming-programming-guide.html#continuous-processing On Sun, Feb

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread Tathagata Das
Hey, Thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this. TD On Tue, Feb 20, 2018 at 8:20 PM, kant kodali wrote: > if I change it to the below code it works. However, I don't believe it is > the solution I am looking

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-02-14 Thread Tathagata Das
Of course, you can write to multiple Kafka topics from a single query. If your dataframe that you want to write has a column named "topic" (along with "key", and "value" columns), it will write the contents of a row to the topic in that row. This automatically works. So the only thing you need to
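
A minimal sketch of routing rows by a per-row "topic" column (broker, topic names, and the severity column are placeholders); no "topic" option is set on the writer, so each row goes to the topic named in its own column:

    val routed = processed.selectExpr(
      "CASE WHEN severity = 'ERROR' THEN 'alerts' ELSE 'events' END AS topic",
      "CAST(id AS STRING) AS key",
      "to_json(struct(*)) AS value")

    routed.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/chk/kafka-out")
      .start()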

Re: Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Tathagata Das
I’ve to > restart queries? > > Should i just wait for 2.3 where i'll be able to join two structured > streams ( if the release is just a few weeks away ) > > Appreciate all the help! > > thanks > App > > > > On 14 February 2018 at 4:41:52 PM, Tathagata Das

Re: Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Tathagata Das
Let me fix my mistake :) What I suggested in that earlier thread does not work. The streaming query that joins a streaming dataset with a batch view, does not correctly pick up when the view is updated. It works only when you restart the query. That is, - stop the query - recreate the dataframes,

Re: Spark Streaming withWatermark

2018-02-06 Thread Tathagata Das
That may very well be possible. The watermark delay guarantees that any record newer than or equal to the watermark (that is, max event time seen - 20 seconds) will be considered and never ignored. It does not guarantee the other way, that is, it does NOT guarantee that records older than the

Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Tathagata Das
> / Read text from socket >> >> val socketDF = spark >> >> .readStream >> >> .format("socket") >> >> .option("host", "localhost") >> >> .option("port", ) >> >> .load() >> >

Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Tathagata Das
Hello Divya, To add further clarification, the Apache Bahir does not have any Structured Streaming support for Twitter. It only has support for Twitter + DStreams. TD On Wed, Jan 31, 2018 at 2:44 AM, vermanurag wrote: > Twitter functionality is not part of Core

Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread Tathagata Das
Could you give the full stack trace of the exception? Also, can you do `dataframe2.explain(true)` and show us the plan output? On Wed, Jan 31, 2018 at 3:35 PM, M Singh wrote: > Hi Folks: > > I have to add a column to a structured *streaming* dataframe but when I

Re: mapGroupsWithState in Python

2018-01-31 Thread Tathagata Das
Hello Ayan, From what I understand, mapGroupsWithState (probably the more general flatMapGroupsWithState) is the best way forward (not available in Python). However, you need to figure out your desired semantics of when you want to output the deduplicated data from the streaming query. For

Re: Max number of streams supported ?

2018-01-31 Thread Tathagata Das
Just to clarify a subtle difference between DStreams and Structured Streaming. Multiple input streams in a DStreamGraph likely means they are all being processed/computed in the same way, as there can be only one streaming query / context active in the StreamingContext. However, in the case of

Re: Apache Spark - Custom structured streaming data source

2018-01-25 Thread Tathagata Das
Hello Mans, The streaming DataSource APIs are still evolving and are not public yet. Hence there is no official documentation. In fact, there is a new DataSourceV2 API (in Spark 2.3) that we are migrating towards. So at this point of time, it's hard to make any concrete suggestion. You can take a

Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

2018-01-22 Thread Tathagata Das
For computing mapGroupsWithState, can you check the following: - How many tasks are being launched in the reduce stage (that is, the stage after the shuffle, the one computing mapGroupsWithState)? - How long is each task taking? - How many cores does the cluster have? On Thu, Jan 18, 2018 at

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

2018-01-12 Thread Tathagata Das
thoughts. > > dan > ​ > > > On Fri, Jan 12, 2018 at 4:39 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Hello Dan, >> >> From your code, it seems like you are setting the timeout timestamp based >> on the current processing-time /

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

2018-01-12 Thread Tathagata Das
Hello Dan, From your code, it seems like you are setting the timeout timestamp based on the current processing-time / wall-clock-time, while the watermark is being calculated on the event-time ("when" column). The semantics of the EventTimeTimeout is that when the last set timeout timestamp of a

Re: Spark structured streaming time series forecasting

2018-01-09 Thread Tathagata Das
Spark-ts has been under development for a while, so I doubt there is any integration with Structured Streaming. That said, Structured Streaming uses DataFrames and Datasets, and a lot of existing libraries built on Datasets/DataFrames should work directly, especially if they are map-like

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-03 Thread Tathagata Das
1. It is all the result data in that trigger. Note that it takes a DataFrame which is a purely logical representation of data and has no association with partitions, etc. which are physical representations. 2. If you want to limit the amount of data that is processed in a trigger, then you should

Re: Can we pass the Calcite streaming sql queries to spark sql?

2017-11-09 Thread Tathagata Das
I don't think so. Calcite's SQL is an extension of standard SQL (keywords like STREAM, etc.) which we don't support; we just support regular SQL, so queries like "SELECT STREAM " will not work. On Thu, Nov 9, 2017 at 11:50 AM, kant kodali wrote: > Can we pass the Calcite

Re: Writing custom Structured Streaming receiver

2017-11-01 Thread Tathagata Das
Structured Streaming source APIs are not yet public, so there isn't a guide. However, if you are adventurous enough, you can take a look at the source code in Spark. Source API: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala

Re: Structured Stream in Spark

2017-10-25 Thread Tathagata Das
Please do not confuse old Spark Streaming (DStreams) with Structured Streaming. Structured Streaming's offset and checkpoint management is far more robust than DStreams. Take a look at my talk - https://spark-summit.org/2017/speakers/tathagata-das/ On Wed, Oct 25, 2017 at 9:29 PM, KhajaAsmath

Re: Cases when to clear the checkpoint directories.

2017-10-09 Thread Tathagata Das
Any changes in the Java code (to be specific, the generated bytecode) in the functions you pass to Spark (i.e., map function, reduce function, as well as their closure dependencies) count as an "application code change", and will break the recovery from checkpoints. On Sat, Oct 7, 2017 at 11:53 AM,

Re: Should I use Dstream or Structured Stream to transfer data from source to sink and then back from sink to source?

2017-09-14 Thread Tathagata Das
Are you sure the code is correct? A Dataset does not have a method "trigger". Rather, I believe the correct code should be StreamingQuery query = resultDataSet.writeStream.trigger(ProcessingTime(1000)).format("kafka").start(); You can do all the same things you can do with Structured Streaming

Re: Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread Tathagata Das
Why not set the watermark to be looser, one that works across all partitions? The main usage of watermark is to drop state. If you loosen the watermark threshold (e.g. from 1 hour to 10 hours), then you will keep more state with older data, but you are guaranteed that you will not drop important

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread Tathagata Das
y? >> >> I wonder between *Nested query* vs *groupByKey/**mapGroupsWithState* >> which approach is more efficient to solve this particular problem ? >> >> Thanks! >> >> >> >> >> >> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <

Re: Python UDF to convert timestamps (performance question)

2017-08-30 Thread Tathagata Das
1. Generally, adding columns, etc. will not affect performance because the Spark's optimizer will automatically figure out columns that are not needed and eliminate in the optimization step. So that should never be a concern. 2. Again, this is generally not a concern as the optimizer will take

Re: Why do checkpoints work the way they do?

2017-08-29 Thread Tathagata Das
Hello, This is an unfortunate design on my part when I was building DStreams :) Fortunately, we learnt from our mistakes and built Structured Streaming the correct way. Checkpointing in Structured Streaming stores only the progress information (offsets, etc.), and the user can change their

Re: Time window on Processing Time

2017-08-29 Thread Tathagata Das
Yes, it can be! There is a SQL function called current_timestamp() which is self-explanatory. So I believe you should be able to do something like

    import org.apache.spark.sql.functions._

    ds.withColumn("processingTime", current_timestamp())
      .groupBy(window(col("processingTime"), "1 minute"))

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
rectly, your solution wouldn't return the > exact solution, since it also groups by on destination. I would say the > easiest solution would be to use flatMapGroupsWithState, where you: > .groupByKey(_.train) > > and keep in state the row with the maximum time. > > On T

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
is a streaming based query and in my > case I need to hold state for 24 hours which I forgot to mention in my > previous email. can I do ? > > *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 > hours"), "train", "dest&q

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Say, trainTimesDataset is the streaming Dataset of schema [train: Int, dest: String, time: Timestamp]

    Scala: trainTimesDataset.groupBy("train", "dest").max("time")
    SQL:   "select train, dest, max(time) from trainTimesView group by train, dest"  // after calling

Re: Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Tathagata Das
When you say "the application remained alive", do you mean the StreamingQuery stayed alive, or the whole process stayed alive? The StreamingQuery should be terminated immediately. And the stream execution threads are all daemon threads, so it should not affect the termination of the application

Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
ommend it. As stated above, it is > quite natural to chain processes via kafka. > > On Thu, Aug 24, 2017 at 11:03 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Responses inline. >> >> On Thu, Aug 24, 2017 at 7:16 PM, cbowden <cbcweb...@gmail.co

Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
Responses inline. On Thu, Aug 24, 2017 at 7:16 PM, cbowden wrote: > 1. would it not be more natural to write processed to kafka and sink > processed from kafka to s3? > I am sorry, I don't fully understand this question. Could you please elaborate further, as in, what is

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
wrote: > Thanks tathagata das actually I'm planning to something like this > > activeQuery.stop() > > //unpersist and persist cached data frame > > df.unpersist() > > //read the updated data //data size of df is around 100gb > > df.persist() > > activeQuery =

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
You can do something like this.

    def startQuery(): StreamingQuery = {
      // create your streaming dataframes
      // start the query with the same checkpoint directory
    }

    // handle to the active query
    var activeQuery: StreamingQuery = null

    while (!stopped) {
      if (activeQuery == null) { // if

Re: Spark 2.2 streaming with append mode: empty output

2017-08-14 Thread Tathagata Das
In append mode, the aggregation outputs a row only when the watermark has been crossed and the corresponding aggregate is *final*, that is, will not be updated any more. See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking On Mon,

Re: Reusing dataframes for streaming (spark 1.6)

2017-08-09 Thread Tathagata Das
There is a DStream.transform() that does exactly this. On Tue, Aug 8, 2017 at 7:55 PM, Ashwin Raju wrote: > Hi, > > We've built a batch application on Spark 1.6.1. I'm looking into how to > run the same code as a streaming (DStream based) application. This is using > pyspark.

Re: Multiple queries on same stream

2017-08-09 Thread Tathagata Das
It's important to note that running multiple streaming queries, as of today, would read the input data that many times. So there is a trade-off between the two approaches. So even though scenario 1 won't get great Catalyst optimization, it may be more efficient overall in terms of resource

Re: [Structured Streaming] Recommended way of joining streams

2017-08-09 Thread Tathagata Das
Writing streams into some sink (preferably fault-tolerant, exactly once sink, see docs) and then joining is definitely a possible way. But you will likely incur higher latency. If you want lower latency, then stream-stream joins is the best approach, which we are working on right now. Spark 2.3 is

Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
believe is not > production ready. > > > On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das <tathagata.das1...@gmail.com > > wrote: > >> Its best to use DataFrames. You can read from as streaming or as batch. >> More details here. >> >> https://spark.apache.org/do

Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
It's best to use DataFrames. You can read from Kafka either as a stream or as a batch. More details here. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
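
A minimal sketch of the batch variant from that page: one Dataset containing everything currently in the topic (broker and topic are placeholders):

    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")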

Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-27 Thread Tathagata Das
rained on scala resources. Have you come >> across other use cases where people have resided to such python-scala >> hybrid approach? >> >> Regards, >> Priyank >> >> >> >> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das < >> tathagata.das1...@gma

Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Tathagata Das
unfortunately I am constrained on scala resources. Have you come > across other use cases where people have resided to such python-scala > hybrid approach? > > Regards, > Priyank > > > > On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das < > tathagata.das1...@gmail.com> wr
