Thanks for your answers Zakelly. I get your points.

>  (...) it may not be suitable for scenarios where
> - A state is read by condition.
> - MapState with the user key cannot be determined in advance.

I guess it depends on how we would like to treat the local disks. I've
always thought of them as a cache into which, sooner or later, almost all
of the state from the DFS ends up. In this context, if you prefetch all of
the state for a given key, even if it's not strictly needed right now, we
know it will be accessed sooner or later, so it's fine to download it right
away. Even if the state doesn't fit on the local disks, the more
coarse-grained solution shouldn't increase the latency by much. In the
currently proposed fine-grained solution, you make one request to the DFS
per state access. In the proposal I mentioned, if we group all of the state
fields for a given key together on the DFS, that would still be a single
request to the DFS, albeit a larger one. But as your benchmarks showed,
it's the latency of a single request that matters, so if we are making the
request anyway, we might just as well fetch all of the state for that key.
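
To make it concrete, here is a minimal sketch of the coarse-grained variant
I have in mind. All names below are hypothetical, just for illustration -
they are not existing Flink interfaces:

import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical coarse-grained prefetch: all state fields of a key are laid
 * out together on the DFS, so one asynchronous read warms the local cache
 * for everything that key will touch.
 */
public interface KeyedStatePrefetcher<K> {

    /**
     * Issues a single DFS request that fetches the blob holding all state
     * fields of the given key (ValueState, MapState entries, timers, ...)
     * into the local disk cache. Record processing can continue with plain
     * synchronous state accesses once the future completes.
     */
    CompletableFuture<Void> prefetchAll(K key);
}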

 > according to our PoC tests[3], it is still beneficial to load state from
local disk asynchronously (See line 4 of that table with 100% state in
local cache). Optimization mainly comes from parallel I/O

In that benchmark you mentioned, are you requesting the state
asynchronously from local disks into memory? If the benefit comes from
parallel I/O, then I would expect the benefit to disappear/shrink when
running multiple subtasks on the same machine, as they would be making
their own parallel requests, right? Also enabling checkpointing would
further cut into the available I/O budget.

Also please keep in mind I'm not arguing against the idea of async state
access :) Just with what granularity those async requests should be made.
Making state access asynchronous is definitely the right way to go!

What also worries me a lot about this fine-grained model is the effect on
checkpointing times. We have noticed many times that whenever we introduce
a buffer that has to be processed before a checkpoint can complete (local
SQL aggregations, flink-python, firing all timers at once, ...), it ends up
breaking checkpointing times under backpressure. Making the fine-grained
async mode compatible with unaligned checkpoints might be possible, but
even then it would be difficult. On the other hand, limiting the size of
the in-flight state requests is difficult to get right. A value that works
for one job won't work properly for a different one, and even a value that
works for a given job won't work for that job all the time. Firing timers,
a varying input record rate, watermark hiccups, varying CPU load on the
machine, ... might suddenly make the buffer too large. In this regard the
coarse-grained approach would fare much better, as the size of the
in-flight requests buffer wouldn't affect checkpointing times much - we
would just need to checkpoint a bit more of the in-flight data, which has a
small, very predictable and stable impact on checkpointing times.
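
To illustrate the tuning problem, here is a toy sketch (purely illustrative,
not a proposal for actual Flink code) of a fixed in-flight limit that has to
be drained on the checkpoint barrier:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/**
 * Toy example of why a static limit on in-flight state requests is hard to
 * tune: one fixed number has to cover timer firings, input rate spikes and
 * varying I/O load, while everything still in flight must be drained (or
 * snapshotted) before a checkpoint can complete.
 */
public class BoundedInFlightRequests {

    private final int maxInFlight;
    private final Semaphore permits;

    public BoundedInFlightRequests(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request)
            throws InterruptedException {
        permits.acquire(); // back-pressures the task thread once the buffer is full
        return request.get().whenComplete((result, error) -> permits.release());
    }

    /** Called on a checkpoint barrier: blocks until every in-flight request has finished. */
    public void drainBeforeCheckpoint() throws InterruptedException {
        permits.acquire(maxInFlight);
        permits.release(maxInFlight);
    }
}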

Also, regarding the overheads, it would be great if you could share
profiling results from the benchmarks you conducted, or describe the steps
to reproduce them - especially for "Hashmap (sync)" vs "Hashmap with async
API".

> However, IIUC, your proposal is valuable in that it is compatible with
the original state APIs, and it can co-exist with the current plan. We do
consider providing such a pre-fetch cache under the original state APIs and
enhancing the performance transparently in future milestones.

I think this probably could be made to work, at the cost of some extra
complexity in the implementation and maintenance, and it would make
configuring Flink more complicated for users. It would be best to have a
single option that either works best or is the best compromise.
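
For reference, my mental model of what that transparent pre-fetch cache
under the original state API could look like from the user's side (again,
names here are illustrative only, not existing Flink classes):

import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch: the existing synchronous API is kept, while the
 * runtime triggers the fetch when the current key is set, so that value()
 * later hits the already-warmed local cache.
 */
public interface PrefetchingValueState<T> {

    /** Triggered by the runtime on setCurrentKey(); warms the local cache for the key. */
    CompletableFuture<Void> prefetch();

    /** Unchanged synchronous access; ideally served from the warmed cache. */
    T value() throws Exception;
}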

I'm curious to read your thoughts on those aspects :)

Best,
Piotrek
