Maybe it is a good idea to remove storing the blobs in the working directory from this FLIP and to address the problems with doing so as a follow-up. This will keep the FLIP narrowly scoped and faster to implement. I will update the FLIP and move the blob storage part to the follow-up section.
Cheers,
Till

On Thu, Dec 16, 2021 at 10:59 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi David,
>
> I think such a utility can be helpful. I would suggest adding something like this once it is needed by a component.
>
> Currently, I think only the BlobServer might be susceptible to this problem because we don't fsync the written bytes and then don't use an atomic rename operation. If we change this, then I think we should not be affected by this problem. For the BlobStore we have some detection mechanism in place that ensures that you download the correct blob using a MessageDigest. For the BlobCache we probably should add a check that the locally stored file has the same MessageDigest as expected and, if not, delete the file and refetch it from the BlobServer/BlobStore.
>
> The RocksDB working directory will be cleaned up with every process restart and the local state directory is not used across process restarts at the moment.
>
> Cheers,
> Till
>
> On Thu, Dec 16, 2021 at 9:13 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Till,
>>
>> thanks for drafting this FLIP, I think it's really a valuable improvement.
>>
>> Agreed with Yang that the YARN / k8s implementation should be out of scope of this FLIP. Just a few notes on the possible integrations:
>>
>> For k8s, I think we can also benefit from this FLIP without a StatefulSet. If the pod crashes for some reason, it will be restarted -> it's still on the same node, but it loses the state. This could be addressed by attaching an ephemeral volume to the container [1]. This is somewhere in between the current state & the persistent volume approach (this is where you need a StatefulSet), which could be expensive (depends on the infrastructure).
>>
>> Example:
>>
>> apiVersion: v1
>> kind: Pod
>> metadata:
>>   name: test-pod
>> spec:
>>   containers:
>>   - image: ...
>>     name: test-container
>>     volumeMounts:
>>     - mountPath: /cache
>>       name: cache-volume
>>   volumes:
>>   - name: cache-volume
>>     emptyDir: {}
>>
>> For YARN, I don't think it's as simple as remembering prior locations. As far as I remember, a "restart from failure" results in a new container being created; the storage is tied to a container's lifecycle and the working directories are garbage collected right after the container FAILS / FINISHES. We'd most likely have to leverage a new component (something along the lines of how the shuffle services for YARN work) that runs embedded in the NodeManager and allows you to externalize files for out-of-the-container-lifecycle use, tying their lifecycle to the job.
>>
>> As for Chesnay's concern around corrupted files, are we sure that all components can recover from a corrupted file? Could we for example have a generic mechanism that is reused by all the components writing to the working directory (CRC + File)?
>>
>> Other than that, I really like the FLIP and am looking forward to having this feature in Flink. +1
>>
>> [1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/
>>
>> Best,
>> D.
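As a minimal sketch of the kind of generic "CRC + File" helper suggested above (JDK classes only; the ChecksummedFile name and its methods are made up for illustration and are not an existing Flink API): the writer appends a CRC32 footer, and the reader treats a mismatch as corruption so the caller can delete the file and fall back to the ground truth.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

/** Hypothetical helper: stores a payload together with a CRC32 footer so readers can detect corruption. */
final class ChecksummedFile {

    /** Writes the payload followed by an 8-byte big-endian CRC32 footer. */
    static void write(Path target, byte[] payload) throws IOException {
        CRC32 crc = new CRC32();
        crc.update(payload);
        ByteBuffer out = ByteBuffer.allocate(payload.length + Long.BYTES);
        out.put(payload);
        out.putLong(crc.getValue());
        Files.write(target, out.array());
    }

    /** Returns the payload if the stored checksum matches; otherwise throws so the caller can refetch. */
    static byte[] read(Path source) throws IOException {
        byte[] all = Files.readAllBytes(source);
        if (all.length < Long.BYTES) {
            throw new IOException("Too short to contain a checksum footer: " + source);
        }
        ByteBuffer in = ByteBuffer.wrap(all);
        byte[] payload = new byte[all.length - Long.BYTES];
        in.get(payload);
        long storedChecksum = in.getLong();
        CRC32 crc = new CRC32();
        crc.update(payload);
        if (crc.getValue() != storedChecksum) {
            throw new IOException("Checksum mismatch, treating file as corrupted: " + source);
        }
        return payload;
    }
}

A crash during the write would most likely leave a truncated file or a non-matching footer, which this check also catches.
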
>> On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>
>> > I am afraid creating a dedicated StatefulSet for each TaskManager is too expensive and using a shared StatefulSet for all the TaskManagers is not flexible enough. Maybe setting a proper restart policy for TaskManager pods could let them benefit from this FLIP. But we might need to tackle some other issues, e.g. duplicated registration, etc.
>> >
>> > All in all, this is out of the scope of this FLIP. I agree we could leave it to future FLIPs.
>> >
>> > I have no more concerns. +1
>> >
>> > Best,
>> > Yang
>> >
>> > Till Rohrmann <trohrm...@apache.org> wrote on Wed, Dec 15, 2021 at 19:06:
>> >
>> > > This is true. But this is not a new problem, and I think that Flink is probably already susceptible to it. One solution for this concrete case could be that the BlobServer stores some checksums and validates the file before serving it to the TM.
>> > >
>> > > Cheers,
>> > > Till
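For illustration, such a check could look roughly like the sketch below; the BlobDigestCheck class, its method, and the choice of SHA-256 are assumptions made for the example, and this is not the actual BlobServer/BlobCache code.

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical check: does a locally stored blob still match the digest it is expected to have? */
final class BlobDigestCheck {

    /**
     * Returns true if the file's digest equals the expected one. A false result would mean
     * "delete the local copy and refetch it from the BlobServer/BlobStore".
     */
    static boolean matchesExpectedDigest(Path localFile, byte[] expectedDigest)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256"); // algorithm picked for the example
        try (InputStream in = Files.newInputStream(localFile)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        return MessageDigest.isEqual(expectedDigest, digest.digest());
    }
}

The same check works on the serving side (validate before handing the file to the TM) and on the caching side (validate before reusing a locally cached copy).
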
>> > > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ches...@apache.org> wrote:
>> > >
>> > > > The issue with corrupted files is that some of them aren't read by the component that stores them. For example, a file can be corrupted in the blob server of the JM, but the corruption will only be noticed by the TaskExecutor.
>> > > >
>> > > > On 15/12/2021 11:36, Till Rohrmann wrote:
>> > > > > Thanks everyone for your feedback. Let me try to address it by grouping some of the individual comments:
>> > > > >
>> > > > > ### Will this feature work for native Yarn and K8s deployments?
>> > > > >
>> > > > > The working directory is an optional feature that can be used to recover additional information. You can think of it like a cache. If the working directory is there, then Flink can do certain things a bit faster, but in the worst case it will have to retrieve the required information from the JobManager or persistent storage.
>> > > > >
>> > > > > In order to make it work with native Yarn and K8s, we would have to change these modes slightly. First of all, we would have to be able to map working directories to processes and then set deterministic resource ids for the processes. For K8s this could be easily achievable by using a StatefulSet as the deployment mechanism for TaskExecutors. For Yarn, we probably would have to remember the prior locations of a process. Both things are potential follow-ups that I don't want to tackle in this FLIP.
>> > > > >
>> > > > > If one of the modes configures the working directory to be on a full or broken disk, then the process will fail. I think this is not all that different from the current state where some things in Flink will fail if they picked the wrong/full temporary directory (e.g. the blob storage directory).
>> > > > >
>> > > > > ### Cleanup
>> > > > >
>> > > > > The working directory will be cleaned up if the Flink process is gracefully shut down. This means that the JobManager process will clean it up if it runs in application mode and the job is terminated. SIGTERM and SIGKILL signals will be treated as an ungraceful shutdown and therefore won't clean up the working directory. This means that we probably also need a graceful way of shutting TaskManager processes down in the future, because right now they are in most cases killed in order to shut them down. If the user uses the tmp directory, then any left-over working directories will be cleaned up with the next system restart. This is somewhat similar to how RocksDB's working directory is currently cleaned up.
>> > > > >
>> > > > > ### Corrupted files
>> > > > >
>> > > > > The working directory itself won't give you any guarantees. It will be the responsibility of the component that uses the working directory to make sure that it can deal with corrupted files. E.g. if the component cannot read a file, then it should delete it and fall back to the remote storage/ground truth to retrieve the required information.
>> > > > >
>> > > > > I hope this answers your questions. Let me know if you have more feedback.
>> > > > >
>> > > > > Cheers,
>> > > > > Till
>> > > > >
>> > > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <liujiangangp...@gmail.com> wrote:
>> > > > >
>> > > > >> I like the idea. It can reuse the disk to do many things. Isn't it only for internal failover? If not, the cleaning may be a problem. Also, many resource components have their own disk scheduling strategy.
>> > > > >>
>> > > > >> Chesnay Schepler <ches...@apache.org> wrote on Sun, Dec 12, 2021 at 19:59:
>> > > > >>
>> > > > >>> How do you intend to handle corrupted files, in particular due to process crashes during a write? Will all writes to a cached directory append some suffix (e.g., ".pending") and do a rename?
>> > > > >>>
>> > > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
>> > > > >>>> Hi everyone,
>> > > > >>>>
>> > > > >>>> I would like to start a discussion about introducing an explicit working directory for Flink processes that can be used to store information [1]. By default this working directory will reside in the temporary directory of the node Flink runs on. However, if it is configured to reside on a persistent volume, then this information can be used to recover from process/node failures. Moreover, such a working directory can be used to consolidate some of our other directories Flink creates under /tmp (e.g. blobStorage, RocksDB working directory).
>> > > > >>>>
>> > > > >>>> Here is a draft PR that outlines the required changes [2].
>> > > > >>>>
>> > > > >>>> Looking forward to your feedback.
>> > > > >>>>
>> > > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
>> > > > >>>> [2] https://github.com/apache/flink/pull/18083
>> > > > >>>>
>> > > > >>>> Cheers,
>> > > > >>>> Till
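For reference, the ".pending"-plus-rename approach Chesnay asks about above could look roughly like the following sketch (plain java.nio; the AtomicFileWrite class and its method are made up for the example and this is not the actual BlobServer implementation): write to a temporary file, force it to disk, then atomically move it to its final name.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper: write to a ".pending" file, fsync it, then atomically rename it into place. */
final class AtomicFileWrite {

    static void writeAtomically(Path target, byte[] contents) throws IOException {
        Path pending = target.resolveSibling(target.getFileName() + ".pending");
        try (FileChannel channel = FileChannel.open(
                pending,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE)) {
            ByteBuffer buffer = ByteBuffer.wrap(contents);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            channel.force(true); // flush data and metadata to disk before renaming
        }
        // readers only ever see a complete file under the final name;
        // a crash at any earlier point leaves at most a stale ".pending" file to delete on restart
        Files.move(pending, target, StandardCopyOption.ATOMIC_MOVE);
    }
}

Note that ATOMIC_MOVE throws AtomicMoveNotSupportedException on file systems that don't support it, so a fallback (or restricting the working directory to a local file system) would be needed.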