Hi Till,

thanks for drafting this FLIP, I think it's really a valuable improvement.

Agreed with Yang that the YARN / K8s implementation should be out of scope of
this FLIP. Just a few notes on the possible integrations:

For K8s, I think we can also benefit from this FLIP even without a
StatefulSet. If the pod crashes for some reason, it will be restarted -> it's
still on the same node, but it loses the state. This could be addressed by
attaching an ephemeral volume to the container [1]. This sits somewhere in
between the current state and the persistent-volume approach (which is where
you need a StatefulSet), which could be expensive (depending on the
infrastructure).

Example:

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
spec:
  containers:
  - image: ...
    name: test-container
    volumeMounts:
    - mountPath: /cache
      name: cache-volume
  volumes:
  - name: cache-volume
    # emptyDir lives as long as the pod: it survives container restarts on
    # the same node, but is removed once the pod is deleted.
    emptyDir: {}

For YARN, I don't think it's as simple as remembering prior locations. As
far as I remember, a "restart from failure" results in a new container
being created; the storage is tied to the container's lifecycle, and the
working directories are garbage collected right after the container FAILS /
FINISHES. We'd most likely have to leverage a new component (something
along the lines of how the shuffle services for YARN work) that runs
embedded in the NodeManager and allows you to externalize files for
out-of-the-container-lifecycle use, tying their lifecycle to the job.
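
To make that a bit more concrete, here is a rough sketch of what such a
NodeManager-embedded component could look like, built on YARN's
AuxiliaryService extension point (the same hook the shuffle services plug
into). The class name, configuration key and directory layout are made up
for illustration only; the actual design would be part of a follow-up FLIP.

import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;

/** Hypothetical auxiliary service; names and config keys are illustrative only. */
public class FlinkWorkingDirectoryService extends AuxiliaryService {

    private Path workingDirRoot;

    public FlinkWorkingDirectoryService() {
        super("flink_working_directory");
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        // A root directory owned by the NodeManager, outside the container work
        // dirs, so it is not garbage collected when a container FAILS / FINISHES.
        workingDirRoot =
                Paths.get(conf.get("flink.workingdir.root", "/tmp/flink-working-dirs"));
        Files.createDirectories(workingDirRoot);
        super.serviceInit(conf);
    }

    @Override
    public void initializeApplication(ApplicationInitializationContext ctx) {
        // Nothing to do on start: a restarted container simply reuses
        // <workingDirRoot>/<app-id>/<resource-id> if it already exists.
    }

    @Override
    public void stopApplication(ApplicationTerminationContext ctx) {
        // Tie the lifecycle to the job: clean up the application's directories
        // once it finishes (recursive delete omitted in this sketch).
    }

    @Override
    public ByteBuffer getMetaData() {
        // Could advertise the root path to the containers; empty in this sketch.
        return ByteBuffer.allocate(0);
    }
}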

As for Chesnay's concern around corrupted files, are we sure that all
components can recover from a corrupted file? Could we, for example, have a
generic mechanism that is reused by all the components writing to the
working directory (CRC + file)?
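
Roughly what I have in mind (just a sketch, nothing like this exists in
Flink today): every component writing to the working directory would go
through a small helper that prefixes the payload with a CRC32, writes to a
".pending" file and renames it into place, and on read validates the
checksum, treating a mismatch like a missing file so the caller falls back
to the remote storage / ground truth.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.zip.CRC32;

/** Hypothetical helper; names and on-disk layout are illustrative only. */
public final class ChecksummedFile {

    private static final int CRC_LENGTH = Long.BYTES;

    public static void write(Path target, byte[] payload) throws IOException {
        CRC32 crc = new CRC32();
        crc.update(payload);

        ByteBuffer buffer = ByteBuffer.allocate(CRC_LENGTH + payload.length);
        buffer.putLong(crc.getValue()).put(payload);

        // Write to a ".pending" file first and rename, so a crash mid-write
        // never leaves a half-written file under the final name.
        Path pending = target.resolveSibling(target.getFileName() + ".pending");
        Files.write(pending, buffer.array());
        Files.move(pending, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static Optional<byte[]> read(Path target) throws IOException {
        if (!Files.exists(target)) {
            return Optional.empty();
        }
        byte[] content = Files.readAllBytes(target);
        if (content.length < CRC_LENGTH) {
            Files.delete(target); // truncated -> treat as corrupted
            return Optional.empty();
        }
        ByteBuffer buffer = ByteBuffer.wrap(content);
        long expectedCrc = buffer.getLong();
        byte[] payload = new byte[content.length - CRC_LENGTH];
        buffer.get(payload);

        CRC32 crc = new CRC32();
        crc.update(payload);
        if (crc.getValue() != expectedCrc) {
            Files.delete(target); // corrupted -> caller falls back to ground truth
            return Optional.empty();
        }
        return Optional.of(payload);
    }
}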

Other than that, I really like the FLIP and I'm looking forward to having
this feature in Flink. +1

[1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/

Best,
D.

On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <danrtsey...@gmail.com> wrote:

> I am afraid creating a dedicated StatefulSet for each TaskManager is too
> expensive and using a shared StatefulSet for all
> the TaskManagers is not flexible enough. Maybe setting a proper restart
> policy for TaskManager pods could benefit from
> this FLIP. But we might need to tackle some other issues, e.g. duplicated
> registration, etc.
>
> All in all, this is out of the scope of this FLIP. I agree we could leave
> it in the future FLIPs.
>
> I have no more concerns. +1
>
>
> Best,
> Yang
>
> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Dec 15, 2021 at 19:06:
>
> > This is true. But this is not a new problem and I think that Flink should
> > be susceptible to this problem already. One solution for this concrete
> case
> > could be that the BlobServer stores some checksums and validates the file
> > before serving it to the TM.
> >
> > Cheers,
> > Till
> >
> > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ches...@apache.org>
> > wrote:
> >
> > > The issue with corrupted files is that some of them aren't read by the
> > > component that stores them.
> > > For example, a file can be corrupted in the blob server of the JM, but
> > > that it is corrupted will only be noticed by the TaskExecutor.
> > >
> > > On 15/12/2021 11:36, Till Rohrmann wrote:
> > > > Thanks everyone for your feedback. Let me try to address it by
> grouping
> > > > some of the individual comments:
> > > >
> > > > ### Will this feature work for native Yarn and K8s deployments?
> > > >
> > > > The working directory is an optional feature that can be used to
> > recover
> > > > additional information. You can think of it like a cache. If the
> > working
> > > > directory is there, then Flink can do certain things a bit faster but
> > in
> > > > the worst case it will have to retrieve the required information from
> > the
> > > > JobManager or persistent storage.
> > > >
> > > > In order to make it work with native Yarn and K8s, we would have to
> > > change
> > > > these modes slightly. First of all, we would have to be able to map
> > > working
> > > > directories to processes and then set deterministic resource ids
> for
> > > the
> > > > processes. For K8s this could be easily achievable by using a
> > StatefulSet
> > > > as the deployment mechanism for TaskExecutors. For Yarn, we probably
> > > would
> > > > have to remember the prior locations of a process. Both things are
> > > > potential follow ups that I don't want to tackle in this FLIP.
> > > >
> > > > If one of the modes configures the working directory to be on a full
> or
> > > > broken disk, then the process will fail. I think this is not all that
> > > > different from the current state where some things in Flink will fail
> > if
> > > > they picked the wrong/full temporary directory (e.g. blob storage
> > > > directory).
> > > >
> > > > ### Cleanup
> > > >
> > > > The working directory will be cleaned up if the Flink process is
> > > gracefully
> > > > shut down. This means that the JobManager process will clean it up if
> > it
> > > > runs in application mode and the job is terminated. SIGTERM and
> SIGKILL
> > > > signals will be treated as an ungraceful shutdown and therefore they
> > > won't
> > > > clean up the working directory. This means that we probably also
> need a
> > > > graceful way for shutting TaskManager processes down in the future
> > > because
> > > > right now they are in most cases killed in order to shut them down.
> If
> > > the
> > > > user uses the tmp directory, then any left-over working directories
> > will
> > > be
> > > > cleaned up with the next system restart. This is somewhat similar to
> > how
> > > > RocksDB's working directory is currently cleaned up as well.
> > > >
> > > > ### Corrupted files
> > > >
> > > > The working directory itself won't give you any guarantees. It will
> be
> > > the
> > > > responsibility of the component that uses the working directory to
> make
> > > > sure that it can deal with corrupted files. E.g. if the component
> > cannot
> > > > read the file, then it should delete it and fall back to the remote
> > > > storage/ground truth to retrieve the required information.
> > > >
> > > > I hope this could answer your questions. Let me know if you have more
> > > > feedback.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <liujiangangp...@gmail.com>
> wrote:
> > > >
> > > >> I like the idea. It can reuse the disk to do many things. Isn't it
> > only
> > > >> for inner failover? If not, the cleaning may be a problem. Also,
> many
> > > >> resource components have their own disk schedule strategy.
> > > >>
> > > >> Chesnay Schepler <ches...@apache.org> wrote on Sun, Dec 12, 2021 at 19:59:
> > > >>
> > > >>> How do you intend to handle corrupted files, in particular due to
> > > >>> process crashes during a write?
> > > >>> Will all writes to a cached directory append some suffix (e.g.,
> > > >>> ".pending") and do a rename?
> > > >>>
> > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
> > > >>>> Hi everyone,
> > > >>>>
> > > >>>> I would like to start a discussion about introducing an explicit
> > > working
> > > >>>> directory for Flink processes that can be used to store
> information
> > > [1].
> > > >>>> Per default this working directory will reside in the temporary
> > > >>> directory
> > > >>>> of the node Flink runs on. However, if configured to reside on a
> > > >>> persistent
> > > >>>> volume, then this information can be used to recover from
> > process/node
> > > >>>> failures. Moreover, such a working directory can be used to
> > > consolidate
> > > >>>> some of our other directories Flink creates under /tmp (e.g.
> > > >>> blobStorage,
> > > >>>> RocksDB working directory).
> > > >>>>
> > > >>>> Here is a draft PR that outlines the required changes [2].
> > > >>>>
> > > >>>> Looking forward to your feedback.
> > > >>>>
> > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> > > >>>> [2] https://github.com/apache/flink/pull/18083
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Till
> > > >>>>
> > > >>>
> > >
> > >
> >
>
