Hi Till, thanks for drafting this FLIP. I think it's a really valuable improvement.
Agreed with Yang that the YARN / K8s implementations should be out of scope of this FLIP. Just a few notes on the possible integrations:

For K8s, I think we can also benefit from this FLIP without a StatefulSet. If the pod crashes for some reason, it will be restarted on the same node, but it loses its state. This could be addressed by attaching an ephemeral volume to the container [1]. This sits somewhere in between the current state and the persistent-volume approach (which is where you need a StatefulSet), which could be expensive (depending on the infrastructure). Example:

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
spec:
  containers:
  - image: ...
    name: test-container
    volumeMounts:
    - mountPath: /cache
      name: cache-volume
  volumes:
  - name: cache-volume
    emptyDir: {}

For YARN, I don't think it's as simple as remembering prior locations. As far as I remember, a "restart from failure" results in a new container being created; the storage is tied to the container's lifecycle, and the working directories are garbage collected right after the container FAILS / FINISHES. We'd most likely have to leverage a new component (something along the lines of how the shuffle services for YARN work) that runs embedded in the NodeManager, allows files to be externalized for use beyond the container's lifecycle, and ties their lifecycle to the job.

As for Chesnay's concern around corrupted files, are we sure that all components can recover from a corrupted file? Could we for example have a generic mechanism that is reused by all the components writing to the working directory (CRC + File)?
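To make that a bit more concrete, here is a rough sketch of what I have in mind (just an illustration; the class and method names are made up, and it bakes in the write-to-".pending"-file + atomic-rename pattern that Chesnay asks about further down in the thread):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.zip.CRC32;

// Hypothetical helper: all components writing to the working directory
// would go through this instead of touching the files directly.
final class ChecksummedFileStore {

    // Writes an 8-byte CRC32 header followed by the payload to
    // "<target>.pending" and atomically renames it into place, so a
    // reader never observes a half-written file. (A production version
    // would also fsync before the rename.)
    static void write(Path target, byte[] payload) throws IOException {
        CRC32 crc = new CRC32();
        crc.update(payload);

        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + payload.length);
        buffer.putLong(crc.getValue());
        buffer.put(payload);

        Path pending = target.resolveSibling(target.getFileName() + ".pending");
        Files.write(pending, buffer.array());
        Files.move(pending, target, StandardCopyOption.ATOMIC_MOVE);
    }

    // Returns the payload, or Optional.empty() if the file is missing,
    // truncated, or fails the checksum; the caller then deletes the file
    // and falls back to the ground truth (JobManager / remote storage).
    static Optional<byte[]> read(Path target) throws IOException {
        if (!Files.exists(target)) {
            return Optional.empty();
        }
        ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(target));
        if (buffer.remaining() < Long.BYTES) {
            return Optional.empty(); // truncated header
        }
        long expected = buffer.getLong();
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);

        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue() == expected ? Optional.of(payload) : Optional.empty();
    }
}

The BlobServer case discussed below could then be covered by validating the checksum right before serving the file to the TM.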
Other than that, I really like the FLIP and I'm looking forward to having this feature in Flink. +1

[1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/

Best,
D.

On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <danrtsey...@gmail.com> wrote:

> I am afraid creating a dedicated StatefulSet for each TaskManager is too
> expensive and using a shared StatefulSet for all the TaskManagers is not
> flexible enough. Maybe setting a proper restart policy for TaskManager
> pods could benefit from this FLIP. But we might need to tackle some
> other issues, e.g. duplicated registration, etc.
>
> All in all, this is out of the scope of this FLIP. I agree we could
> leave it to future FLIPs.
>
> I have no more concerns. +1
>
> Best,
> Yang
>
> On Wed, Dec 15, 2021 at 19:06 Till Rohrmann <trohrm...@apache.org> wrote:
>
> > This is true. But this is not a new problem, and I think that Flink is
> > already susceptible to it. One solution for this concrete case could
> > be that the BlobServer stores some checksums and validates the file
> > before serving it to the TM.
> >
> > Cheers,
> > Till
> >
> > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ches...@apache.org>
> > wrote:
> >
> > > The issue with corrupted files is that some of them aren't read by
> > > the component that stores them. For example, a file can be corrupted
> > > in the blob server of the JM, but that it is corrupted will only be
> > > noticed by the TaskExecutor.
> > >
> > > On 15/12/2021 11:36, Till Rohrmann wrote:
> > > > Thanks everyone for your feedback. Let me try to address it by
> > > > grouping some of the individual comments:
> > > >
> > > > ### Will this feature work for native Yarn and K8s deployments?
> > > >
> > > > The working directory is an optional feature that can be used to
> > > > recover additional information. You can think of it like a cache.
> > > > If the working directory is there, then Flink can do certain
> > > > things a bit faster, but in the worst case it will have to
> > > > retrieve the required information from the JobManager or
> > > > persistent storage.
> > > >
> > > > In order to make it work with native Yarn and K8s, we would have
> > > > to change these modes slightly. First of all, we would have to be
> > > > able to map working directories to processes and then set
> > > > deterministic resource IDs for the processes. For K8s this could
> > > > be easily achievable by using a StatefulSet as the deployment
> > > > mechanism for TaskExecutors. For Yarn, we would probably have to
> > > > remember the prior locations of a process. Both things are
> > > > potential follow-ups that I don't want to tackle in this FLIP.
> > > >
> > > > If one of the modes configures the working directory to be on a
> > > > full or broken disk, then the process will fail. I think this is
> > > > not all that different from the current state, where some things
> > > > in Flink will fail if they picked the wrong/full temporary
> > > > directory (e.g. the blob storage directory).
> > > >
> > > > ### Cleanup
> > > >
> > > > The working directory will be cleaned up if the Flink process is
> > > > gracefully shut down. This means that the JobManager process will
> > > > clean it up if it runs in application mode and the job is
> > > > terminated. SIGTERM and SIGKILL signals will be treated as an
> > > > ungraceful shutdown and therefore won't clean up the working
> > > > directory. This means that we probably also need a graceful way
> > > > of shutting TaskManager processes down in the future, because
> > > > right now they are in most cases killed in order to shut them
> > > > down. If the user uses the tmp directory, then any left-over
> > > > working directories will be cleaned up with the next system
> > > > restart. This is somewhat similar to how RocksDB's working
> > > > directory is currently cleaned up as well.
> > > >
> > > > ### Corrupted files
> > > >
> > > > The working directory itself won't give you any guarantees. It
> > > > will be the responsibility of the component that uses the working
> > > > directory to make sure that it can deal with corrupted files.
> > > > E.g. if the component cannot read the file, then it should delete
> > > > it and fall back to the remote storage/ground truth to retrieve
> > > > the required information.
> > > >
> > > > I hope this answers your questions. Let me know if you have more
> > > > feedback.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <liujiangangp...@gmail.com>
> > > > wrote:
> > > >
> > > >> I like the idea. It can reuse the disk to do many things. Isn't
> > > >> it only for internal failover? If not, the cleanup may be a
> > > >> problem. Also, many resource components have their own disk
> > > >> scheduling strategies.
> > > >>
> > > >> On Sun, Dec 12, 2021 at 19:59 Chesnay Schepler
> > > >> <ches...@apache.org> wrote:
> > > >>
> > > >>> How do you intend to handle corrupted files, in particular due
> > > >>> to process crashes during a write? Will all writes to a cached
> > > >>> directory append some suffix (e.g., ".pending") and do a rename?
> > > >>>
> > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
> > > >>>> Hi everyone,
> > > >>>>
> > > >>>> I would like to start a discussion about introducing an
> > > >>>> explicit working directory for Flink processes that can be
> > > >>>> used to store information [1]. Per default this working
> > > >>>> directory will reside in the temporary directory of the node
> > > >>>> Flink runs on. However, if configured to reside on a
> > > >>>> persistent volume, then this information can be used to
> > > >>>> recover from process/node failures. Moreover, such a working
> > > >>>> directory can be used to consolidate some of our other
> > > >>>> directories Flink creates under /tmp (e.g. blobStorage,
> > > >>>> RocksDB working directory).
> > > >>>>
> > > >>>> Here is a draft PR that outlines the required changes [2].
> > > >>>>
> > > >>>> Looking forward to your feedback.
> > > >>>>
> > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> > > >>>> [2] https://github.com/apache/flink/pull/18083
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Till