[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16957712#comment-16957712 ]
Till Rohrmann commented on FLINK-14184:
---------------------------------------
Hi [~stephenc], thanks for opening this issue. I agree with you that having
more explicit lifecycle hooks allows better control over resources and their
usage. If I've understood you correctly, then you would like to have hooks
which are executed before any task of a given job is run on a {{TaskExecutor}}
and after the last task of the same job has completed. This is currently not
supported and would involve some changes.
Looking at your PR, it seems that your use case would also work if the hooks
were called per {{Task}}. But this is exactly the contract of the
{{RichFunction}} with its {{#open}} and {{#close}} methods. Hence, if the
per-task behaviour of your PR is good enough for you, these methods might
already be sufficient for your use case.
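The per-task contract can be illustrated without Flink on the classpath. The plain-Java sketch below is only an analogy: `TaskLifecycle` is a hypothetical stand-in for the `open`/`close` part of `RichFunction`, and `PerTaskExecutorFunction` and `simulate` are illustrative names, not Flink API. Each simulated task allocates its own thread pool in `open()` and shuts it down in `close()`, so N parallel tasks on one task manager create N pools rather than one shared pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PerTaskHooksDemo {
    // Hypothetical stand-in for the open/close part of Flink's RichFunction contract.
    interface TaskLifecycle {
        void open() throws Exception;
        void close() throws Exception;
    }

    // Each task instance owns its own pool: created in open(), shut down in close().
    static class PerTaskExecutorFunction implements TaskLifecycle {
        private final AtomicInteger poolsCreated;
        private ExecutorService pool;

        PerTaskExecutorFunction(AtomicInteger poolsCreated) {
            this.poolsCreated = poolsCreated;
        }

        @Override
        public void open() {
            pool = Executors.newFixedThreadPool(2);
            poolsCreated.incrementAndGet();
        }

        @Override
        public void close() {
            pool.shutdown();
        }

        boolean isReleased() {
            return pool != null && pool.isShutdown();
        }
    }

    // Opens and then closes `parallelism` task instances; returns how many pools were created.
    public static int simulate(int parallelism) {
        AtomicInteger poolsCreated = new AtomicInteger();
        List<PerTaskExecutorFunction> tasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            PerTaskExecutorFunction f = new PerTaskExecutorFunction(poolsCreated);
            f.open();
            tasks.add(f);
        }
        for (PerTaskExecutorFunction f : tasks) {
            f.close();
            if (!f.isReleased()) {
                throw new IllegalStateException("pool was not shut down");
            }
        }
        return poolsCreated.get();
    }

    public static void main(String[] args) {
        System.out.println("pools created: " + simulate(3)); // pools created: 3
    }
}
```

Per-task hooks release resources reliably, but, as the issue below points out, they give no single place to share one pool across all tasks of the job.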
> Provide a stage listener API to be invoked per task manager
> -----------------------------------------------------------
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination, Runtime / Task
> Reporter: Stephen Connolly
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Oftentimes a topology has nodes that need to use 3rd party APIs. Not every
> 3rd party API is written in a style suited to use from within Flink.
> At present, implementing one of the `Rich___` functions (e.g. `RichMapFunction`)
> provides each stage with the `open(...)` and `close()` callbacks, as the stage
> is accepted for execution on each task manager.
> There is, however, a need to be able to listen for the first stage being
> opened on any given task manager as well as the last stage being closed.
> Critically, the last stage being closed is the opportunity to release any
> resources that are shared across multiple stages in the topology, e.g.
> database connection pools, async HTTP client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots
> that prevent the topology's classloader from being unloaded, resulting in a
> memory and resource leak in the task manager, never mind that a database
> connection pool may also be holding resources on the database side.
> There are three workarounds available at present:
> # Each stage allocates its own resources and cleans up afterwards. This
> is, in many ways, the ideal; however, it can result in more database
> connections than intended, e.g. each stage that accesses the database
> needs its own connection rather than letting the whole topology share one
> or two connections through a connection pool. Similarly, if the 3rd party
> library uses a static singleton for the whole classloader, there is no way
> for the independent stages to know when it is safe to shut down the
> singleton.
> # Implement a reference-counting proxy for the 3rd party API. This is a lot
> of work: you need to ensure that deserialization of the proxy returns a
> classloader singleton (so you can maintain the reference counts), and if the
> count goes wrong you have leaked the resource.
> # Use a ReferenceQueue-backed proxy. This is even more complex than
> implementing reference counting, but has the advantage of not requiring the
> count to be maintained correctly. On the other hand, it does not provide for
> eager release of the resources.
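The reference-counting workaround can be sketched in plain Java as a reference-counted holder. `RefCountedResource`, `acquire()` and `release()` are hypothetical names, and the sketch omits the deserialization step (for example a `readResolve()` that maps every proxy onto one classloader-wide instance). Each stage would call `acquire()` in `open()` and `release()` in `close()`; the first acquire creates the resource and the last release closes it.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

public final class RefCountedResource<T> {
    private final Supplier<T> factory; // creates the shared resource on the first acquire()
    private final Consumer<T> closer;  // releases it when the count drops back to zero
    private T resource;
    private int refCount;

    public RefCountedResource(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    // Called from each stage's open(): the first caller creates the resource.
    public synchronized T acquire() {
        if (refCount == 0) {
            resource = factory.get();
        }
        refCount++;
        return resource;
    }

    // Called from each stage's close(): the last caller releases the resource.
    public synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() called more often than acquire()");
        }
        refCount--;
        if (refCount == 0) {
            closer.accept(resource);
            resource = null;
        }
    }

    public synchronized int refCount() {
        return refCount;
    }
}
```

In a job, one instance would typically live in a `static final` field of a user-code class, such as `static final RefCountedResource<ExecutorService> POOL = new RefCountedResource<>(() -> Executors.newFixedThreadPool(4), ExecutorService::shutdown);`. A stage that forgets to call `release()` still leaks the resource; the sketch only catches the opposite mistake of releasing too often.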
> If Flink provided a listener contract that could be registered with the
> execution environment, this would allow the resources to be cleaned up.
> My proposed interface would look something like this:
> {code:java}
> import java.io.Serializable;
> import org.apache.flink.api.common.functions.RichFunction;
> import org.apache.flink.configuration.Configuration;
>
> public interface EnvironmentLocalTopologyListener extends Serializable {
>     /**
>      * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>      * being invoked for the topology on the current task manager JVM for this
>      * classloader. Will not be called again unless {@link #close()} has been
>      * invoked first.
>      * Use this method to eagerly initialize any ClassLoader-scoped resources that
>      * are pooled across the stages of the topology.
>      *
>      * @param parameters // I am unsure if this makes sense
>      */
>     default void open(Configuration parameters) throws Exception {}
>
>     /**
>      * Called after the last {@link RichFunction#close()} has completed and the
>      * topology is effectively being stopped (for the current ClassLoader).
>      * This method will only be invoked if a call to {@link #open(Configuration)}
>      * was attempted, and will be invoked irrespective of whether the call to
>      * {@link #open(Configuration)} terminated normally or exceptionally.
>      * Use this method to release any ClassLoader-scoped resources that have been
>      * pooled across the stages of the topology.
>      */
>     default void close() throws Exception {}
>
>     /**
>      * Decorate the threads that are used to invoke the stages of the topology.
>      * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>      * topology-specific details, e.g.
>      * <pre>
>      * Runnable decorate(Runnable task) {
>      *     return () -> {
>      *         try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
>      *             task.run();
>      *         }
>      *     };
>      * }
>      * </pre>
>      *
>      * @param task // might not be the most appropriate type, I haven't
>      *             // checked how Flink implements dispatch. May or may not
>      *             // want a parameters argument also.
>      */
>     default Runnable decorate(Runnable task) { return task; }
> }
> {code}
> (Names subject to change.)
> You would then use it something like this:
> {code:java}
> env.addEnvironmentLocalTopologyListener(...);
> ...
> env.execute(...);
> {code}
> The listener would be serialized to each task manager, and its `open(...)`
> method would be invoked before the first task is executed there. Each thread
> running a task would be decorated by the listener, and once all the stages
> have stopped, the `close()` method would be invoked.
>
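The proposed lifecycle can be sketched from the runtime's side as a per-JVM counter of running tasks. This is not Flink code: `TopologyListenerDriver`, `runTask` and `simulateTwoOverlappingTasks` are hypothetical, and `TopologyListener` is a trimmed copy of the proposed interface without the Flink `Configuration` parameter. The first task to start calls `open()`, every task's `Runnable` passes through `decorate(...)`, and `close()` runs once the last task finishes, including when `open()` threw, as the proposed Javadoc requires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class TopologyListenerDriver {
    // Hypothetical, trimmed copy of the proposed listener (no Flink Configuration parameter).
    public interface TopologyListener {
        default void open() throws Exception {}
        default void close() throws Exception {}
        default Runnable decorate(Runnable task) { return task; }
    }

    private final TopologyListener listener;
    private int activeTasks; // tasks of this topology currently running in this JVM

    public TopologyListenerDriver(TopologyListener listener) {
        this.listener = listener;
    }

    // Runs one task; the first concurrent task opens the listener, the last one closes it.
    public void runTask(Runnable task) throws Exception {
        try {
            taskStarting();
            listener.decorate(task).run();
        } finally {
            // Runs even if open() threw, so close() follows every attempted open().
            taskFinished();
        }
    }

    private synchronized void taskStarting() throws Exception {
        if (activeTasks++ == 0) {
            listener.open();
        }
    }

    private synchronized void taskFinished() throws Exception {
        if (--activeTasks == 0) {
            listener.close();
        }
    }

    // Runs two overlapping tasks on separate threads and records the listener callbacks.
    public static List<String> simulateTwoOverlappingTasks() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        TopologyListenerDriver driver = new TopologyListenerDriver(new TopologyListener() {
            @Override public void open() { events.add("open"); }
            @Override public void close() { events.add("close"); }
            @Override public Runnable decorate(Runnable task) {
                return () -> { events.add("decorated"); task.run(); };
            }
        });
        CountDownLatch bothRunning = new CountDownLatch(2);
        Runnable task = () -> {
            bothRunning.countDown();
            try {
                bothRunning.await(); // keeps both tasks active at the same time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(() -> runUnchecked(driver, task));
        Thread t2 = new Thread(() -> runUnchecked(driver, task));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    private static void runUnchecked(TopologyListenerDriver driver, Runnable task) {
        try {
            driver.runTask(task);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(simulateTwoOverlappingTasks()); // [open, decorated, decorated, close]
    }
}
```

A real implementation would also need to handle a failed `open()` racing with a second task start, and to keep an exception from `close()` from masking one thrown by `open()`; the sketch ignores both.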
--
This message was sent by Atlassian Jira
(v8.3.4#803005)