zentol opened a new pull request #16345:
URL: https://github.com/apache/flink/pull/16345
Based on #16339, just to get rid of a test instability that I ran into quite
often.
In a nutshell, with this PR:
- akka was bumped to 2.6
- akka+scala+flink-rpc-akka are loaded through a separate classloader
- scala suffix was removed from flink-runtime, cascading into various other
modules
The commit structure is _not_ what I intend to eventually merge; I
separated the changes for ease of review, but most of the commits would not
pass CI on their own.
For example, bumping Akka is not possible without the plugin work (because
Akka 2.6 needs Scala 2.12), while the plugin work is not possible without
bumping Akka (because Akka 2.5 leaks the plugin classloader).
I will now describe the changes in each commit:
`Replace AskTimeException usages in tests`:
This is a simple removal of some direct references to Akka.
`Use reflection to access akka in MetricUtils`:
Similar to above; ideally we'd come up with a TestingRpcSystem, but the
dependencies are too extensive (RpcSystem needs a Builder, Builder needs an
RpcService, RpcService needs even more stuff).
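For illustration, the reflective approach comes down to resolving a class by name at runtime. That way MetricUtils keeps no compile-time reference to Akka. The helper below (`ReflectiveAccess`, `invokeStatic`) is hypothetical. The PR does not show which Akka classes are accessed, so a JDK class stands in for the Akka one and the sketch runs anywhere.

```java
import java.lang.reflect.Method;
import java.util.Optional;

/** Hypothetical helper: call a static no-arg method on a class that may be absent. */
final class ReflectiveAccess {
    private ReflectiveAccess() {}

    static Optional<Object> invokeStatic(ClassLoader loader, String className, String methodName) {
        try {
            // Resolve the class by name, so there is no compile-time dependency on it.
            Class<?> clazz = Class.forName(className, true, loader);
            Method method = clazz.getMethod(methodName);
            // Static method: the receiver is null.
            return Optional.ofNullable(method.invoke(null));
        } catch (ReflectiveOperationException e) {
            // Class or method not available (e.g. Akka not loaded): degrade gracefully.
            return Optional.empty();
        }
    }
}
```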
`Add RpcSystem#cleanup`
In order to work with the MiniCluster we need to be able to unload the
AkkaRpcSystem when the MiniCluster is shut down. This hook will eventually
allow us to do that.
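Here is a minimal sketch of such a hook. It assumes an RpcSystem implementation that owns its own class loader and releases it on cleanup. `RpcSystemSketch` and `IsolatedRpcSystem` are hypothetical, heavily reduced stand-ins and not Flink's actual interfaces.

```java
import java.io.IOException;
import java.net.URLClassLoader;

/** Hypothetical, heavily reduced stand-in for an RpcSystem with a cleanup hook. */
interface RpcSystemSketch extends AutoCloseable {
    /** Releases resources such as a dedicated class loader; no-op by default. */
    default void cleanup() throws Exception {}

    @Override
    default void close() throws Exception {
        cleanup();
    }
}

/** Hypothetical RpcSystem whose implementation classes live in their own class loader. */
final class IsolatedRpcSystem implements RpcSystemSketch {
    private final URLClassLoader loader;
    private boolean cleanedUp;

    IsolatedRpcSystem(URLClassLoader loader) {
        this.loader = loader;
    }

    @Override
    public void cleanup() throws IOException {
        if (!cleanedUp) {
            cleanedUp = true;
            // Closing the loader releases its open jar files, so the loader can be unloaded.
            loader.close();
        }
    }

    boolean isCleanedUp() {
        return cleanedUp;
    }
}
```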
`RpcSystem#load accepts Configuration`
The configuration will primarily be necessary to retrieve the temp
directory; more on that later.
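The sketch below shows how `load` might derive the extraction directory from the configuration. A plain `Map` stands in for Flink's `Configuration`. The key `io.tmp.dirs` and the choice of the first listed directory are assumptions made for illustration.

```java
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical sketch: choose the directory the RPC jar gets extracted to. */
final class RpcTempDir {
    /** Assumed key; Flink's temporary-directory option is "io.tmp.dirs". */
    static final String TMP_DIRS_KEY = "io.tmp.dirs";

    private RpcTempDir() {}

    /** {@code config} is a plain map standing in for Flink's Configuration. */
    static Path resolve(Map<String, String> config) {
        String value = config.get(TMP_DIRS_KEY);
        if (value != null) {
            // Several directories may be listed; this sketch uses the first non-empty one.
            for (String dir : value.split(",|" + Pattern.quote(File.pathSeparator))) {
                if (!dir.trim().isEmpty()) {
                    return Paths.get(dir.trim());
                }
            }
        }
        // Fall back to the JVM's default temp directory.
        return Paths.get(System.getProperty("java.io.tmpdir"));
    }
}
```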
`Remove netty relocation`
Now that we load netty through a separate class loader, the relocation is
technically not necessary. We may still want to keep it, because without it
some security scanner is quite likely to pick up the bundled netty, and it is
unlikely that we can migrate away from Netty for 1.14.
`Remove scala suffixes`
flink-runtime no longer depends on `${scala.binary.version}`. While it still
transitively depends on scala via flink-rpc-akka, that module uses a fixed
scala version so the suffix is not necessary.
This also results in other modules losing their scala suffix, like the
reporter for example.
`Exclude flink-rpc-akka from scala-suffix check`
The suffix checker is pretty simple in that it mandates a scala suffix if
scala shows up in the dependency tree. Since the scala version for
flink-rpc-akka is fixed and independent of `${scala.binary.version}` I added
crude exclusions for this module such that neither flink-rpc-akka nor modules
depending on it require a suffix.
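The rule and its exclusion can be modelled as a small walk over the dependency graph. This is a hypothetical Java model of the behaviour described above, not the actual checker. Artifact names are simplified, and scala is represented by `scala-library`.

```java
import java.util.*;

/** Hypothetical model of the scala-suffix rule with an exclusion for flink-rpc-akka. */
final class ScalaSuffixCheck {
    private static final Set<String> EXCLUDED = Collections.singleton("flink-rpc-akka");

    private ScalaSuffixCheck() {}

    /** deps maps each module to its direct dependencies. */
    static boolean requiresSuffix(String module, Map<String, List<String>> deps) {
        if (EXCLUDED.contains(module)) {
            return false; // uses a fixed, bundled scala version
        }
        Deque<String> stack = new ArrayDeque<>(deps.getOrDefault(module, Collections.emptyList()));
        Set<String> seen = new HashSet<>();
        while (!stack.isEmpty()) {
            String dep = stack.pop();
            // Skip already-visited nodes and do not descend into the excluded module,
            // so its scala dependency is not attributed to modules depending on it.
            if (!seen.add(dep) || EXCLUDED.contains(dep)) {
                continue;
            }
            if (dep.equals("scala-library")) {
                return true;
            }
            stack.addAll(deps.getOrDefault(dep, Collections.emptyList()));
        }
        return false;
    }
}
```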
`Bump akka to 2.6, bundle scala, and consolidate maven stuff in flink-rpc`
This commit does a few things. It sets up flink-rpc-akka to assemble a fat
jar, taking over bundling concerns from flink-dist.
Everything related to akka dependencies (dependencyManagement entries,
properties) is now consolidated in flink-rpc-akka.
The enforcer rule that ensures the same scala version is used for all
projects is disabled for flink-rpc-akka, because it uses a separate scala
version.
Finally, Akka is bumped to 2.6.15, which did not require any code changes,
but we had to modify many netty configuration settings to have a `classic`
infix. In 2.6 Artery has become the default network layer, and we explicitly
need to opt in to netty. Migrating to Artery is something we should look into
in the near future.
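The sketch below shows the kind of rewrite involved. It assumes the settings are kept as flat HOCON-style keys. The netty transport settings move from `akka.remote.netty.*` to `akka.remote.classic.netty.*`, and Artery is switched off. `ClassicRemotingConfig` is hypothetical, and this PR does not list the exact set of keys Flink changed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: move Akka 2.5 netty transport keys to their Akka 2.6 location. */
final class ClassicRemotingConfig {
    private static final String OLD_PREFIX = "akka.remote.netty.";
    private static final String NEW_PREFIX = "akka.remote.classic.netty.";

    private ClassicRemotingConfig() {}

    static Map<String, String> toAkka26(Map<String, String> akka25) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : akka25.entrySet()) {
            String key = e.getKey();
            // Insert the "classic" infix for netty transport settings; keep other keys as-is.
            String newKey = key.startsWith(OLD_PREFIX)
                    ? NEW_PREFIX + key.substring(OLD_PREFIX.length())
                    : key;
            result.put(newKey, e.getValue());
        }
        // Artery is the default in 2.6; disable it to stay on the netty-based transport.
        result.put("akka.remote.artery.enabled", "off");
        return result;
    }
}
```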
The primary reason for the bump is that in 2.5 Akka contained a custom
common thread pool that also contained a thread local, which meant that if any
thread from outside Akka touched this executor, a reference to this pool would
be leaked to the outside. In 2.6 this pool was removed.
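To make the leak mechanism concrete, here is a deliberately simplified, hypothetical executor that records itself in a thread local. It is not Akka's code. It only reproduces the shape of the problem described above.

```java
import java.util.concurrent.Executor;

/**
 * Hypothetical illustration of the leak described above; not Akka's code.
 * Any thread that calls execute() ends up holding a reference to the pool.
 */
final class LeakyPool implements Executor {
    // Per-thread pointer to the pool this thread last interacted with.
    static final ThreadLocal<LeakyPool> CURRENT = new ThreadLocal<>();

    @Override
    public void execute(Runnable task) {
        // Set on the *caller's* thread: a thread from outside the pool now keeps
        // this pool (and the class loader that loaded it) reachable until the
        // thread local is cleared or the thread dies.
        CURRENT.set(this);
        new Thread(task).start();
    }
}
```

In a setup where the pool's classes come from a dedicated class loader, such a lingering reference is enough to keep that whole class loader from being garbage collected.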
`Load AkkaRpcSystem in separate classloader`
The nasty bit.
Primarily consists of 3 changes:
1) flink-runtime has an optional runtime dependency on flink-rpc-akka
   - optional ensures that downstream modules don't see it
   - runtime scope ensures that production code cannot reference
     flink-rpc-akka directly
2) The flink-rpc-akka jar is bundled by flink-runtime as is and extracted on
demand when the RpcSystem is being loaded
The AkkaRpcSystem is not loaded like usual plugins; it does not use the
PluginManager nor the plugins/ directory of the distribution. The reason for
this is simple: It just does not work outside of production.
In order for this feature to work we need to ensure dependency isolation in
these cases:
- production
- tests without a MiniCluster in the IDE/maven (e.g., most of the stuff in
flink-runtime)
- tests with the MiniCluster in the IDE/maven (most stuff outside of
flink-runtime; currently the MiniCluster does not support plugins)
For the latter 2 we ideally want a solution where akka etc. are not on the
compile/test classpath.
Without a dependency being declared this means we require the flink-rpc-akka
jar to be _somewhere_ on disk in order to point a ClassLoader to it.
To solve this I concocted the following approach:
a) flink-runtime bundles the flink-rpc-akka jar _as is_, that is the _jar_
is contained within the flink-runtime jar
b) RpcSystem#load() extracts this jar from the flink-runtime jar, puts it
into some temporary directory (IO_TMPDIR option), and points a PluginLoader at
it
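The extraction step might look roughly like the following. `EmbeddedJarLoader` is hypothetical. The jar is passed in as an `InputStream`; in practice it would come from a resource lookup on the flink-runtime jar. A plain `URLClassLoader` stands in for Flink's PluginLoader, which applies its own delegation rules.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of "extract the embedded jar, then point a class loader at it". */
final class EmbeddedJarLoader {
    private EmbeddedJarLoader() {}

    /** Copies {@code embeddedJar} into {@code tmpDir} and returns a loader over the copy. */
    static URLClassLoader extractAndLoad(InputStream embeddedJar, Path tmpDir, ClassLoader parent)
            throws IOException {
        Files.createDirectories(tmpDir);
        // Unique file name, so concurrent loads do not overwrite each other's jar.
        Path extracted = Files.createTempFile(tmpDir, "flink-rpc-akka", ".jar");
        try (InputStream in = embeddedJar) {
            Files.copy(in, extracted, StandardCopyOption.REPLACE_EXISTING);
        }
        extracted.toFile().deleteOnExit();
        return new URLClassLoader(new URL[] {extracted.toUri().toURL()}, parent);
    }
}
```

This sketch leaves deletion to `deleteOnExit`. A fuller version would more likely tie it to the cleanup hook of the RpcSystem.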
This works in maven, the IDE and in production. Overall I think it is
actually quite neat, but it has some downsides:
a) A big gotcha that people need to be aware of is that if you change
something in flink-rpc-akka, then you first need to rebuild flink-rpc-akka and
flink-runtime for it to take effect.
This is super unfortunate, but I just don't see a way to work around that.
b) Having to extract the jar from the flink-runtime jar is a bit
unfortunate; it would be neat if we could just point the ClassLoader to the
file contained in flink-runtime. Technically that _should_ work, but the
standard URLClassLoader just doesn't support nested jars. It is possible though, as
shown by the [One-JAR project](http://one-jar.sourceforge.net/).
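For comparison, here is a minimal sketch of the One-JAR-style alternative. It reads the nested jar into memory and serves classes and resource streams from there, without extracting anything to disk. `NestedJarClassLoader` is hypothetical and far simpler than what One-JAR actually does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;

/** Hypothetical sketch: serve classes/resources straight from a jar nested in another jar. */
final class NestedJarClassLoader extends ClassLoader {
    private final Map<String, byte[]> entries = new HashMap<>();

    NestedJarClassLoader(InputStream nestedJar, ClassLoader parent) throws IOException {
        super(parent);
        // Read every non-directory entry of the nested jar into memory.
        try (JarInputStream in = new JarInputStream(nestedJar)) {
            JarEntry entry;
            while ((entry = in.getNextJarEntry()) != null) {
                if (!entry.isDirectory()) {
                    entries.put(entry.getName(), readEntry(in));
                }
            }
        }
    }

    private static byte[] readEntry(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) { // -1 marks the end of the current entry
            out.write(buffer, 0, n);
        }
        return out.toByteArray();
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        byte[] bytes = entries.get(name.replace('.', '/') + ".class");
        if (bytes == null) {
            throw new ClassNotFoundException(name);
        }
        return defineClass(name, bytes, 0, bytes.length);
    }

    @Override
    public InputStream getResourceAsStream(String name) {
        // Parent first, consistent with the default delegation for classes.
        InputStream fromParent = super.getResourceAsStream(name);
        if (fromParent != null) {
            return fromParent;
        }
        byte[] bytes = entries.get(name);
        return bytes == null ? null : new ByteArrayInputStream(bytes);
    }
}
```

Note that `getResource` still returns no URLs for nested entries. Handing out URLs requires a custom URL handler, and that is where most of the complexity of this approach lies.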
3) At various points the ContextClassLoader is set to either the Akka or
Flink ClassLoader
Akka itself sets the ContextClassLoader of all of its threads to the one it
itself was loaded with. As described in FLINK-23147, this can result in thread
pools outside of the AkkaRpcSystem being poisoned such that some threads use a
different classloader than others, resulting in classloading issues.
To remedy this I covered most places where we transition from Akka to Flink
with a TemporaryClassLoaderContext.
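The pattern is a try-with-resources scope that swaps the thread's context class loader and restores it afterwards. Flink's own utility for this is TemporaryClassLoaderContext. The class below is a hypothetical, self-contained stand-in. Its `withClassLoader` method shows how a callback handed from an Akka thread to Flink code could be wrapped.

```java
/** Hypothetical stand-in for the TemporaryClassLoaderContext pattern. */
final class ContextClassLoaderScope implements AutoCloseable {
    private final Thread thread;
    private final ClassLoader previous;

    private ContextClassLoaderScope(ClassLoader replacement) {
        this.thread = Thread.currentThread();
        this.previous = thread.getContextClassLoader();
        thread.setContextClassLoader(replacement);
    }

    static ContextClassLoaderScope of(ClassLoader replacement) {
        return new ContextClassLoaderScope(replacement);
    }

    @Override
    public void close() {
        // Restore the class loader that was set before the scope was opened.
        thread.setContextClassLoader(previous);
    }

    /** Wraps a callback so it runs with Flink's class loader, even on an Akka thread. */
    static Runnable withClassLoader(Runnable callback, ClassLoader flinkClassLoader) {
        return () -> {
            // try-with-resources restores the previous loader even if the callback throws.
            try (ContextClassLoaderScope ignored = of(flinkClassLoader)) {
                callback.run();
            }
        };
    }
}
```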
There _are_ still other occurrences, but from what I can tell they are not
problematic at this time, and I already spent days on tracking and debugging.