Thanks for your answers Zakelly. I get your points.

> (...) it may not be suitable for scenarios where
> - A state is read by condition.
> - MapState with the user key cannot be determined in advance.
I guess it depends on how we would like to treat the local disks. I've always assumed that, almost always, all of the state from the DFS should eventually end up cached on the local disks. In this context, if you prefetch all of the state for a given key, even if it's not strictly speaking necessary right now, we know it will be accessed sooner or later, so it's fine to download it right away. Even if the state doesn't fit into the local disks, the more coarse-grained solution shouldn't increase the latency by much. In the currently proposed, more fine-grained solution, you make a single request to the DFS per state access. In the proposal I mentioned, if we group all of the state fields for a given key together on the DFS, that would still be a single request to the DFS, albeit a larger one. But as your benchmarks showed, it's the latency of a single request that matters, so if we are making the request anyway, we can just as well fetch all of the state for that key (I've put a rough sketch of this idea at the end of this e-mail).

> according to our PoC tests[3], it is still beneficial to load state from local
> disk asynchronously (See line 4 of that table with 100% state in local cache).
> Optimization mainly comes from parallel I/O

In that benchmark you mentioned, are you requesting the state asynchronously from local disks into memory? If the benefit comes from parallel I/O, then I would expect that benefit to disappear/shrink when running multiple subtasks on the same machine, as they would be making their own parallel requests, right? Also, enabling checkpointing would further cut into the available I/O budget.

Please also keep in mind that I'm not arguing against the idea of async state access :) I'm just questioning the granularity at which those async requests should be made. Making state access asynchronous is definitely the right way to go!

What also worries me a lot in this fine-grained model is the effect on checkpointing times. We have noticed many times that whenever we introduce a buffer (local SQL aggregations, flink-python, firing all timers at once, ...) that has to be processed before a checkpoint can complete, it ends up breaking checkpointing times under backpressure. Making the fine-grained async mode compatible with unaligned checkpoints might be possible, but even then it would be difficult.

On the other hand, limiting the size of the in-flight state requests is difficult to get right. A value that works for one job won't work properly for a different one, and even a value tuned for a given job won't work properly for that job all the time: firing timers, a varying input record rate, watermark hiccups, varying CPU load on the machine, ... might suddenly make the size of the buffer too large. In this regard the coarse-grained approach would fare much better, as the size of the in-flight requests buffer wouldn't affect checkpointing times much - we would just need to checkpoint a bit more of the in-flight data, which has a small, very predictable and stable impact on the checkpointing times.

Also, regarding the overheads, it would be great if you could provide profiling results of the benchmarks that you conducted, or maybe describe the steps to reproduce them? Especially for "Hashmap (sync)" vs "Hashmap with async API".
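To make the coarse-grained idea from the beginning of this e-mail a bit more concrete, here is a rough sketch of what I have in mind. All of the type and method names below (KeyedStateBlobReader, prefetchAsync, isLocal) are made up for illustration - this is not an existing Flink API - and the sketch ignores mailbox/threading concerns and record ordering:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: all state fields of a key are grouped into one blob on the DFS,
// so making them local is a single (larger) asynchronous DFS read. Records whose key is
// not yet local wait in an in-flight buffer that could be checkpointed as-is, similarly
// to unaligned checkpoints' in-flight data.
public class CoarseGrainedPrefetchSketch<K, IN> {

    /** Hypothetical accessor for the grouped per-key state blob (DFS + local cache). */
    interface KeyedStateBlobReader<K> {
        boolean isLocal(K key);
        CompletableFuture<Void> prefetchAsync(K key); // one DFS request per key
    }

    private final KeyedStateBlobReader<K> blobReader;

    // Records waiting for their key's state to become local. Snapshotting this buffer is
    // cheap and predictable, as opposed to draining fine-grained pending state requests.
    private final Deque<BufferedRecord<K, IN>> inFlight = new ArrayDeque<>();

    record BufferedRecord<A, B>(A key, B value) {}

    CoarseGrainedPrefetchSketch(KeyedStateBlobReader<K> blobReader) {
        this.blobReader = blobReader;
    }

    void processElement(K key, IN value) {
        if (blobReader.isLocal(key)) {
            processWithLocalState(key, value);
        } else {
            inFlight.add(new BufferedRecord<>(key, value));
            // a single async DFS read brings *all* state for this key to the local disk;
            // in a real operator the callback would have to go through the task's mailbox
            blobReader.prefetchAsync(key).thenRun(this::drainReady);
        }
    }

    private void drainReady() {
        while (!inFlight.isEmpty() && blobReader.isLocal(inFlight.peek().key())) {
            BufferedRecord<K, IN> r = inFlight.poll();
            processWithLocalState(r.key(), r.value());
        }
    }

    private void processWithLocalState(K key, IN value) {
        // the user function runs here, with all state for `key` already on local disk
    }
}

The point being: the in-flight buffer above holds plain records, so a checkpoint barrier doesn't have to wait for it to drain - the buffer can simply be snapshotted.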
> However, IIUC, your proposal is valuable in that it is compatible with the original
> state APIs, and it can co-exist with the current plan. We do consider providing such
> a pre-fetch cache under the original state APIs and enhancing the performance
> transparently in future milestones.

I think this could probably be made to work, at the cost of some extra implementation and maintenance complexity, and it would also make Flink more complicated for users to configure. It would be best to have a single option that either works best or is the best compromise. I'm curious to read your thoughts on those aspects :)

Best,
Piotrek