This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 64abd566e863e584a3d86eba17aee39d9bede4cd Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Sep 24 09:38:30 2021 +0200 [FLINK-24367][tests] Add FallbackAkkaRpcSystemLoader The added loader constructs the classloader based on the target/classes directory of flink-rpc-akka, after downloading required dependencies via maven. --- .../runtime/rpc/akka/AkkaRpcSystemLoader.java | 3 +- .../runtime/rpc/akka/CleanupOnCloseRpcSystem.java | 18 +-- .../rpc/akka/FallbackAkkaRpcSystemLoader.java | 139 +++++++++++++++++++++ .../org.apache.flink.runtime.rpc.RpcSystemLoader | 16 +++ flink-runtime/pom.xml | 9 ++ flink-test-utils-parent/flink-test-utils/pom.xml | 8 ++ 6 files changed, 184 insertions(+), 9 deletions(-) diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java index da0477b..ed8977a 100644 --- a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java +++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java @@ -58,7 +58,8 @@ public class AkkaRpcSystemLoader implements RpcSystemLoader { if (resourceStream == null) { throw new RuntimeException( "Akka RPC system could not be found. If this happened while running a test in the IDE," - + "run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line."); + + "run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line," + + "or add a test dependency on the flink-rpc-akka-loader test-jar."); } IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile)); diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java index 621047b..e45f4b5 100644 --- a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java +++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.classloading.SubmoduleClassLoader; import org.apache.flink.runtime.rpc.AddressResolution; import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -30,7 +31,6 @@ import javax.annotation.Nullable; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.nio.file.Files; import java.nio.file.Path; /** An {@link RpcSystem} wrapper that cleans up resources after the RPC system has been closed. */ @@ -39,13 +39,13 @@ class CleanupOnCloseRpcSystem implements RpcSystem { private final RpcSystem rpcSystem; private final SubmoduleClassLoader pluginLoader; - private final Path tempFile; + @Nullable private final Path tempDirectory; public CleanupOnCloseRpcSystem( - RpcSystem rpcSystem, SubmoduleClassLoader pluginLoader, Path tempFile) { + RpcSystem rpcSystem, SubmoduleClassLoader pluginLoader, @Nullable Path tempDirectory) { this.rpcSystem = Preconditions.checkNotNull(rpcSystem); this.pluginLoader = Preconditions.checkNotNull(pluginLoader); - this.tempFile = Preconditions.checkNotNull(tempFile); + this.tempDirectory = tempDirectory; } @Override @@ -57,10 +57,12 @@ class CleanupOnCloseRpcSystem implements RpcSystem { } catch (Exception e) { LOG.warn("Could not close RpcSystem classloader.", e); } - try { - Files.delete(tempFile); - } catch (Exception e) { - LOG.warn("Could not delete temporary rpc system file {}.", tempFile, e); + if (tempDirectory != null) { + try { + FileUtils.deleteFileOrDirectory(tempDirectory.toFile()); + } catch (Exception e) { + LOG.warn("Could not delete temporary rpc system file {}.", tempDirectory, e); + } } } diff --git a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java new file mode 100644 index 0000000..a8624bb --- /dev/null +++ b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.classloading.SubmoduleClassLoader; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.runtime.rpc.RpcSystemLoader; +import org.apache.flink.util.OperatingSystem; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Fallback {@link RpcSystemLoader} that does not rely on the flink-rpc-akka fat jar (like {@link + * AkkaRpcSystemLoader}) but instead uses the flink-rpc-akka/target/classes and maven to load the + * rpc system. + */ +public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader { + private static final Logger LOG = LoggerFactory.getLogger(FallbackAkkaRpcSystemLoader.class); + + @Override + public RpcSystem loadRpcSystem(Configuration config) { + try { + LOG.debug( + "Using Fallback AkkaRpcSystemLoader; this loader will invoke maven to retrieve the dependencies of flink-rpc-akka."); + + final ClassLoader flinkClassLoader = RpcSystem.class.getClassLoader(); + + // flink-rpc/flink-rpc-akka + final Path akkaRpcModuleDirectory = + findAkkaRpcModuleDirectory(getCurrentWorkingDirectory()); + + // flink-rpc/flink-rpc-akka/target/classes + final Path akkaRpcModuleClassesDirectory = + akkaRpcModuleDirectory.resolve(Paths.get("target", "classes")); + + // flink-rpc/flink-rpc-akka/target/dependencies + final Path akkaRpcModuleDependenciesDirectory = + akkaRpcModuleDirectory.resolve(Paths.get("target", "dependencies")); + + if (!Files.exists(akkaRpcModuleDependenciesDirectory)) { + downloadDependencies(akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory); + } else { + LOG.debug( + "Re-using previously downloaded flink-rpc-akka dependencies. If you are experiencing strange issues, try clearing '{}'.", + akkaRpcModuleDependenciesDirectory); + } + + // assemble URL collection containing target/classes and each jar + final List<URL> urls = new ArrayList<>(); + urls.add(akkaRpcModuleClassesDirectory.toUri().toURL()); + try (final Stream<Path> files = Files.list(akkaRpcModuleDependenciesDirectory)) { + final List<Path> collect = + files.filter(path -> path.getFileName().toString().endsWith(".jar")) + .collect(Collectors.toList()); + + for (Path path : collect) { + urls.add(path.toUri().toURL()); + } + } + + final SubmoduleClassLoader submoduleClassLoader = + new SubmoduleClassLoader(urls.toArray(new URL[0]), flinkClassLoader); + + return new CleanupOnCloseRpcSystem( + ServiceLoader.load(RpcSystem.class, submoduleClassLoader).iterator().next(), + submoduleClassLoader, + null); + } catch (Exception e) { + throw new RuntimeException( + "Could not initialize RPC system. Run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line.", + e); + } + } + + private static Path getCurrentWorkingDirectory() { + return Paths.get("").toAbsolutePath(); + } + + private static Path findAkkaRpcModuleDirectory(Path currentParentCandidate) throws IOException { + try (Stream<Path> directoryContents = Files.list(currentParentCandidate)) { + final Optional<Path> flinkRpcModuleDirectory = + directoryContents + .filter(path -> path.getFileName().toString().equals("flink-rpc")) + .findFirst(); + if (flinkRpcModuleDirectory.isPresent()) { + return flinkRpcModuleDirectory + .map(path -> path.resolve(Paths.get("flink-rpc-akka"))) + .get(); + } + } + return findAkkaRpcModuleDirectory(currentParentCandidate.getParent()); + } + + private static void downloadDependencies(Path workingDirectory, Path targetDirectory) + throws IOException, InterruptedException { + + final String mvnExecutable = OperatingSystem.isWindows() ? "mvn.bat" : "mvn"; + + final ProcessBuilder mvn = + new ProcessBuilder() + .directory(workingDirectory.toFile()) + .command( + mvnExecutable, + "dependency:copy-dependencies", + "-DincludeScope=runtime", // excludes provided/test dependencies + "-DoutputDirectory=" + targetDirectory) + .redirectOutput(ProcessBuilder.Redirect.INHERIT); + mvn.start().waitFor(); + } +} diff --git a/flink-rpc/flink-rpc-akka-loader/src/test/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader b/flink-rpc/flink-rpc-akka-loader/src/test/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader new file mode 100644 index 0000000..eeba46c --- /dev/null +++ b/flink-rpc/flink-rpc-akka-loader/src/test/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.runtime.rpc.akka.FallbackAkkaRpcSystemLoader diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index b5217fb..ca9b557 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -57,6 +57,15 @@ under the License. </dependency> <dependency> + <!-- provides fallback AkkaRpcSystem loader for the IDE --> + <groupId>org.apache.flink</groupId> + <artifactId>flink-rpc-akka-loader</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${project.version}</version> diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml index e0a140f..dcb97c3 100644 --- a/flink-test-utils-parent/flink-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-test-utils/pom.xml @@ -58,6 +58,14 @@ under the License. </dependency> <dependency> + <!-- provides fallback AkkaRpcSystem loader for the IDE --> + <groupId>org.apache.flink</groupId> + <artifactId>flink-rpc-akka-loader</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${project.version}</version>
