I have a question about state tracking in Structured Streaming. First, let me briefly explain my use case. Given a mutable data source (e.g. an RDBMS) from which we assume we can retrieve the set of newly created row versions (that is, rows that were created or updated between two given `Offset`s, whatever those turn out to be), we can create a Structured Streaming `Source` that retrieves the new row versions. If we further assume that every logical row has a primary key, then as long as we can track the current offset for each primary key, we can distinguish between new and updated rows. Then, when a row is updated, we can record that the previous version of that row expired at a particular time. That's essentially what I'm trying to do. This would effectively give you an "event-sourcing" style historical, immutable log of changes out of a mutable data source.
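To make the tracking concrete, here is a rough plain-Scala sketch of the per-key logic, independent of any Spark API. `RowVersion`, `Inserted`, `Updated`, and `VersionTracker` are hypothetical names used only for illustration. The sketch assumes each row version carries a monotonically increasing `Long` offset, and it treats the offset of the new version as the time at which the previous version expired. In a real stream, the `Map` of current offsets is the state that would have to live somewhere durable (a `StateStore` or an external key/value store).

```scala
// Hypothetical types for illustration only; not part of any Spark API.
final case class RowVersion(key: String, version: Long, payload: String)

sealed trait ChangeEvent
// First time we see this primary key.
final case class Inserted(row: RowVersion) extends ChangeEvent
// A newer version of a known key; the previous version is recorded as expired.
final case class Updated(row: RowVersion, expiredVersion: Long, expiredAt: Long) extends ChangeEvent

object VersionTracker {
  // Processes one batch of new row versions against the current per-key state
  // (primary key -> latest offset seen). Returns the emitted events and the new state.
  def process(
      state: Map[String, Long],
      batch: Seq[RowVersion]): (Seq[ChangeEvent], Map[String, Long]) = {
    batch.sortBy(_.version).foldLeft((Vector.empty[ChangeEvent], state)) {
      case ((events, st), row) =>
        st.get(row.key) match {
          case None =>
            (events :+ Inserted(row), st.updated(row.key, row.version))
          case Some(prev) if row.version > prev =>
            // Assumption: the previous version expires at the new version's offset.
            (events :+ Updated(row, prev, row.version), st.updated(row.key, row.version))
          case Some(_) =>
            // Stale or duplicate version: nothing to emit, state unchanged.
            (events, st)
        }
    }
  }
}

object Demo extends App {
  val (events1, state1) =
    VersionTracker.process(Map.empty, Seq(RowVersion("a", 1L, "x"), RowVersion("b", 2L, "y")))
  val (events2, state2) = VersionTracker.process(state1, Seq(RowVersion("a", 3L, "x2")))
  println(events1)
  println(events2)
  println(state2)
}
```

Feeding the state returned by one batch into the next is the part that needs some kind of persistent, fault-tolerant state tracking across micro-batches.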
I noticed that Spark 2.0.1 had a concept of a `StateStore`, which seemed like it would allow me to do exactly the tracking I needed, so I decided to use that built-in functionality rather than an external key/value store for storing the current "version number" of each primary key. There were a lot of hard-coded hoops I had to jump through, but I eventually made it work by implementing some custom `LogicalPlan`s and `SparkPlan`s around `StateStore[Save/Restore]Exec`.

Now, in Spark 2.1.0, it seems to have moved even further away from what I was using it for: the `keyExpressions` of `StateStoreSaveExec` must include a timestamp column, which means those expressions are not really keys (at least not for a logical row). So it appears I can't use it that way anymore (I can't blame Spark for this, as I knew what I was getting into when leveraging developer APIs). There are also several hard-coded checks that now make it clear the `StateStore` functionality is only meant to be used for streaming aggregates, which is not really what I'm doing.

My question is: is there a good way to accomplish the above use case within Structured Streaming? Or is this the wrong use case for the state tracking functionality (which increasingly seems to be targeted at aggregates only)? Is there a plan for any kind of generalized `mapWithState`-type functionality for Structured Streaming, or should I just give up on that and use an external key/value store for my state tracking?

Thanks,
Jeremy