zentol opened a new pull request #16345:
URL: https://github.com/apache/flink/pull/16345


   Based on #16339, just to get rid of a test instability that I ran into quite 
often.
   
   With this PR in a nutshell:
   - akka was bumped to 2.6
   - akka+scala+flink-rpc-akka are loaded through a separate classloader
   - scala suffix was removed from flink-runtime, cascading into various other 
modules
   
   The commit structure is _not_ what I intend to eventually be merged; I 
separated the changes for ease of review, but the majority would not succeed 
on CI on their own.
   For example, bumping Akka is not possible without the plugin work (because 
it needs scala 2.12), while the plugin work is not possible without bumping 
Akka (because it leaks the plugin classloader).
   
   
   I will now describe the changes in each commit:
   
   `Replace AskTimeException usages in tests`:
   This is a simple removal of some direct references to Akka.
   
   `Use reflection to access akka in MetricUtils`:
   Similar to above; ideally we'd come up with a TestingRpcSystem, but the 
dependencies are too extensive (RpcSystem needs a Builder, Builder needs an 
RpcService, RpcService needs even more stuff).
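
A minimal sketch of the general reflection technique, for illustration only. The PR text does not say which Akka members MetricUtils touches, so `ReflectiveAccess` and `invokeStatic` are hypothetical names and a JDK class stands in for the Akka one:

```java
import java.lang.reflect.Method;
import java.util.Optional;

/** Hypothetical helper; not code from this PR. */
final class ReflectiveAccess {

    /**
     * Calls a public static no-arg method by name. Returns empty if the class
     * is not visible to the given class loader, so callers need no
     * compile-time reference to it.
     */
    static Optional<Object> invokeStatic(
            ClassLoader loader, String className, String methodName) {
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            Method method = clazz.getMethod(methodName);
            return Optional.ofNullable(method.invoke(null));
        } catch (ClassNotFoundException e) {
            // The optional dependency is simply not on the classpath.
            return Optional.empty();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Could not call " + className + "#" + methodName, e);
        }
    }
}
```

The caller only names the target class as a string, which is what removes the direct reference from the compiled code.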
   
   `Add RpcSystem#cleanup`
   In order to work with the MiniCluster we need to be able to unload the 
AkkaRpcSystem when the MiniCluster is shut down. This hook will eventually 
allow us to do that.
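
As a rough sketch of what such a hook could do once the RpcSystem lives in its own classloader (an assumption on my side, not the code in this PR): close the loader and delete the extracted jar. `LoadedRpcSystemSketch` is a hypothetical name, and a plain `URLClassLoader` stands in for whatever loader is actually used:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch; not the RpcSystem interface from this PR. */
final class LoadedRpcSystemSketch {
    private final URLClassLoader classLoader;
    private final Path extractedJar;

    LoadedRpcSystemSketch(Path extractedJar, ClassLoader parent) {
        this.extractedJar = extractedJar;
        this.classLoader = new URLClassLoader(new URL[] {toUrl(extractedJar)}, parent);
    }

    /** Releases the loader's file handles, then removes the temporary jar. */
    void cleanup() {
        try {
            classLoader.close();
            Files.deleteIfExists(extractedJar);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static URL toUrl(Path path) {
        try {
            return path.toUri().toURL();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Closing the loader before deleting the file matters on platforms that refuse to delete a jar that is still open.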
   
   `RpcSystem#load accepts Configuration`
   The configuration will primarily be necessary to retrieve the temp 
directory; more on that later.
   
   `Remove netty relocation`
   Now that we load netty through a separate class loader the relocation is 
technically not necessary. We may still want to keep it, because it is quite 
likely that some security scanner might now pick it up, and it is unlikely that 
we can migrate away from Netty for 1.14.
   
   `Remove scala suffixes`
   flink-runtime no longer depends on `${scala.binary.version}`. While it still 
transitively depends on scala via flink-rpc-akka, that module uses a fixed 
scala version so the suffix is not necessary.
   This also results in other modules losing their scala suffix, like the 
reporter for example.
   
   `Exclude flink-rpc-akka from scala-suffix check`
   The suffix checker is pretty simple in that it mandates a scala suffix if 
scala shows up in the dependency tree. Since the scala version for 
flink-rpc-akka is fixed and independent of `${scala.binary.version}` I added 
crude exclusions for this module such that neither flink-rpc-akka nor modules 
depending on it require a suffix.
   
   `Bump akka to 2.6, bundle scala, and consolidate maven stuff in flink-rpc`
   This commit does a few things. It sets up flink-rpc-akka to assemble a fat 
jar, taking over bundling concerns from flink-dist.
   Everything related to akka dependencies (depMngt entries, properties) is 
now consolidated in flink-rpc-akka.
   The enforcer rule that ensures the same scala version is used for all 
projects is disabled for flink-rpc-akka, because it uses a separate scala 
version.
   
   Finally, Akka is bumped to 2.6.15, which did not require any code changes, 
but we had to modify many netty configuration settings to have a `classic` 
infix. In 2.6 Artery has become the default network layer, and we explicitly 
need to opt in to netty. Migrating to Artery is something we should look into 
in the near future.
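
To illustrate what the `classic` infix means for a setting name: in Akka 2.6 the classic remoting keys live under `akka.remote.classic.netty` instead of `akka.remote.netty`. The helper below is purely illustrative (`ClassicKeys` and `toClassicKey` are hypothetical names; the PR edits the configuration directly rather than rewriting keys at runtime):

```java
/** Hypothetical helper; it only shows how a 2.5-style key maps to 2.6. */
final class ClassicKeys {
    private static final String LEGACY_PREFIX = "akka.remote.netty.";
    private static final String CLASSIC_PREFIX = "akka.remote.classic.netty.";

    /** Inserts the `classic` infix into legacy netty keys; other keys pass through. */
    static String toClassicKey(String key) {
        return key.startsWith(LEGACY_PREFIX)
                ? CLASSIC_PREFIX + key.substring(LEGACY_PREFIX.length())
                : key;
    }
}
```

For example, `akka.remote.netty.tcp.port` becomes `akka.remote.classic.netty.tcp.port`.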
   
   The primary reason for the bump is that in 2.5 Akka contained a custom 
common thread pool that also contained a thread local, which meant that if any 
thread from outside Akka touched this executor, a reference to this pool would 
be leaked to the outside. In 2.6 this pool was removed.
   
   `Load AkkaRpcSystem in separate classloader`
   The nasty bit.
   
   Primarily consists of 3 changes:
   
   1) flink-runtime has an optional runtime dependency on flink-rpc-akka
   
   - optional ensures that downstream modules don't see it
   - runtime scope ensures that production code cannot reference 
flink-rpc-akka directly
   
   2) flink-rpc-akka jar is bundled by flink-runtime as is and extracted on 
demand when the RpcSystem is being loaded
   The AkkaRpcSystem is not loaded like usual plugins; it does not use the 
PluginManager nor the plugins/ directory of the distribution. The reason for 
this is simple: It just does not work outside of production.
   In order for this feature to work we need to ensure dependency isolation in 
these cases:
   - production
   - tests without a MiniCluster in the IDE/maven (e.g., most of the stuff in 
flink-runtime)
   - tests with the MiniCluster in the IDE/maven (most stuff outside of 
flink-runtime; currently the MiniCluster does not support plugins)
   
   For the latter 2 we ideally want a solution where akka etc. are not on the 
compile/test classpath.
   Without a dependency being declared this means we require the flink-rpc-akka 
jar to be _somewhere_ on disk in order to point a ClassLoader to it.
   To solve this I concocted the following approach:
   a) flink-runtime bundles the flink-rpc-akka jar _as is_, that is the _jar_ 
is contained within the flink-runtime jar
   b) RpcSystem#load() extracts this jar from the flink-runtime jar, puts it 
into some temporary directory (IO_TMPDIR option), and points a PluginLoader at 
it
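
Steps a) and b) can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code in this PR: `NestedJarLoader` and the file prefix are hypothetical, the resource name is whatever the bundled jar is called, and a plain `URLClassLoader` stands in for the PluginLoader:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of "extract the nested jar, then point a loader at it". */
final class NestedJarLoader {

    /** Copies a classpath resource (the nested jar) into a file under tmpDir. */
    static Path extract(ClassLoader source, String resourceName, Path tmpDir) {
        try (InputStream in = source.getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IllegalStateException("Resource not found: " + resourceName);
            }
            Files.createDirectories(tmpDir);
            // A unique file name avoids clashes between concurrent JVMs.
            Path target = Files.createTempFile(tmpDir, "rpc-system_", ".jar");
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a class loader that resolves classes from the extracted jar. */
    static URLClassLoader loaderFor(Path jar, ClassLoader parent) {
        try {
            return new URLClassLoader(new URL[] {jar.toUri().toURL()}, parent);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The extraction step exists only because `URLClassLoader` needs a jar it can open as a file, which is the limitation described in downside b) below.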
   
   This works in maven, the IDE and in production. Overall I think it is 
actually quite neat, but it has some downsides:
   a) A big gotcha that people need to be aware of is that if you change 
something in flink-rpc-akka, then you first need to rebuild flink-rpc-akka and 
flink-runtime for it to take effect.
   This is super unfortunate, but I just don't see a way to work around that.
   b) Having to extract the jar from the flink-runtime jar is a bit 
unfortunate; it would be neat if we could just point the ClassLoader to the 
file contained in flink-runtime. Which technically _should_ work, but the 
standard URLClassLoader just doesn't support it. It is possible though, as 
shown by the [One-JAR project](http://one-jar.sourceforge.net/).
   
   3) At various points the ContextClassLoader is set to either the Akka or 
Flink ClassLoader
   
   Akka itself sets the ContextClassLoader of all of its threads to the one it 
itself was loaded with. As described in FLINK-23147 this can result in thread 
pools outside of the AkkaRpcSystem being poisoned such that some threads use a 
different classloader than others, resulting in classloading issues.
   To remedy this I covered most places where we transition from Akka to Flink 
with a TemporaryClassLoaderContext.
   There _are_ still other occurrences, but from what I can tell they are not 
problematic at this time, and I already spent days on tracking and debugging.
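
For readers unfamiliar with the pattern, here is a minimal sketch of how such a temporary context-classloader swap works. `ContextClassLoaderSwap` is a hypothetical stand-in written for illustration, not Flink's TemporaryClassLoaderContext itself:

```java
/** Hypothetical stand-in that swaps the context classloader for a scope. */
final class ContextClassLoaderSwap implements AutoCloseable {
    private final Thread thread;
    private final ClassLoader original;

    private ContextClassLoaderSwap(Thread thread, ClassLoader original) {
        this.thread = thread;
        this.original = original;
    }

    /** Installs the given loader on the current thread and remembers the old one. */
    static ContextClassLoaderSwap of(ClassLoader temporary) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(temporary);
        return new ContextClassLoaderSwap(thread, original);
    }

    /** Restores the loader that was active before the swap. */
    @Override
    public void close() {
        thread.setContextClassLoader(original);
    }
}
```

Used in a try-with-resources block at the point where a callback crosses from an Akka thread into Flink code, the Flink classloader is active only for the duration of the block and the Akka one is restored afterwards, even if the callback throws.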

