[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17295244#comment-17295244 ]
Chesnay Schepler commented on FLINK-14184:
------------------------------------------
[~stephenc] Have you investigated whether the class loader release hooks
introduced in FLINK-17554 could serve your use case?
Specifically, they allow you to perform cleanup actions when, _for a given task
manager_, the last task of a job has finished (i.e., not the last task of
the job as a whole, just the last one still running on this particular TM).
You'd still need to set up all of your functions to register the hook, though.
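A minimal sketch of that pattern, assuming the hook semantics described above. In Flink versions that include FLINK-17554 the registration call is `RuntimeContext#registerUserCodeClassLoaderReleaseHookIfAbsent(String, Runnable)` (check the Javadoc for your version). To keep the sketch runnable without Flink, `ClassLoaderLease` and `SharedPool` are hypothetical stand-ins: every function calls `SharedPool.get(...)` from its `open()`, the first caller creates the pool, and only the first hook registered under the name `"shared-pool"` is kept.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the per-TaskManager, per-job class loader lease.
// In Flink the registration would go through the function's RuntimeContext.
final class ClassLoaderLease {
    private final Map<String, Runnable> releaseHooks = new LinkedHashMap<>();

    // "IfAbsent" semantics: only the first hook registered under a name is kept.
    synchronized void registerReleaseHookIfAbsent(String name, Runnable hook) {
        releaseHooks.putIfAbsent(name, hook);
    }

    // Called when the last task of the job on this TaskManager has finished.
    void release() {
        List<Runnable> hooks;
        synchronized (this) {
            hooks = new ArrayList<>(releaseHooks.values());
            releaseHooks.clear();
        }
        // Run hooks outside the lock to avoid lock-ordering deadlocks with hook code.
        hooks.forEach(Runnable::run);
    }
}

// Hypothetical classloader-scoped resource shared by all functions of the job.
final class SharedPool {
    private static ExecutorService pool;

    // Each function calls this from open(); the first caller creates the pool.
    static synchronized ExecutorService get(ClassLoaderLease lease) {
        if (pool == null) {
            pool = Executors.newFixedThreadPool(2);
        }
        lease.registerReleaseHookIfAbsent("shared-pool", SharedPool::shutdown);
        return pool;
    }

    static synchronized void shutdown() {
        if (pool != null) {
            pool.shutdownNow();
            pool = null;
        }
    }

    static synchronized boolean isRunning() {
        return pool != null;
    }
}
```

The "if absent" semantics is what lets every function register unconditionally without tracking which one got there first.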
> Provide a stage listener API to be invoked per task manager
> -----------------------------------------------------------
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Stephen Connolly
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Oftentimes a topology has nodes that need to use 3rd-party APIs. Not every
> 3rd-party API is written in a style suited to use from within Flink.
> At present, implementing a `Rich___` function provides each stage with the
> `open(...)` and `close()` callbacks as the stage is accepted for execution
> on each task manager.
> There is, however, a need to listen for the first stage being
> opened on any given task manager as well as the last stage being closed.
> Critically, the closing of the last stage is the opportunity to release any
> resources that are shared across multiple stages in the topology, e.g.
> database connection pools, Async HTTP Client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots
> that prevent the topology's classloader from being unloaded, resulting in a
> memory and resource leak in the task manager... never mind that, if it is a
> database connection pool, it may also be consuming resources on the
> database.
> There are three workarounds available at present:
> # Each stage just allocates its own resources and cleans up afterwards. This
> is, in many ways, the ideal... however, it can result in a higher than
> intended number of database connections, e.g. each stage that accesses the
> database needs a separate database connection rather than letting the
> whole topology share one or two connections through a connection
> pool. Similarly, if the 3rd-party library uses a static singleton for the
> whole classloader, there is no way for the independent stages to know when it
> is safe to shut down the singleton.
> # Implement a reference-counting proxy for the 3rd-party API. This is a lot
> of work: you need to ensure that deserialization of the proxy returns a
> classloader singleton (so you can maintain the reference counts), and if the
> count goes wrong you have leaked the resource.
> # Use a ReferenceQueue-backed proxy. This is even more complex than
> implementing reference counting, but has the advantage of not requiring that
> the count be maintained correctly. On the other hand, it does not provide for
> eager release of the resources.
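Workaround 2 boils down to a counter guarded by a lock. A minimal sketch, assuming each stage calls `acquire()` from `open()` and `release()` from `close()`; `RefCountedResource` is a hypothetical name, and the serialization half of the proxy (e.g. a `readResolve()` that returns the classloader-wide instance) is not shown.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical classloader-scoped holder: the resource is created on the first
// acquire() and destroyed when the count returns to zero.
final class RefCountedResource<T> {
    private final Supplier<T> factory;
    private final Consumer<T> destroyer;
    private T instance;
    private int refCount;

    RefCountedResource(Supplier<T> factory, Consumer<T> destroyer) {
        this.factory = factory;
        this.destroyer = destroyer;
    }

    // Called from each stage's open().
    synchronized T acquire() {
        if (refCount == 0) {
            instance = factory.get(); // if this throws, the count stays at zero
        }
        refCount++;
        return instance;
    }

    // Called from each stage's close().
    synchronized void release() {
        if (refCount == 0) {
            // A mismatched release is the bookkeeping error the issue warns about.
            throw new IllegalStateException("release() without matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            destroyer.accept(instance);
            instance = null;
        }
    }

    synchronized int count() {
        return refCount;
    }
}
```

A single missed `release()` keeps the count above zero forever, which is exactly the leak described above.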
> If Flink provided a listener contract that could be registered with the
> execution environment, the resources could be cleaned up reliably.
> My proposed interface would look something like:
> {code:java}
> import java.io.Serializable;
> import org.apache.flink.api.common.functions.RichFunction;
> import org.apache.flink.configuration.Configuration;
>
> public interface EnvironmentLocalTopologyListener extends Serializable {
>     /**
>      * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>      * being invoked for the topology on the current task manager JVM for this
>      * classloader. Will not be called again unless {@link #close()} has been
>      * invoked first.
>      * Use this method to eagerly initialize any ClassLoader-scoped resources that
>      * are pooled across the stages of the topology.
>      *
>      * @param parameters // I am unsure if this makes sense
>      */
>     default void open(Configuration parameters) throws Exception {}
>
>     /**
>      * Called after the last {@link RichFunction#close()} has completed and the
>      * topology is effectively being stopped (for the current ClassLoader).
>      * This method will only be invoked if a call to {@link #open(Configuration)}
>      * was attempted, and will be invoked irrespective of whether the call to
>      * {@link #open(Configuration)} terminated normally or exceptionally.
>      * Use this method to release any ClassLoader-scoped resources that have been
>      * pooled across the stages of the topology.
>      */
>     default void close() throws Exception {}
>
>     /**
>      * Decorate the threads that are used to invoke the stages of the topology.
>      * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>      * topology-specific details, e.g.
>      * <pre>
>      * Runnable decorate(Runnable task) {
>      *     return () -> {
>      *         try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
>      *             task.run();
>      *         }
>      *     };
>      * }
>      * </pre>
>      *
>      * @param task // might not be the most appropriate type, I haven't
>      *             // checked how Flink implements dispatch. May or may not
>      *             // want a parameters argument also.
>      */
>     default Runnable decorate(Runnable task) { return task; }
> }{code}
> (Names subject to change)
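To make the proposal concrete, a sketch of what a listener implementation might look like. It assumes a simplified, Flink-free copy of the proposed interface (`TopologyListener`, with the `Configuration` parameter dropped) and uses a `ThreadLocal` as a stand-in for `org.slf4j.MDC`; `SharedPoolListener`, `CONTEXT`, and `sharedPool()` are hypothetical names.

```java
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified, Flink-free copy of the proposed interface (no Configuration parameter).
interface TopologyListener extends Serializable {
    default void open() throws Exception {}
    default void close() throws Exception {}
    default Runnable decorate(Runnable task) { return task; }
}

// Hypothetical listener that owns a classloader-scoped pool and tags task threads.
final class SharedPoolListener implements TopologyListener {
    private static final long serialVersionUID = 1L;

    // Stand-in for org.slf4j.MDC: a per-thread diagnostic value.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // transient: the pool is created on the task manager, never serialized.
    private transient ExecutorService pool;

    @Override
    public void open() {
        pool = Executors.newFixedThreadPool(4);
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.shutdownNow();
            pool = null;
        }
    }

    @Override
    public Runnable decorate(Runnable task) {
        return () -> {
            String previous = CONTEXT.get();
            CONTEXT.set("topology=example");
            try {
                task.run();
            } finally {
                // Restore the previous value so reused threads are not polluted.
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    ExecutorService sharedPool() {
        return pool;
    }
}
```

The pool field is `transient` because the listener itself is what gets serialized to each task manager; the pool only exists after `open()` runs there.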
> The listener would then be registered roughly as follows:
> {code:java}
> env.addEnvironmentLocalTopologyListener(...);
> ...
> env.execute(...); {code}
> The listener would be serialized to each task manager and then, before the
> first task is executed, its `open(...)` method would be invoked. Each thread
> that is running a task would be decorated by the listener, and once all
> the stages have stopped, the `close()` method would be invoked.
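The lifecycle described above amounts to a per-task-manager count of running stages. A minimal sketch, assuming the runtime calls the hypothetical `stageStarting(...)` as each stage is accepted and `stageFinished()` after that stage's own `close()` returns; `StageListener` is a Flink-free copy of the proposed interface and `TaskManagerListenerDriver` is a hypothetical name. Following the proposed Javadoc, `close()` is also invoked when `open()` fails.

```java
// Flink-free copy of the proposed interface (Configuration parameter dropped).
interface StageListener {
    default void open() throws Exception {}
    default void close() throws Exception {}
    default Runnable decorate(Runnable task) { return task; }
}

// Hypothetical per-task-manager driver: open() before the first stage,
// decorate() for every stage, close() after the last stage has finished.
final class TaskManagerListenerDriver {
    private final StageListener listener;
    private int runningStages;

    TaskManagerListenerDriver(StageListener listener) {
        this.listener = listener;
    }

    // Called as each stage is accepted for execution on this task manager.
    synchronized Runnable stageStarting(Runnable stage) throws Exception {
        if (runningStages == 0) {
            try {
                listener.open();
            } catch (Exception e) {
                // open() was attempted, so close() must still run.
                try {
                    listener.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }
        runningStages++;
        return listener.decorate(stage);
    }

    // Called after the stage's own close() has completed.
    synchronized void stageFinished() throws Exception {
        if (runningStages == 0) {
            throw new IllegalStateException("stageFinished() without a running stage");
        }
        runningStages--;
        if (runningStages == 0) {
            listener.close();
        }
    }
}
```

Because `open()` only runs when the count is zero and `close()` only when it drops back to zero, `open()` is never called twice without an intervening `close()`, matching the proposed contract.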
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)