Hey Gabor, great ideas here. It's only slightly related, but I'm currently working on a proposal to improve the queryable state APIs for lookups (partly along the lines of what you suggested with higher-level accessors). Maybe you are interested in contributing there?
I really like your ideas for the use cases you describe, but I'm unsure about the write path (setKvState) because of the discussed implications for the state backends. I think this needs more discussion and coordination with the contributors working on the backends. For example, one assumption so far has been that only a single thread updates state, and we don't scope state per checkpoint (to provide "isolation levels" for the queries: read committed vs. read uncommitted), and probably more. Because of this I would actually lean towards the iteration approach in a first version. Would that be a feasible starting point for you?

– Ufuk

On 14 February 2017 at 14:01:21, Gábor Hermann (m...@gaborhermann.com) wrote:

> Hi Gyula, Jinkui Shi,
>
> Thanks for your thoughts!
>
> @Gyula: I'll try and explain in a bit more detail.
>
> The API could be almost like the QueryableState's. It could be
> higher-level though: returning Java objects instead of serialized data
> (because there would not be issues with class loading). Also, it could
> support setKvState (see my point 5). This could lead to both potential
> performance improvements and easier usage (see my points 2 and 3).
>
> A use case could be anything where we'd use an external KV-store.
> For instance, we update user states based on another user's state, so
> in the map function we issue a query (in pseudo-ish Scala code):
>
> users.keyBy(0).flatMapWithState { (userEvent, collector) =>
>   val thisUser: UserState = state.get()
>   val otherUser: Future[UserState] =
>     qsClient.getKvState("users", userEvent.otherUserId)
>
>   otherUser.onSuccess { case otherUserState =>
>     state.update(someFunc1(thisUser, otherUserState))
>     collector.collect(someFunc2(thisUser, otherUserState))
>   }
> }
>
> Another example could be (online) distributed matrix factorization,
> where the two factor matrices are represented by distributed states. One
> is updated by querying the other (with getKvState) and computing some
> function (e.g. an SGD step), while the other is updated in place (with
> setKvState).
>
> I see the motivation behind QueryableState as a way to make further
> use of the KV-store we effectively already have at stateful operators
> (but please correct me if I'm wrong). I think we could make even more
> use of it if the KV-store is used inside the same job.
>
> 1) Order and timeliness
> As you've mentioned, it's hard to guarantee any ordering when working
> with two states on possibly distinct machines. This could bring us to
> distributed transaction processing, which is a complex topic in itself.
> I can imagine using watermarks and keeping different state versions to
> only allow querying state from the past, and not from the future. For
> now, let's just assume that order does not matter.
>
> 2) Fault tolerance
> There might be other things we could do, but there are two simple
> guarantees we can surely provide. First, with the current
> QueryableState the task could fail with incomplete futures. If the
> records producing those futures were received before the previous
> checkpoint barrier, those updates would be completely lost. We could
> solve this by waiting for the futures to complete before starting a
> checkpoint, thus providing exactly-once guarantees. This would ensure
> that, although the UDF has side effects, every record takes effect
> exactly once. I don't see a good way to provide this guarantee with the
> current QueryableState. Second, we can guarantee that the query will
> eventually succeed (or else the whole topology would fail).
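>
> A rough sketch of what I mean by waiting for the futures (the names
> here are made up, not an existing Flink interface; a real version would
> hook into the operator's checkpointing):
>
> import scala.collection.mutable
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration._
> import scala.concurrent.{Await, Future}
>
> class PendingQueries(timeout: FiniteDuration) {
>   // futures for queries issued since the last checkpoint barrier;
>   // guarded by `this`, as completion callbacks run on other threads
>   private val pending = mutable.Set.empty[Future[Unit]]
>
>   // call for each record that triggers a state query/update
>   def register(update: Future[Unit]): Unit = {
>     synchronized { pending += update }
>     update.onComplete { _ => synchronized { pending -= update } }
>   }
>
>   // call when a barrier arrives: hold the checkpoint back until every
>   // in-flight query has taken effect, so its update cannot be lost
>   def drainBeforeSnapshot(): Unit = {
>     val inFlight = synchronized { pending.toList }
>     Await.ready(Future.sequence(inFlight), timeout)
>   }
> }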
>
> 3) Optimizations
> I've also got two examples of optimizations. First, we can make a
> best effort to co-locate the two stateful operators to save on network
> overhead. The user could try to co-locate the querying machines when
> externally querying the state, but could not move the machines holding
> the state being queried. Second, we could provide a user interface for
> (loose) guarantees on the latency of sending and returning queries,
> just like setting the buffer timeout.
>
> 4) Concurrent reading/writing
> Key-value states and collectors might be accessed concurrently. While
> the user could use locks, the system could handle this instead of the
> user, e.g. by using a thread-safe collector whenever an internal
> KV-state query is registered at the UDF.
>
> 5) setKvState
> We could not give exactly-once guarantees if we allowed external
> queries to modify the state: when a Flink topology fails and restarts,
> the modifications coming from the outside would not be replayed.
> However, we can simply give exactly-once guarantees if the
> modifications are done inside the job (setting ordering aside), as the
> records would be replayed if the modification failed.
>
> I believe it would not take much effort to make these improvements.
> However, it would affect the runtime (integration with FT), and it
> might not be worth working towards these goals. What do you think about
> this?
>
> It's also worth considering that the same use cases could be similarly
> handled with the iteration/loops API, but in a slightly less natural
> way, imitating two-way communication.
>
> Should we move this discussion to a JIRA issue, to avoid flooding the
> mailing list?
>
>
> @Jinkui Shi:
> 1. I think we should definitely support a flexible update strategy,
> i.e. allow choosing between sync, async, and bounded-delay.
> 2. I really like your idea of using a PS externally and connecting to
> it with a source and a sink. Fault tolerance could also be achieved by
> versioning at the PS and resending older parameters if the Flink job
> fails (i.e. making the PS a durable source). The question then is how
> to implement the PS. Do you think we could use the implementations
> you've mentioned?
> 3. Good idea. It has just been proposed to support GPU calculations in
> Flink [1]. Fortunately, that seems orthogonal to our problem: if
> there's a PS, we can later include GPU calculations.
>
> My main question remains whether there's a need for an integrated PS
> or not. It would not fit into the current project structure (neither in
> ML nor in Streaming), so I guess the only direction is to build on top
> of the streaming API and use an external service, just as you've
> proposed.
>
> [1] https://issues.apache.org/jira/browse/FLINK-5782
>
> Cheers,
> Gabor
>
>
> On 2017-02-13 04:10, Jinkui Shi wrote:
> > Hi Gábor Hermann,
> >
> > The online parameter server is a good proposal.
> > The PS paper [1] has an early implementation [2], and now it is
> > MXNet [3]. I have some thoughts about an online PS in Flink:
> > 1. Should we support a flexible and configurable update strategy?
> > For example, within one iteration, computing several times and
> > updating once, or updating on every iteration.
> > 2. Can the implementation be fully based on the DAG, without
> > modifying the runtime and core too much?
> > - The parameter server is the Source with distributed parameter
> > data, called PS.
> > - The worker nodes are the DAG except the Source, i.e. some ML
> > algorithm implemented using the Flink API.
> > - The results of the multi-layer computation flow to the Sink
> > operator naturally.
> > - The Sink can feed back to the Source for the next iteration.
> > - Automatic tuning of busy operators: increase/decrease the compute
> > resources (the max parallelism) of the runtime operators.
> > 3. Automatically detect the GPU support provided by Mesos, and use it
> > if GPU usage is enabled in the configuration.
> >
> > [1] https://www.cs.cmu.edu/~muli/file/ps.pdf
> > [2] https://github.com/dmlc/parameter_server
> > [3] https://github.com/dmlc/mxnet
> >
> >> On Feb 12, 2017, at 00:54, Gyula Fóra wrote:
> >>
> >> Hi Gábor,
> >>
> >> I think the general idea is very nice, but it would be nice to see
> >> more clearly what benefit this brings from the developer's
> >> perspective. Maybe a rough API sketch and 1-2 examples.
> >>
> >> I am wondering what sort of consistency guarantees you imagine for
> >> such operations, and why fault tolerance is even relevant. Are you
> >> thinking about an asynchronous API, where querying the state for
> >> another key gives you a Future that is guaranteed to complete
> >> eventually?
> >>
> >> It seems hard to guarantee the timeliness (order) of these
> >> operations with respect to the updates made to the states, so I
> >> wonder whether there is a benefit to doing this compared to using the
> >> QueryableState interface. Is this only a potential performance
> >> improvement, or is it also easier to work with?
> >>
> >> Cheers,
> >> Gyula
> >>
> >> Gábor Hermann wrote (on Fri, 10 Feb 2017, at 16:01):
> >>
> >>> Hi all,
> >>>
> >>> TL;DR: Is it worth implementing a special QueryableState for
> >>> querying state from another part of a Flink streaming job and
> >>> aligning it with fault tolerance?
> >>>
> >>> I've been thinking about implementing a Parameter Server
> >>> with/within Flink. A Parameter Server is basically a specialized
> >>> key-value store optimized for training distributed machine learning
> >>> models. So not only the training data, but also the model is
> >>> distributed. Range queries are typical, and usually vectors and
> >>> matrices are stored as values.
> >>>
> >>> More generally, an integrated key-value store might also be useful
> >>> in the Streaming API. Although external key-value stores can be used
> >>> inside UDFs for the same purpose, aligning them with the fault
> >>> tolerance mechanism of Flink could be hard. What if state
> >>> distributed by a key (in the current Streaming API) could be queried
> >>> from another operator? Much like QueryableState, but querying
> >>> *inside* the Flink job. We could make use of the fact that state has
> >>> been queried from inside to optimize communication and integrate
> >>> fault tolerance.
> >>>
> >>> The question is whether the Flink community would like such a
> >>> feature, and if so, how to do it.
> >>>
> >>> I could elaborate my ideas if needed, and I'm happy to create a
> >>> design doc, but before that, I'd like to know what you all think
> >>> about this. Also, I don't know if I'm missing something, so please
> >>> correct me. Here are some quick notes regarding the integrated
> >>> KV-store:
> >>>
> >>> Pros
> >>> - It could allow easier implementation of more complicated use
> >>> cases, e.g. updating users' preferences simultaneously based on each
> >>> other's preferences when events happen between them, such as making
> >>> a connection, liking each other's posts, or going to the same
> >>> concert.
> >>> User preferences are distributed as state; an event about user A
> >>> liking user B gets sent to A's state, queries the state of B, and
> >>> then updates the state of B. There have been questions on the user
> >>> mailing list about similar problems [1].
> >>> - Integration with fault tolerance. The user does not have to keep
> >>> two systems consistent.
> >>> - Optimization potential. In the social network example, other
> >>> users on the same partition as user A might also need the state of
> >>> user B, so we would not have to send user B's state around twice.
> >>> - It could be specialized into a Parameter Server for simple (and
> >>> efficient) implementation of (possibly online) machine learning,
> >>> e.g. sparse logistic regression, LDA, or matrix factorization for
> >>> recommendation systems.
> >>>
> >>> Cons
> >>> - A lot of development effort.
> >>> - A "client-server" architecture goes against the DAG dataflow
> >>> model.
> >>>
> >>> Two approaches for the implementation in the Streaming API:
> >>>
> >>> 1) An easy way to implement this is to use iterations (or the
> >>> proposed loops API). We can achieve two-way communication with two
> >>> operators in a loop: a worker (W) and a Parameter Server (PS), see
> >>> the diagram [2]. (An additional nested loop in the PS could add
> >>> replication opportunities.) We would then get fault tolerance "for
> >>> free" from the work of Paris [3]. It would also sit on top of the
> >>> Streaming API, with no effect on the runtime.
> >>>
> >>> 2) A problem with the loop approach is that coordination between PS
> >>> nodes and worker nodes can only be done on the data stream. We could
> >>> not really use e.g. Akka for async coordination. A harder but more
> >>> flexible way is to use the lower-level interfaces of Flink and touch
> >>> the runtime. Then we would have to take care of fault tolerance too.
> >>>
> >>> (As a side note: in the batch API, generalizing delta iterations
> >>> could be a solution for a Parameter Server [4].)
> >>>
> >>> Thanks for any feedback :)
> >>>
> >>> Cheers,
> >>> Gabor
> >>>
> >>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sharded-state-2-step-operation-td8631.html
> >>> [2] https://i.imgur.com/GsliUIh.png
> >>> [3] https://github.com/apache/flink/pull/1668
> >>> [4] https://www.linkedin.com/pulse/stale-synchronous-parallelism-new-frontier-apache-flink-nam-luc-tran
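
For illustration, here is a minimal sketch of the loop-based approach (1) from the quoted mail, using the DataStream iterate API. The message types (Msg, Pull, Push, Param) and the ParameterServerFn are made up for the example, and the worker side, which would consume Param messages and emit new Pull/Push messages, is elided:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Messages circulating in the worker <-> parameter server loop.
sealed trait Msg { def key: Int }
case class Pull(key: Int) extends Msg                 // W asks PS for a parameter
case class Push(key: Int, delta: Double) extends Msg  // W sends an update to PS
case class Param(key: Int, value: Double) extends Msg // PS answers a Pull

// PS side: holds the parameters, answers Pulls, applies Pushes.
// (A sketch: real code would keep the parameters in Flink keyed state
// so they are covered by checkpoints.)
class ParameterServerFn extends RichFlatMapFunction[Msg, Msg] {
  private var weights = Map.empty[Int, Double]
  override def flatMap(msg: Msg, out: Collector[Msg]): Unit = msg match {
    case Pull(k)        => out.collect(Param(k, weights.getOrElse(k, 0.0)))
    case Push(k, delta) => weights += k -> (weights.getOrElse(k, 0.0) + delta)
    case _: Param       => // answered Pulls are consumed by the worker side
  }
}

def psLoop(requests: DataStream[Msg]): DataStream[Msg] =
  requests.iterate { feedback: DataStream[Msg] =>
    // PS operator, partitioned by parameter key
    val answers = feedback.keyBy(_.key).flatMap(new ParameterServerFn)
    // the worker operator would go here, turning Params into new
    // Pulls/Pushes (the feedback edge) and into results (the output)
    (answers, answers)
  }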