I am afraid creating a dedicated StatefulSet for each TaskManager is too expensive, and using a shared StatefulSet for all the TaskManagers is not flexible enough. Maybe TaskManager pods with a proper restart policy could benefit from this FLIP, but we might need to tackle some other issues first, e.g. duplicated registration.
All in all, this is out of the scope of this FLIP. I agree we could leave it to future FLIPs. I have no more concerns. +1

Best,
Yang

Till Rohrmann <trohrm...@apache.org> wrote on Wed, Dec 15, 2021 at 19:06:

> This is true. But this is not a new problem, and I think that Flink is already susceptible to it. One solution for this concrete case could be that the BlobServer stores some checksums and validates the file before serving it to the TM.
>
> Cheers,
> Till
>
> On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ches...@apache.org> wrote:
>
> > The issue with corrupted files is that some of them aren't read by the component that stores them.
> > For example, a file can be corrupted in the blob server of the JM, but that it is corrupted will only be noticed by the TaskExecutor.
> >
> > On 15/12/2021 11:36, Till Rohrmann wrote:
> > > Thanks everyone for your feedback. Let me try to address it by grouping some of the individual comments:
> > >
> > > ### Will this feature work for native Yarn and K8s deployments?
> > >
> > > The working directory is an optional feature that can be used to recover additional information. You can think of it like a cache: if the working directory is there, then Flink can do certain things a bit faster, but in the worst case it will have to retrieve the required information from the JobManager or persistent storage.
> > >
> > > In order to make it work with native Yarn and K8s, we would have to change these modes slightly. First of all, we would have to be able to map working directories to processes and then set deterministic resource ids for the processes. For K8s this could easily be achieved by using a StatefulSet as the deployment mechanism for TaskExecutors. For Yarn, we would probably have to remember the prior locations of a process. Both things are potential follow-ups that I don't want to tackle in this FLIP.
> > >
> > > If one of the modes configures the working directory to be on a full or broken disk, then the process will fail. I think this is not all that different from the current state, where some things in Flink will fail if they pick the wrong or full temporary directory (e.g. the blob storage directory).
> > >
> > > ### Cleanup
> > >
> > > The working directory will be cleaned up if the Flink process is gracefully shut down. This means that the JobManager process will clean it up if it runs in application mode and the job is terminated. SIGTERM and SIGKILL signals will be treated as an ungraceful shutdown, and therefore they won't clean up the working directory. This means that we probably also need a graceful way of shutting TaskManager processes down in the future, because right now they are in most cases killed in order to shut them down. If the user uses the tmp directory, then any left-over working directories will be cleaned up with the next system restart. This is somewhat similar to how RocksDB's working directory is currently cleaned up as well.
> > >
> > > ### Corrupted files
> > >
> > > The working directory itself won't give you any guarantees. It will be the responsibility of the component that uses the working directory to make sure that it can deal with corrupted files. E.g., if the component cannot read a file, it should delete it and fall back to the remote storage/ground truth to retrieve the required information.
> > >
> > > I hope this answers your questions. Let me know if you have more feedback.
> > >
> > > Cheers,
> > > Till
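To make the corrupted-files contract above concrete: a component using the working directory could store a checksum next to each file, validate it on read, and fall back to the ground truth on a mismatch. The sketch below is illustrative only; the RemoteStorage interface and the checksum layout are assumptions, not existing Flink APIs.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    /** Hypothetical ground truth to fall back to, e.g. the JobManager's blob store. */
    interface RemoteStorage {
        byte[] fetch(String key) throws IOException;
    }

    final class ValidatingCache {
        private final Path workingDirectory;
        private final RemoteStorage remoteStorage;

        ValidatingCache(Path workingDirectory, RemoteStorage remoteStorage) {
            this.workingDirectory = workingDirectory;
            this.remoteStorage = remoteStorage;
        }

        /**
         * Reads a cached file and validates it against its stored checksum. A missing
         * or corrupted entry is deleted and re-fetched from the remote storage.
         */
        byte[] read(String key) throws IOException {
            Path file = workingDirectory.resolve(key);
            Path checksumFile = workingDirectory.resolve(key + ".sha256");
            try {
                byte[] content = Files.readAllBytes(file);
                if (Arrays.equals(sha256(content), Files.readAllBytes(checksumFile))) {
                    return content; // cache hit: no round trip to the remote storage
                }
            } catch (IOException ignored) {
                // unreadable entry; treat it like a checksum mismatch
            }
            // Corrupted or missing: drop the local copy, restore from the ground truth.
            Files.deleteIfExists(file);
            Files.deleteIfExists(checksumFile);
            byte[] content = remoteStorage.fetch(key);
            Files.write(file, content);
            Files.write(checksumFile, sha256(content));
            return content;
        }

        private static byte[] sha256(byte[] data) throws IOException {
            try {
                return MessageDigest.getInstance("SHA-256").digest(data);
            } catch (NoSuchAlgorithmException e) {
                throw new IOException(e);
            }
        }
    }

Storing the checksum next to the file would also cover the BlobServer suggestion above: the server can validate a blob against its checksum before serving it to the TM.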
> > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <liujiangangp...@gmail.com> wrote:
> > >
> > > > I like the idea. It allows reusing the disk for many things. Is it only for internal failover? If not, the cleanup may be a problem. Also, many resource components have their own disk scheduling strategies.
> > > >
> > > > Chesnay Schepler <ches...@apache.org> wrote on Sun, Dec 12, 2021 at 19:59:
> > > >
> > > > > How do you intend to handle corrupted files, in particular due to process crashes during a write?
> > > > > Will all writes to a cached directory append some suffix (e.g., ".pending") and do a rename?
> > > > >
> > > > > On 10/12/2021 17:54, Till Rohrmann wrote:
> > > > > > Hi everyone,
> > > > > >
> > > > > > I would like to start a discussion about introducing an explicit working directory for Flink processes that can be used to store information [1]. Per default this working directory will reside in the temporary directory of the node Flink runs on. However, if it is configured to reside on a persistent volume, then this information can be used to recover from process/node failures. Moreover, such a working directory can be used to consolidate some of the other directories Flink creates under /tmp (e.g. blobStorage, the RocksDB working directory).
> > > > > >
> > > > > > Here is a draft PR that outlines the required changes [2].
> > > > > >
> > > > > > Looking forward to your feedback.
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> > > > > > [2] https://github.com/apache/flink/pull/18083
> > > > > >
> > > > > > Cheers,
> > > > > > Till
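Regarding Chesnay's ".pending" question above: writing to a temporary name and renaming is the standard way to prevent a crash during a write from leaving a half-written file behind. A minimal sketch of that pattern, under the assumption that left-over *.pending files are simply discarded on restart (this is not the FLIP's actual implementation):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    final class AtomicFileWriter {

        /**
         * Writes the content to "<target>.pending" first and only renames it to the
         * final name once the write has completed. Readers never observe a partially
         * written target; a crash mid-write merely leaves a *.pending file that a
         * recovery scan can delete.
         */
        static void writeAtomically(Path target, byte[] content) throws IOException {
            Path pending = target.resolveSibling(target.getFileName() + ".pending");
            Files.write(pending, content);
            // Atomic only within the same file system; on POSIX this is a rename(2)
            // that atomically replaces an existing target.
            Files.move(pending, target, StandardCopyOption.ATOMIC_MOVE);
        }
    }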
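As for the deterministic resource ids mentioned above for native Yarn and K8s: the idea boils down to deriving the working directory name from a stable id, e.g. the pod name a StatefulSet assigns, so that a restarted process finds the files of its previous run. A rough sketch (the "tm_" naming scheme is made up for illustration):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class WorkingDirectories {

        /**
         * Resolves the working directory of a TaskExecutor from its deterministic
         * resource id, e.g. "taskmanager-0" for a StatefulSet pod. Restarting under
         * the same id yields the same directory.
         */
        static Path resolve(Path baseDirectory, String resourceId) throws IOException {
            Path workingDirectory = baseDirectory.resolve("tm_" + resourceId);
            Files.createDirectories(workingDirectory); // no-op if it survived a restart
            return workingDirectory;
        }
    }

With the default tmp base directory, left-over directories disappear with the next system restart, matching the cleanup behaviour described above; on a persistent volume they survive and can be used for recovery.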