I think we should add something similar to mapWithState in 2.2. It would be great if you could add the description of your problem to this ticket: https://issues.apache.org/jira/browse/SPARK-19067

On Mon, Jan 2, 2017 at 2:05 PM, Jeremy Smith <jeremy.sm...@acorns.com> wrote:

> I have a question about state tracking in Structured Streaming.
>
> First let me briefly explain my use case: Given a mutable data source
> (i.e. an RDBMS) in which we assume we can retrieve a set of newly created
> row versions (being a row that was created or updated between two given
> `Offset`s, whatever those are), we can create a Structured Streaming
> `Source` which retrieves the new row versions. Further assuming that every
> logical row has some primary key, then as long as we can track the current
> offset for each primary key, we can differentiate between new and updated
> rows. Then, when a row is updated, we can record that the previous version
> of that row expired at some particular time. That's essentially what I'm
> trying to do. This would effectively give you an "event-sourcing" type of
> historical/immutable log of changes out of a mutable data source.
>
> I noticed that in Spark 2.0.1 there was a concept of a StateStore, which
> seemed like it would allow me to do exactly the tracking that I needed, so
> I decided to try and use that built-in functionality rather than some
> external key/value store for storing the current "version number" of each
> primary key. There were a lot of hard-coded hoops I had to jump through,
> but I eventually made it work by implementing some custom LogicalPlans and
> SparkPlans around StateStore[Save/Restore]Exec.
>
> Now, in Spark 2.1.0 it seems to have gotten even further away from what I
> was using it for - the keyExpressions of StateStoreSaveExec must include a
> timestamp column, which means that those expressions are not really keys
> (at least not for a logical row). So it appears I can't use it that way
> anymore (I can't blame Spark for this, as I knew what I was getting into
> when leveraging developer APIs). There are also several hard-coded checks
> which now make it clear that StateStore functionality is only to be used
> for streaming aggregates, which is not really what I'm doing.
>
> My question is - is there a good way to accomplish the above use case
> within Structured Streaming? Or is this the wrong use case for the state
> tracking functionality (which increasingly seems to be targeted toward
> aggregates only)? Is there a plan for any kind of generalized
> `mapWithState`-type functionality for Structured Streaming, or should I
> just give up on that and use an external key/value store for my state
> tracking?
>
> Thanks,
> Jeremy
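
For illustration only, the per-key tracking described in the quoted message can be sketched in plain Python, outside Spark. This is a minimal sketch under assumptions, not a Spark API and not the approach tracked in the ticket: `RowVersion`, `ChangeEvent`, and `track_changes` are hypothetical names, offsets are assumed to be comparable integers, and an in-memory dict stands in for whatever key/value store holds the state.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class RowVersion:
    """Hypothetical record for one version of a logical row."""
    key: str      # primary key of the logical row
    offset: int   # assumed: source offsets are comparable integers
    payload: str  # row contents, opaque to the tracker


@dataclass(frozen=True)
class ChangeEvent:
    """Hypothetical entry in the immutable change log."""
    kind: str                       # "created" or "updated"
    row: RowVersion                 # the new current version
    expired: Optional[RowVersion]   # previous version, if there was one
    expired_at: Optional[int]       # offset at which the previous version expired


def track_changes(state: Dict[str, RowVersion],
                  batch: Iterable[RowVersion]) -> List[ChangeEvent]:
    """Compare a batch of row versions with the per-key state.

    `state` maps each primary key to its latest known version and is
    updated in place, playing the role of the key/value store.
    """
    events: List[ChangeEvent] = []
    for row in sorted(batch, key=lambda r: r.offset):
        previous = state.get(row.key)
        if previous is None:
            # First time this primary key is seen: a new row.
            events.append(ChangeEvent("created", row, None, None))
        elif row.offset > previous.offset:
            # Known key with a newer offset: the old version expires.
            events.append(ChangeEvent("updated", row, previous, row.offset))
        else:
            # Stale or duplicate version: leave the state untouched.
            continue
        state[row.key] = row
    return events
```

Calling `track_changes` once per micro-batch with the same `state` dict yields a "created" event the first time a key appears and an "updated" event, carrying the expired previous version, each time a newer offset arrives for that key.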