tillrohrmann commented on a change in pull request #16345:
URL: https://github.com/apache/flink/pull/16345#discussion_r663134182
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
##########
@@ -250,7 +252,10 @@ public void close() throws Exception {
FutureUtils.composeAfterwards(
taskManagerTerminationFuture,
this::shutDownServices);
- serviceTerminationFuture.whenComplete(
+ final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
+ FutureUtils.runAfterwards(serviceTerminationFuture,
rpcSystem::cleanup);
+
+ rpcSystemClassLoaderCloseFuture.whenComplete(
Review comment:
Could use `FutureUtils.forward`.
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
##########
@@ -51,6 +51,9 @@ RpcServiceBuilder remoteServiceBuilder(
@Nullable String externalAddress,
String externalPortRange);
+ /** Hook to cleanup resources, like common thread pools or classloaders. */
+ void cleanup();
Review comment:
But I am also for `AutoCloseable`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -498,8 +499,12 @@ private Configuration
generateClusterConfiguration(Configuration configuration)
FutureUtils.composeAfterwards(
shutDownApplicationFuture, () ->
stopClusterServices(cleanupHaData));
+ final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
+ FutureUtils.runAfterwards(serviceShutdownFuture, () ->
rpcSystem.cleanup());
Review comment:
`rpcSystem::cleanup`
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
##########
@@ -83,16 +81,24 @@ public void
testStartMetricActorSystemRespectsThreadPriority() throws Exception
final RpcService rpcService =
MetricUtils.startRemoteMetricsRpcService(
configuration, "localhost", RpcSystem.load());
- assertThat(rpcService, instanceOf(AkkaRpcService.class));
-
- final ActorSystem actorSystem = ((AkkaRpcService)
rpcService).getActorSystem();
try {
+ // dirty reflection code to avoid ClassCastExceptions
+ final Method getActorSystem =
rpcService.getClass().getMethod("getActorSystem");
+ final Object actorSystem = getActorSystem.invoke(rpcService);
+
+ final Method settingsMethod =
actorSystem.getClass().getMethod("settings");
+ final Object settings = settingsMethod.invoke(actorSystem);
+
+ final Method configMethod =
settings.getClass().getMethod("config");
+ final Object config = configMethod.invoke(settings);
+
+ final Method getIntMethod = config.getClass().getMethod("getInt",
String.class);
+ getIntMethod.setAccessible(true);
final int threadPriority =
- actorSystem
- .settings()
- .config()
-
.getInt("akka.actor.default-dispatcher.thread-priority");
+ (int)
+ getIntMethod.invoke(
+ config,
"akka.actor.default-dispatcher.thread-priority");
Review comment:
Could we either add a method
`AkkaRpcService.getAkkaConfigValue("akka.actor.default-dispatcher.thread-priority")`
and the only reflect this single method or can we actually start an endpoint
in the `rpcService` and send a rpc message to query
`Thread.currentThread().getPriority()`? That way we actually test that we set
the correct priority.
##########
File path: flink-rpc/flink-rpc-akka/pom.xml
##########
@@ -53,102 +59,89 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
-
<artifactId>akka-actor_${scala.binary.version}</artifactId>
- <!-- exclusions for dependency conversion -->
- <exclusions>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- </exclusions>
+
<artifactId>akka-actor_${akka.scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
</dependency>
-
<dependency>
<groupId>com.typesafe.akka</groupId>
-
<artifactId>akka-remote_${scala.binary.version}</artifactId>
- <!-- exclusions for dependency conversion -->
+
<artifactId>akka-remote_${akka.scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
<exclusions>
<exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
+ <groupId>io.aeron</groupId>
+ <artifactId>aeron-driver</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.aeron</groupId>
+ <artifactId>aeron-client</artifactId>
</exclusion>
Review comment:
Why can we exclude these dependencies?
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
##########
@@ -77,6 +77,15 @@ RpcServiceBuilder withExecutorConfiguration(
* @return loaded RpcSystem
*/
static RpcSystem load() {
+ return load(new Configuration());
+ }
+
+ /**
+ * Loads the RpcSystem.
+ *
+ * @return loaded RpcSystem
Review comment:
JavaDoc for `config` is missing.
##########
File path:
tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java
##########
@@ -93,6 +93,11 @@ private static ParseResult parseMavenOutput(final Path path)
throws IOException
final Matcher matcher = moduleNamePattern.matcher(line);
if (matcher.matches()) {
final String moduleName =
stripScalaSuffix(matcher.group(1));
+ // we ignored flink-rpc-akka because it is loaded through
a separate class
+ // loader
+ if (moduleName.equals("flink-rpc-akka")) {
Review comment:
nit: Shall we deduplicate this string?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]