This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 64abd566e863e584a3d86eba17aee39d9bede4cd
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Sep 24 09:38:30 2021 +0200

    [FLINK-24367][tests] Add FallbackAkkaRpcSystemLoader
    
    The added loader constructs the classloader based on the target/classes 
directory of flink-rpc-akka, after downloading required dependencies via maven.
---
 .../runtime/rpc/akka/AkkaRpcSystemLoader.java      |   3 +-
 .../runtime/rpc/akka/CleanupOnCloseRpcSystem.java  |  18 +--
 .../rpc/akka/FallbackAkkaRpcSystemLoader.java      | 139 +++++++++++++++++++++
 .../org.apache.flink.runtime.rpc.RpcSystemLoader   |  16 +++
 flink-runtime/pom.xml                              |   9 ++
 flink-test-utils-parent/flink-test-utils/pom.xml   |   8 ++
 6 files changed, 184 insertions(+), 9 deletions(-)

diff --git 
a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
 
b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
index da0477b..ed8977a 100644
--- 
a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
+++ 
b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
@@ -58,7 +58,8 @@ public class AkkaRpcSystemLoader implements RpcSystemLoader {
             if (resourceStream == null) {
                 throw new RuntimeException(
                         "Akka RPC system could not be found. If this happened 
while running a test in the IDE,"
-                                + "run 'mvn package -pl 
flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the 
command-line.");
+                                + "run 'mvn package -pl 
flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line,"
+                                + "or add a test dependency on the 
flink-rpc-akka-loader test-jar.");
             }
 
             IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
diff --git 
a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java
 
b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java
index 621047b..e45f4b5 100644
--- 
a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java
+++ 
b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.classloading.SubmoduleClassLoader;
 import org.apache.flink.runtime.rpc.AddressResolution;
 import org.apache.flink.runtime.rpc.RpcSystem;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -30,7 +31,6 @@ import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.nio.file.Files;
 import java.nio.file.Path;
 
 /** An {@link RpcSystem} wrapper that cleans up resources after the RPC system 
has been closed. */
@@ -39,13 +39,13 @@ class CleanupOnCloseRpcSystem implements RpcSystem {
 
     private final RpcSystem rpcSystem;
     private final SubmoduleClassLoader pluginLoader;
-    private final Path tempFile;
+    @Nullable private final Path tempDirectory;
 
     public CleanupOnCloseRpcSystem(
-            RpcSystem rpcSystem, SubmoduleClassLoader pluginLoader, Path 
tempFile) {
+            RpcSystem rpcSystem, SubmoduleClassLoader pluginLoader, @Nullable 
Path tempDirectory) {
         this.rpcSystem = Preconditions.checkNotNull(rpcSystem);
         this.pluginLoader = Preconditions.checkNotNull(pluginLoader);
-        this.tempFile = Preconditions.checkNotNull(tempFile);
+        this.tempDirectory = tempDirectory;
     }
 
     @Override
@@ -57,10 +57,12 @@ class CleanupOnCloseRpcSystem implements RpcSystem {
         } catch (Exception e) {
             LOG.warn("Could not close RpcSystem classloader.", e);
         }
-        try {
-            Files.delete(tempFile);
-        } catch (Exception e) {
-            LOG.warn("Could not delete temporary rpc system file {}.", 
tempFile, e);
+        if (tempDirectory != null) {
+            try {
+                FileUtils.deleteFileOrDirectory(tempDirectory.toFile());
+            } catch (Exception e) {
+                LOG.warn("Could not delete temporary rpc system file {}.", 
tempDirectory, e);
+            }
         }
     }
 
diff --git 
a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
 
b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
new file mode 100644
index 0000000..a8624bb
--- /dev/null
+++ 
b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.classloading.SubmoduleClassLoader;
+import org.apache.flink.runtime.rpc.RpcSystem;
+import org.apache.flink.runtime.rpc.RpcSystemLoader;
+import org.apache.flink.util.OperatingSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Fallback {@link RpcSystemLoader} that does not rely on the flink-rpc-akka 
fat jar (like {@link
+ * AkkaRpcSystemLoader}) but instead uses the flink-rpc-akka/target/classes 
and maven to load the
+ * rpc system.
+ */
+public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FallbackAkkaRpcSystemLoader.class);
+
+    @Override
+    public RpcSystem loadRpcSystem(Configuration config) {
+        try {
+            LOG.debug(
+                    "Using Fallback AkkaRpcSystemLoader; this loader will 
invoke maven to retrieve the dependencies of flink-rpc-akka.");
+
+            final ClassLoader flinkClassLoader = 
RpcSystem.class.getClassLoader();
+
+            // flink-rpc/flink-rpc-akka
+            final Path akkaRpcModuleDirectory =
+                    findAkkaRpcModuleDirectory(getCurrentWorkingDirectory());
+
+            // flink-rpc/flink-rpc-akka/target/classes
+            final Path akkaRpcModuleClassesDirectory =
+                    akkaRpcModuleDirectory.resolve(Paths.get("target", 
"classes"));
+
+            // flink-rpc/flink-rpc-akka/target/dependencies
+            final Path akkaRpcModuleDependenciesDirectory =
+                    akkaRpcModuleDirectory.resolve(Paths.get("target", 
"dependencies"));
+
+            if (!Files.exists(akkaRpcModuleDependenciesDirectory)) {
+                downloadDependencies(akkaRpcModuleDirectory, 
akkaRpcModuleDependenciesDirectory);
+            } else {
+                LOG.debug(
+                        "Re-using previously downloaded flink-rpc-akka 
dependencies. If you are experiencing strange issues, try clearing '{}'.",
+                        akkaRpcModuleDependenciesDirectory);
+            }
+
+            // assemble URL collection containing target/classes and each jar
+            final List<URL> urls = new ArrayList<>();
+            urls.add(akkaRpcModuleClassesDirectory.toUri().toURL());
+            try (final Stream<Path> files = 
Files.list(akkaRpcModuleDependenciesDirectory)) {
+                final List<Path> collect =
+                        files.filter(path -> 
path.getFileName().toString().endsWith(".jar"))
+                                .collect(Collectors.toList());
+
+                for (Path path : collect) {
+                    urls.add(path.toUri().toURL());
+                }
+            }
+
+            final SubmoduleClassLoader submoduleClassLoader =
+                    new SubmoduleClassLoader(urls.toArray(new URL[0]), 
flinkClassLoader);
+
+            return new CleanupOnCloseRpcSystem(
+                    ServiceLoader.load(RpcSystem.class, 
submoduleClassLoader).iterator().next(),
+                    submoduleClassLoader,
+                    null);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not initialize RPC system. Run 'mvn package -pl 
flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line.",
+                    e);
+        }
+    }
+
+    private static Path getCurrentWorkingDirectory() {
+        return Paths.get("").toAbsolutePath();
+    }
+
+    private static Path findAkkaRpcModuleDirectory(Path 
currentParentCandidate) throws IOException {
+        try (Stream<Path> directoryContents = 
Files.list(currentParentCandidate)) {
+            final Optional<Path> flinkRpcModuleDirectory =
+                    directoryContents
+                            .filter(path -> 
path.getFileName().toString().equals("flink-rpc"))
+                            .findFirst();
+            if (flinkRpcModuleDirectory.isPresent()) {
+                return flinkRpcModuleDirectory
+                        .map(path -> path.resolve(Paths.get("flink-rpc-akka")))
+                        .get();
+            }
+        }
+        return findAkkaRpcModuleDirectory(currentParentCandidate.getParent());
+    }
+
+    private static void downloadDependencies(Path workingDirectory, Path 
targetDirectory)
+            throws IOException, InterruptedException {
+
+        final String mvnExecutable = OperatingSystem.isWindows() ? "mvn.bat" : 
"mvn";
+
+        final ProcessBuilder mvn =
+                new ProcessBuilder()
+                        .directory(workingDirectory.toFile())
+                        .command(
+                                mvnExecutable,
+                                "dependency:copy-dependencies",
+                                "-DincludeScope=runtime", // excludes 
provided/test dependencies
+                                "-DoutputDirectory=" + targetDirectory)
+                        .redirectOutput(ProcessBuilder.Redirect.INHERIT);
+        mvn.start().waitFor();
+    }
+}
diff --git 
a/flink-rpc/flink-rpc-akka-loader/src/test/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader
 
b/flink-rpc/flink-rpc-akka-loader/src/test/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader
new file mode 100644
index 0000000..eeba46c
--- /dev/null
+++ 
b/flink-rpc/flink-rpc-akka-loader/src/test/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.rpc.akka.FallbackAkkaRpcSystemLoader
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index b5217fb..ca9b557 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -57,6 +57,15 @@ under the License.
                </dependency>
 
                <dependency>
+                       <!-- provides fallback AkkaRpcSystem loader for the IDE 
-->
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-rpc-akka-loader</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${project.version}</version>
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml 
b/flink-test-utils-parent/flink-test-utils/pom.xml
index e0a140f..dcb97c3 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -58,6 +58,14 @@ under the License.
                </dependency>
 
                <dependency>
+                       <!-- provides fallback AkkaRpcSystem loader for the IDE 
-->
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-rpc-akka-loader</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-clients_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>

Reply via email to