You might also be interested in this:
https://issues.apache.org/jira/browse/SPARK-19031

On Tue, Jan 3, 2017 at 3:36 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I think we should add something similar to mapWithState in 2.2.  It would
> be great if you could add the description of your problem to this ticket:
> https://issues.apache.org/jira/browse/SPARK-19067
>
> On Mon, Jan 2, 2017 at 2:05 PM, Jeremy Smith <jeremy.sm...@acorns.com>
> wrote:
>
>> I have a question about state tracking in Structured Streaming.
>>
>> First let me briefly explain my use case: Given a mutable data source
>> (i.e. an RDBMS) in which we assume we can retrieve a set of newly created
>> row versions (being a row that was created or updated between two given
>> `Offset`s, whatever those are), we can create a Structured Streaming
>> `Source` which retrieves the new row versions. Further assuming that every
>> logical row has some primary key, then as long as we can track the current
>> offset for each primary key, we can differentiate between new and updated
>> rows. Then, when a row is updated, we can record that the previous version
>> of that row expired at some particular time. That's essentially what I'm
>> trying to do. This would effectively give you an "event-sourcing" type of
>> historical/immutable log of changes out of a mutable data source.
>>
>> I noticed that in Spark 2.0.1 there was a concept of a StateStore, which
>> seemed like it would allow me to do exactly the tracking that I needed, so
>> I decided to try and use that built-in functionality rather than some
>> external key/value store for storing the current "version number" of each
>> primary key. There were a lot of hard-coded hoops I had to jump through,
>> but I eventually made it work by implementing some custom LogicalPlans and
>> SparkPlans around StateStore[Save/Restore]Exec.
>>
>> Now, in Spark 2.1.0 it seems to have gotten even further away from what I
>> was using it for - the keyExpressions of StateStoreSaveExec must include a
>> timestamp column, which means that those expressions are not really keys
>> (at least not for a logical row). So it appears I can't use it that way
>> anymore (I can't blame Spark for this, as I knew what I was getting into
>> when leveraging developer APIs). There are also several hard-coded checks
>> which now make it clear that StateStore functionality is only to be used
>> for streaming aggregates, which is not really what I'm doing.
>>
>> My question is - is there a good way to accomplish the above use case
>> within Structured Streaming? Or is this the wrong use case for the state
>> tracking functionality (which increasingly seems to be targeted toward
>> aggregates only)? Is there a plan for any kind of generalized
>> `mapWithState`-type functionality for Structured Streaming, or should I
>> just give up on that and use an external key/value store for my state
>> tracking?
>>
>> Thanks,
>> Jeremy
>>
>
>

Reply via email to