I've updated the FLIP and excluded the blob storage from the initial scope. I hope I could answer all your questions. I will now start a vote. If there are still things that we want to discuss, please post to this thread.
Cheers,
Till

On Thu, Dec 16, 2021 at 12:37 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Maybe it is a good idea to remove storing the blobs in the working directory from this FLIP and to address the problems with doing this as a follow-up. This will keep the FLIP narrowly scoped and faster to implement. I will update the FLIP and move the blob storage part to the follow-up section.
>
> Cheers,
> Till
>
> On Thu, Dec 16, 2021 at 10:59 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi David,
>>
>> I think such a utility can be helpful. I would suggest adding something like this once it is needed by a component.
>>
>> Currently, I think only the BlobServer might be susceptible to this problem, because we don't fsync the written bytes and don't use an atomic rename operation. If we change this, then I think we should not be affected by this problem. For the BlobStore we have a detection mechanism in place that ensures that you download the correct blob, using a MessageDigest. For the BlobCache we should probably add a check that the locally stored file has the same MessageDigest as expected and, if not, delete the file and refetch it from the BlobServer/BlobStore.
>>
>> The RocksDB working directory will be cleaned up with every process restart, and the local state directory is not used across process restarts at the moment.
>>
>> Cheers,
>> Till
>>
>> On Thu, Dec 16, 2021 at 9:13 AM David Morávek <d...@apache.org> wrote:
>>
>>> Hi Till,
>>>
>>> thanks for drafting this FLIP, I think it's really a valuable improvement.
>>>
>>> Agreed with Yang that the YARN / k8s implementation should be out of scope of this FLIP. Just a few notes on the possible integrations:
>>>
>>> For k8s, I think we can also benefit from this FLIP without a StatefulSet. If the pod crashes for some reason, it will be restarted -> it's still on the same node, but it loses its state. This could be addressed by attaching an ephemeral volume to the container [1]. This sits somewhere in between the current state and the persistent volume approach (which is where you need a StatefulSet), which could be expensive (depending on the infrastructure).
>>>
>>> Example:
>>>
>>> apiVersion: v1
>>> kind: Pod
>>> metadata:
>>>   name: test-pod
>>> spec:
>>>   containers:
>>>   - image: ...
>>>     name: test-container
>>>     volumeMounts:
>>>     - mountPath: /cache
>>>       name: cache-volume
>>>   volumes:
>>>   - name: cache-volume
>>>     emptyDir: {}
>>>
>>> For YARN, I don't think it's as simple as remembering prior locations. As far as I remember, a "restart from failure" results in a new container being created, the storage is tied to a container's lifecycle, and the working directories are garbage collected right after the container FAILS / FINISHES. We'd most likely have to leverage a new component (something along the lines of how the shuffle services for YARN work) that runs embedded in the NodeManager, allows you to externalize files for out-of-the-container-lifecycle use, and ties their lifecycle to the job.
>>>
>>> As for Chesnay's concern around corrupted files, are we sure that all components can recover from a corrupted file? Could we, for example, have a generic mechanism (CRC + file) that is reused by all the components writing to the working directory?
>>>
>>> Other than that, I really like the FLIP and am looking forward to having this feature in Flink. +1
>>>
>>> [1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/
>>>
>>> Best,
>>> D.
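
(For illustration only: a minimal sketch of the generic "CRC + file" utility David asks about, combining the fsync and ".pending"-plus-atomic-rename pattern discussed in the thread. The class and method names are hypothetical and not part of Flink's actual API.)

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.zip.CRC32;

public final class AtomicChecksummedWriter {

    public static void write(Path target, byte[] contents) throws IOException {
        Path pending = target.resolveSibling(target.getFileName() + ".pending");

        // Write the payload to a ".pending" file and force it to disk before making it visible.
        try (FileChannel channel = FileChannel.open(
                pending,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(contents));
            channel.force(true); // fsync so a process crash cannot leave a half-written visible file
        }

        // Store the checksum next to the file so readers can detect corruption.
        CRC32 crc = new CRC32();
        crc.update(contents);
        Files.write(
                target.resolveSibling(target.getFileName() + ".crc"),
                Long.toString(crc.getValue()).getBytes(StandardCharsets.UTF_8));

        // Atomic rename: readers either see the old file or the complete new one.
        Files.move(pending, target, StandardCopyOption.ATOMIC_MOVE);
    }

    // Returns the contents, or empty if the file is missing or fails the CRC check,
    // in which case the caller would delete it and refetch from the ground truth.
    public static Optional<byte[]> readVerified(Path target) throws IOException {
        Path crcFile = target.resolveSibling(target.getFileName() + ".crc");
        if (!Files.exists(target) || !Files.exists(crcFile)) {
            return Optional.empty();
        }
        byte[] contents = Files.readAllBytes(target);
        CRC32 crc = new CRC32();
        crc.update(contents);
        long expected = Long.parseLong(Files.readString(crcFile).trim());
        return crc.getValue() == expected ? Optional.of(contents) : Optional.empty();
    }
}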
>>>
>>> On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>> > I am afraid creating a dedicated StatefulSet for each TaskManager is too expensive, and using a shared StatefulSet for all the TaskManagers is not flexible enough. Maybe TaskManager pods with a proper restart policy could benefit from this FLIP. But we might need to tackle some other issues, e.g. duplicated registration, etc.
>>> >
>>> > All in all, this is out of the scope of this FLIP. I agree we could leave it to future FLIPs.
>>> >
>>> > I have no more concerns. +1
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > On Wed, Dec 15, 2021 at 7:06 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>> >
>>> > > This is true. But this is not a new problem; I think Flink is already susceptible to it. One solution for this concrete case could be that the BlobServer stores some checksums and validates the file before serving it to the TM.
>>> > >
>>> > > Cheers,
>>> > > Till
>>> > >
>>> > > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ches...@apache.org> wrote:
>>> > >
>>> > > > The issue with corrupted files is that some of them aren't read by the component that stores them. For example, a file can be corrupted in the blob server of the JM, but the corruption will only be noticed by the TaskExecutor.
>>> > > >
>>> > > > On 15/12/2021 11:36, Till Rohrmann wrote:
>>> > > > > Thanks everyone for your feedback. Let me try to address it by grouping some of the individual comments:
>>> > > > >
>>> > > > > ### Will this feature work for native Yarn and K8s deployments?
>>> > > > >
>>> > > > > The working directory is an optional feature that can be used to recover additional information. You can think of it like a cache. If the working directory is there, then Flink can do certain things a bit faster, but in the worst case it will have to retrieve the required information from the JobManager or persistent storage.
>>> > > > >
>>> > > > > In order to make it work with native Yarn and K8s, we would have to change these modes slightly. First of all, we would have to be able to map working directories to processes and then set deterministic resource ids for the processes. For K8s this could easily be achieved by using a StatefulSet as the deployment mechanism for TaskExecutors. For Yarn, we would probably have to remember the prior locations of a process. Both things are potential follow-ups that I don't want to tackle in this FLIP.
>>> > > > >
>>> > > > > If one of the modes configures the working directory to be on a full or broken disk, then the process will fail. I think this is not all that different from the current state, where some things in Flink will fail if they picked the wrong/full temporary directory (e.g. the blob storage directory).
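
(For illustration only: a tiny sketch of the "deterministic resource id" idea for the K8s/StatefulSet case above. A StatefulSet pod keeps its ordinal-based hostname across restarts, so the hostname could be used to map a restarted TaskExecutor back to its previous working directory. The configuration key in the comment is an assumption, not something this FLIP defines.)

import java.net.InetAddress;
import java.net.UnknownHostException;

public final class DeterministicResourceId {

    public static String fromHostname() throws UnknownHostException {
        // StatefulSet pods keep their stable hostname (e.g. "flink-taskmanager-0") when restarted.
        return InetAddress.getLocalHost().getHostName();
    }

    public static void main(String[] args) throws UnknownHostException {
        // Could be passed to the TaskManager process, e.g. via a setting such as
        // "taskmanager.resource-id" (key assumed here, check the Flink documentation).
        System.out.println("-Dtaskmanager.resource-id=" + fromHostname());
    }
}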
>>> > > > >
>>> > > > > ### Cleanup
>>> > > > >
>>> > > > > The working directory will be cleaned up if the Flink process is gracefully shut down. This means that the JobManager process will clean it up if it runs in application mode and the job is terminated. SIGTERM and SIGKILL signals will be treated as an ungraceful shutdown and therefore won't clean up the working directory. This means that we probably also need a graceful way of shutting TaskManager processes down in the future, because right now they are in most cases killed in order to shut them down. If the user uses the tmp directory, then any left-over working directories will be cleaned up with the next system restart. This is somewhat similar to how RocksDB's working directory is currently cleaned up as well.
>>> > > > >
>>> > > > > ### Corrupted files
>>> > > > >
>>> > > > > The working directory itself won't give you any guarantees. It will be the responsibility of the component that uses the working directory to make sure that it can deal with corrupted files. E.g. if the component cannot read a file, then it should delete it and fall back to the remote storage/ground truth to retrieve the required information.
>>> > > > >
>>> > > > > I hope this answers your questions. Let me know if you have more feedback.
>>> > > > >
>>> > > > > Cheers,
>>> > > > > Till
>>> > > > >
>>> > > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <liujiangangp...@gmail.com> wrote:
>>> > > > >
>>> > > > >> I like the idea. It can reuse the disk to do many things. Isn't it only for internal failover? If not, the cleaning may be a problem. Also, many resource components have their own disk scheduling strategy.
>>> > > > >>
>>> > > > >> On Sun, Dec 12, 2021 at 7:59 PM Chesnay Schepler <ches...@apache.org> wrote:
>>> > > > >>
>>> > > > >>> How do you intend to handle corrupted files, in particular due to process crashes during a write? Will all writes to a cached directory append some suffix (e.g., ".pending") and do a rename?
>>> > > > >>>
>>> > > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
>>> > > > >>>> Hi everyone,
>>> > > > >>>>
>>> > > > >>>> I would like to start a discussion about introducing an explicit working directory for Flink processes that can be used to store information [1]. Per default this working directory will reside in the temporary directory of the node Flink runs on. However, if it is configured to reside on a persistent volume, then this information can be used to recover from process/node failures. Moreover, such a working directory can be used to consolidate some of the other directories Flink creates under /tmp (e.g. blobStorage, the RocksDB working directory).
>>> > > > >>>>
>>> > > > >>>> Here is a draft PR that outlines the required changes [2].
>>> > > > >>>>
>>> > > > >>>> Looking forward to your feedback.
>>> > > > >>>>
>>> > > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
>>> > > > >>>> [2] https://github.com/apache/flink/pull/18083
>>> > > > >>>>
>>> > > > >>>> Cheers,
>>> > > > >>>> Till
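
(For illustration only: a minimal sketch of the "recover or create" behavior the original proposal describes, assuming the working directory defaults to the node's temporary directory and recovery only kicks in when it is placed on a persistent volume. All names are hypothetical and not the FLIP's actual classes.)

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public final class WorkingDirectory {

    public static Path recoverOrCreate(String baseDir, String resourceId) throws IOException {
        Path workingDir = Paths.get(baseDir, "flink-" + resourceId);
        if (Files.isDirectory(workingDir)) {
            // A previous run of the same process left its state behind: treat it as a cache
            // and recover what we can (blobs, local state, ...).
            return workingDir;
        }
        // First start, or the directory was lost: begin with an empty working directory.
        return Files.createDirectories(workingDir);
    }

    public static void main(String[] args) throws IOException {
        // Defaults to java.io.tmpdir; pointing baseDir at a persistent volume is what would
        // enable recovery across process/node restarts.
        Path dir = recoverOrCreate(System.getProperty("java.io.tmpdir"), "taskmanager-0");
        System.out.println("Using working directory: " + dir);
    }
}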