I've updated the FLIP and excluded the blob storage from the initial scope.
I hope that I could answer all your questions. I will now start a vote. If
there are still things that we want to discuss, then please post to this
thread.

Cheers,
Till

On Thu, Dec 16, 2021 at 12:37 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Maybe it is a good idea to remove storing the blobs in the working
> directory from this FLIP and to address the problems with doing this as a
> follow up. This will keep the FLIP narrowly scoped and faster to implement.
> I will update the FLIP and move the blob storage part to the follow-up
> section.
>
> Cheers,
> Till
>
> On Thu, Dec 16, 2021 at 10:59 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi David,
>>
>> I think such an utility can be helpful. I would suggest adding something
>> like this once it is needed by a component.
>>
>> Currently, I think only the BlobServer might be susceptible to this
>> problem because we don't fsync the written bytes and then don't use an
>> atomic rename operation. If we change this, then I think we should not be
>> affected by this problem. For the BlobStore we have some detection
>> mechanism in place that ensures that you download the correct blob using a
>> MessageDigest. For the BlobCache we probably should add a check that the
>> locally stored file has the same MessageDigest as expected and if not, then
>> delete the file and refetch it from the BlobServer/BlobStore.
>>
>> The RocksDB working directory will be cleaned up with every process
>> restart and the local state directory is not used across process restarts
>> at the moment.
>>
>> Cheers,
>> Till
>>
>> On Thu, Dec 16, 2021 at 9:13 AM David Morávek <d...@apache.org> wrote:
>>
>>> Hi Till,
>>>
>>> thanks for drafting this FLIP, I think it's really a valuable
>>> improvement.
>>>
>>> Agreed with Yang, that YARN / k8s implementation should be out of scope
>>> of
>>> this FLIP. Just few notes on the possible integrations:
>>>
>>> For k8s, I think we can also benefit from this FLIP without StatefulSet.
>>> If
>>> the pod crashes for some reason, it will be restarted -> it's still on
>>> the
>>> same node, but it looses the state. This could be addressed by attaching
>>> an
>>> ephemeral volume to the container [1]. This is somewhere it between the
>>> current state & the persistent volume (this is where you need a
>>> StatefulSet) approach, that could be expensive (depends on the
>>> infrastructure).
>>>
>>> Example:
>>>
>>> apiVersion: v1
>>> kind: Pod
>>> metadata:
>>>   name: test-pod
>>> spec:
>>>   containers:
>>>   - image: ...
>>>     name: test-container
>>>     volumeMounts:
>>>     - mountPath: /cache
>>>       name: cache-volume
>>>   volumes:
>>>   - name: cache-volume
>>>     emptyDir: {}
>>>
>>> For YARN, I don't think it's as simple as remembering prior locations. As
>>> far as I remember the "restart from failure" results in a new container
>>> being created and the storage is tied with a container's lifecycle and
>>> the
>>> working directories are garbage collected right after the container
>>> FAILS /
>>> FINISHES. We'd most likely have to leverage a new component (something
>>> along the lines of how the shuffle services for YARN work), that runs
>>> embedded in NodeManager and allows you to externalize files for
>>> out-of-the-container-lifecycle use, and that ties their lifecycle with
>>> the
>>> job.
>>>
>>> As for the Chesnay's concern around corrupted files, are we sure that all
>>> components can recover from a corrupted file? Could we for example have a
>>> generic mechanism, that is reused by all the components writing to the
>>> working directory (CRC + File)?
>>>
>>> Other than that, I really like the FLIP and looking forward to have this
>>> feature in Flink +1.
>>>
>>> [1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/
>>>
>>> Best,
>>> D.
>>>
>>> On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>> > I am afraid creating a dedicated StatefulSet for each TaskManager is
>>> too
>>> > expensive and using a shared StatefulSet for all
>>> > the TaskManagers is not flexible enough. Maybe setting a proper restart
>>> > policy for TaskManager pods could benefit from
>>> > this FLIP. But we might need to tackle some other issues, e.g.
>>> duplicated
>>> > registration, etc.
>>> >
>>> > All in all, this is out of the scope of this FLIP. I agree we could
>>> leave
>>> > it in the future FLIPs.
>>> >
>>> > I have no more concerns. +1
>>> >
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > Till Rohrmann <trohrm...@apache.org> 于2021年12月15日周三 19:06写道:
>>> >
>>> > > This is true. But this is not a new problem and I think that Flink
>>> should
>>> > > be susceptible to this problem already. One solution for this
>>> concrete
>>> > case
>>> > > could be that the BlobServer stores some checksums and validates the
>>> file
>>> > > before serving it to the TM.
>>> > >
>>> > > Cheers,
>>> > > Till
>>> > >
>>> > > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <
>>> ches...@apache.org>
>>> > > wrote:
>>> > >
>>> > > > The issue with corrupted files is that some of them aren't read by
>>> the
>>> > > > component that stores them.
>>> > > > For example, a file can be corrupted in the blob server of the JM,
>>> but
>>> > > > that it is corrupted will only be noticed by the TaskExecutor.
>>> > > >
>>> > > > On 15/12/2021 11:36, Till Rohrmann wrote:
>>> > > > > Thanks everyone for your feedback. Let me try to address it by
>>> > grouping
>>> > > > > some of the individual comments:
>>> > > > >
>>> > > > > ### Will this feature work for native Yarn and K8s deployments?
>>> > > > >
>>> > > > > The working directory is an optional feature that can be used to
>>> > > recover
>>> > > > > additional information. You can think of it like a cache. If the
>>> > > working
>>> > > > > directory is there, then Flink can do certain things a bit
>>> faster but
>>> > > in
>>> > > > > the worst case it will have to retrieve the required information
>>> from
>>> > > the
>>> > > > > JobManager or persistent storage.
>>> > > > >
>>> > > > > In order to make it work with native Yarn and K8s, we would have
>>> to
>>> > > > change
>>> > > > > these modes slightly. First of all, we would have to be able to
>>> map
>>> > > > working
>>> > > > > directories to processes and then set a deterministic resource
>>> ids
>>> > for
>>> > > > the
>>> > > > > processes. For K8s this could be easily achievable by using a
>>> > > StatefulSet
>>> > > > > as the deployment mechanism for TaskExecutors. For Yarn, we
>>> probably
>>> > > > would
>>> > > > > have to remember the prior locations of a process. Both things
>>> are
>>> > > > > potential follow ups that I don't want to tackle in this FLIP.
>>> > > > >
>>> > > > > If one of the modes configures the working directory to be on a
>>> full
>>> > or
>>> > > > > broken disk, then the process will fail. I think this is not all
>>> that
>>> > > > > different from the current state where some things in Flink will
>>> fail
>>> > > if
>>> > > > > they picked the wrong/full temporary directory (e.g. blob storage
>>> > > > > directory).
>>> > > > >
>>> > > > > ### Cleanup
>>> > > > >
>>> > > > > The working directory will be cleaned up if the Flink process is
>>> > > > gracefully
>>> > > > > shut down. This means that the JobManager process will clean it
>>> up if
>>> > > it
>>> > > > > runs in application mode and the job is terminated. SIGTERM and
>>> > SIGKILL
>>> > > > > signals will be treated as an ungraceful shutdown and therefore
>>> they
>>> > > > won't
>>> > > > > clean up the working directory. This means that we probably also
>>> > need a
>>> > > > > graceful way for shutting TaskManager processes down in the
>>> future
>>> > > > because
>>> > > > > right now they are in most cases killed in order to shut them
>>> down.
>>> > If
>>> > > > the
>>> > > > > user uses the tmp directory, then any left-over working
>>> directories
>>> > > will
>>> > > > be
>>> > > > > cleaned up with the next system restart. This is somewhat
>>> similar to
>>> > > how
>>> > > > > RocksDB's working directory is currently cleaned up as well.
>>> > > > >
>>> > > > > ### Corrupted files
>>> > > > >
>>> > > > > The working directory itself won't give you any guarantees. It
>>> will
>>> > be
>>> > > > the
>>> > > > > responsibility of the component that uses the working directory
>>> to
>>> > make
>>> > > > > sure that it can deal with corrupted files. E.g. if the component
>>> > > cannot
>>> > > > > read the file, then it should delete it and fall back to the
>>> remote
>>> > > > > storage/ground truth to retrieve the required information.
>>> > > > >
>>> > > > > I hope this could answer your questions. Let me know if you have
>>> more
>>> > > > > feedback.
>>> > > > >
>>> > > > > Cheers,
>>> > > > > Till
>>> > > > >
>>> > > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <liujiangangp...@gmail.com>
>>> > wrote:
>>> > > > >
>>> > > > >> I like the idea. It can reuse the disk to do many things. Isn't
>>> it
>>> > > only
>>> > > > >> for inner failover? If not, the cleaning may be a problem. Also,
>>> > many
>>> > > > >> resource components have their own disk schedule strategy.
>>> > > > >>
>>> > > > >> Chesnay Schepler <ches...@apache.org> 于2021年12月12日周日 19:59写道:
>>> > > > >>
>>> > > > >>> How do you intend to handle corrupted files, in particular due
>>> to
>>> > > > >>> process crashes during a write?
>>> > > > >>> Will all writes to a cached directory append some suffix (e.g.,
>>> > > > >>> ".pending") and do a rename?
>>> > > > >>>
>>> > > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
>>> > > > >>>> Hi everyone,
>>> > > > >>>>
>>> > > > >>>> I would like to start a discussion about introducing an
>>> explicit
>>> > > > working
>>> > > > >>>> directory for Flink processes that can be used to store
>>> > information
>>> > > > [1].
>>> > > > >>>> Per default this working directory will reside in the
>>> temporary
>>> > > > >>> directory
>>> > > > >>>> of the node Flink runs on. However, if configured to reside
>>> on a
>>> > > > >>> persistent
>>> > > > >>>> volume, then this information can be used to recover from
>>> > > process/node
>>> > > > >>>> failures. Moreover, such a working directory can be used to
>>> > > > consolidate
>>> > > > >>>> some of our other directories Flink creates under /tmp (e.g.
>>> > > > >>> blobStorage,
>>> > > > >>>> RocksDB working directory).
>>> > > > >>>>
>>> > > > >>>> Here is a draft PR that outlines the required changes [2].
>>> > > > >>>>
>>> > > > >>>> Looking forward to your feedback.
>>> > > > >>>>
>>> > > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
>>> > > > >>>> [2] https://github.com/apache/flink/pull/18083
>>> > > > >>>>
>>> > > > >>>> Cheers,
>>> > > > >>>> Till
>>> > > > >>>>
>>> > > > >>>
>>> > > >
>>> > > >
>>> > >
>>> >
>>>
>>

Reply via email to