This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 43066fe4afb6ddaf464c7051955cc2b857b13f93
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Aug 12 17:43:21 2021 +0800

    [FLINK-19554][testutil/container] Support TM restarting in FlinkContainer 
and network accessibility from other containers
---
 .../flink/tests/util/flink/FlinkContainer.java     | 99 ++++++++++++++++++++--
 1 file changed, 91 insertions(+), 8 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java
index bcb71b2..3034bf2 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.tests.util.flink;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.tests.util.parameters.ParameterProperty;
 import org.apache.flink.tests.util.util.FileUtils;
@@ -35,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
 import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
 import org.testcontainers.images.builder.ImageFromDockerfile;
 import org.testcontainers.lifecycle.TestDescription;
@@ -64,21 +67,27 @@ import java.util.stream.IntStream;
  * container.
  */
 public class FlinkContainer extends GenericContainer<FlinkContainer> 
implements TestLifecycleAware {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkContainer.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
     private static final String FLINK_BIN = "flink/bin";
+    public static final int JOB_MANAGER_REST_PORT = 8081;
 
     private final TemporaryFolder temporaryFolder = new TemporaryFolder();
     private final Path logBackupDir;
+    private final int numTaskManagers;
 
     private FlinkContainer(
             ImageFromDockerfile image, int numTaskManagers, @Nullable Path 
logBackupDir) {
         super(image);
         this.logBackupDir = logBackupDir;
-        withExposedPorts(8081);
+        this.numTaskManagers = numTaskManagers;
+        withExposedPorts(JOB_MANAGER_REST_PORT);
+        // Create a network for connecting with other containers
+        withNetwork(Network.newNetwork());
         waitingFor(
                 new HttpWaitStrategy()
-                        .forPort(8081)
+                        .forPort(JOB_MANAGER_REST_PORT)
                         .forPath("/taskmanagers")
                         .forResponsePredicate(
                                 response -> {
@@ -107,8 +116,7 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
     public void afterTest(TestDescription description, Optional<Throwable> 
throwable) {
         if (throwable.isPresent() && logBackupDir != null) {
             try {
-                final Path targetDirectory =
-                        logBackupDir.resolve("flink-" + 
UUID.randomUUID().toString());
+                final Path targetDirectory = logBackupDir.resolve("flink-" + 
UUID.randomUUID());
                 copyFileOrDirectoryFromContainer("flink/log/", 
targetDirectory);
                 LOG.info("Backed up logs to {}.", targetDirectory);
             } catch (IOException e) {
@@ -136,7 +144,7 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
     }
 
     private static void unTar(TarArchiveInputStream tis, File destFolder) 
throws IOException {
-        TarArchiveEntry entry = null;
+        TarArchiveEntry entry;
         while ((entry = tis.getNextTarEntry()) != null) {
             FileOutputStream fos = null;
             try {
@@ -192,6 +200,38 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
         }
     }
 
+    /**
+     * Restart all task managers.
+     *
+     * @param afterFailAction Action to do between stopping and restarting 
task managers
+     * @throws Exception If anything wrong happens during the restart
+     */
+    public void restartTaskManager(Runnable afterFailAction) throws Exception {
+        final String[] stopCommand = {FLINK_BIN + "/taskmanager.sh", 
"stop-all"};
+        final ExecResult stopResult = execInContainer(stopCommand);
+        checkExitCode(stopResult, stopCommand);
+
+        afterFailAction.run();
+
+        for (int i = 0; i < numTaskManagers; i++) {
+            final String[] startCommand = {FLINK_BIN + "/taskmanager.sh", 
"start"};
+            final ExecResult startResult = execInContainer(startCommand);
+            checkExitCode(startResult, startCommand);
+        }
+    }
+
+    public void checkExitCode(ExecResult execResult, String... command) {
+        if (execResult.getExitCode() != 0) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Command \"%s\" exited with code %d. \nSTDOUT: 
%s\nSTDERR: %s",
+                            String.join(" ", command),
+                            execResult.getExitCode(),
+                            execResult.getStdout(),
+                            execResult.getStderr()));
+        }
+    }
+
     @Nonnull
     private String copyAndGetContainerPath(String defaultEnvFile) {
         Path path = Paths.get(defaultEnvFile);
@@ -211,6 +251,7 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
                 new ParameterProperty<>("logBackupDir", Paths::get);
 
         private int numTaskManagers = 1;
+        private Configuration flinkConfiguration;
         private String javaVersion;
         private final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -224,6 +265,17 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
         }
 
         /**
+         * Additional Flink configurations for the cluster.
+         *
+         * @param flinkConfiguration Additional Flink configurations
+         * @return Builder itself
+         */
+        public FlinkContainerBuilder withFlinkConfiguration(Configuration 
flinkConfiguration) {
+            this.flinkConfiguration = flinkConfiguration;
+            return this;
+        }
+
+        /**
          * Specifies which OpenJDK version to use. If not provided explicitly, 
the image version
          * will be derived based on the version of the java that runs the test.
          */
@@ -235,8 +287,23 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
         public FlinkContainer build() {
             try {
                 Path flinkDist = FileUtils.findFlinkDist();
+                Path flinkConfDirectory = flinkDist.resolve("conf");
+
+                // Load Flink configurations
+                final Configuration mergedFlinkConfiguration =
+                        GlobalConfiguration.loadConfiguration(
+                                
flinkConfDirectory.toAbsolutePath().toString());
+
+                // Merge additional Flink configurations
+                if (flinkConfiguration != null) {
+                    mergedFlinkConfiguration.addAll(flinkConfiguration);
+                }
+
+                // Create temporary folder for holding configuration files
                 temporaryFolder.create();
                 Path tmp = temporaryFolder.newFolder().toPath();
+
+                // Write worker file
                 Path workersFile = tmp.resolve("workers");
                 Files.write(
                         workersFile,
@@ -244,6 +311,14 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
                                 .mapToObj(i -> "localhost")
                                 .collect(Collectors.toList()));
 
+                // Write merged Flink configuration
+                Path configurationFile = 
tmp.resolve(GlobalConfiguration.FLINK_CONF_FILENAME);
+                final List<String> configurationLines =
+                        mergedFlinkConfiguration.toMap().entrySet().stream()
+                                .map(entry -> entry.getKey() + ": " + 
entry.getValue())
+                                .collect(Collectors.toList());
+                Files.write(configurationFile, configurationLines);
+
                 // Building the docker image is split into two stages:
                 // 1. build a base image with an immutable flink-dist
                 // 2. based on the base image add any mutable files such as 
e.g. workers files
@@ -251,7 +326,8 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
                 // This lets us save some time for archiving and copying big, 
immutable files
                 // between tests runs.
                 String baseImage = buildBaseImage(flinkDist);
-                ImageFromDockerfile configuredImage = 
buildConfiguredImage(workersFile, baseImage);
+                ImageFromDockerfile configuredImage =
+                        buildConfiguredImage(workersFile, configurationFile, 
baseImage);
 
                 Optional<Path> logBackupDirectory = 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
                 if (!logBackupDirectory.isPresent()) {
@@ -267,17 +343,24 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
             }
         }
 
-        private ImageFromDockerfile buildConfiguredImage(Path workersFile, 
String baseImage) {
+        private ImageFromDockerfile buildConfiguredImage(
+                Path workersFile, Path configurationFile, String baseImage) {
             return new ImageFromDockerfile("flink-dist-configured")
                     .withDockerfileFromBuilder(
                             builder ->
                                     builder.from(baseImage)
                                             .copy("workers", 
"flink/conf/workers")
+                                            .copy(
+                                                    
GlobalConfiguration.FLINK_CONF_FILENAME,
+                                                    "flink/conf/"
+                                                            + 
GlobalConfiguration
+                                                                    
.FLINK_CONF_FILENAME)
                                             .cmd(
                                                     FLINK_BIN
                                                             + 
"/start-cluster.sh && tail -f /dev/null")
                                             .build())
-                    .withFileFromPath("workers", workersFile);
+                    .withFileFromPath("workers", workersFile)
+                    .withFileFromPath(GlobalConfiguration.FLINK_CONF_FILENAME, 
configurationFile);
         }
 
         @Nonnull

Reply via email to