This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 43066fe4afb6ddaf464c7051955cc2b857b13f93 Author: Qingsheng Ren <[email protected]> AuthorDate: Thu Aug 12 17:43:21 2021 +0800 [FLINK-19554][testutil/container] Support TM restarting in FlinkContainer and network accessibility from other containers --- .../flink/tests/util/flink/FlinkContainer.java | 99 ++++++++++++++++++++-- 1 file changed, 91 insertions(+), 8 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java index bcb71b2..3034bf2 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java @@ -18,6 +18,8 @@ package org.apache.flink.tests.util.flink; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; import org.apache.flink.tests.util.parameters.ParameterProperty; import org.apache.flink.tests.util.util.FileUtils; @@ -35,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.images.builder.ImageFromDockerfile; import org.testcontainers.lifecycle.TestDescription; @@ -64,21 +67,27 @@ import java.util.stream.IntStream; * container. */ public class FlinkContainer extends GenericContainer<FlinkContainer> implements TestLifecycleAware { + private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class); private static final ObjectMapper objectMapper = new ObjectMapper(); private static final String FLINK_BIN = "flink/bin"; + public static final int JOB_MANAGER_REST_PORT = 8081; private final TemporaryFolder temporaryFolder = new TemporaryFolder(); private final Path logBackupDir; + private final int numTaskManagers; private FlinkContainer( ImageFromDockerfile image, int numTaskManagers, @Nullable Path logBackupDir) { super(image); this.logBackupDir = logBackupDir; - withExposedPorts(8081); + this.numTaskManagers = numTaskManagers; + withExposedPorts(JOB_MANAGER_REST_PORT); + // Create a network for connecting with other containers + withNetwork(Network.newNetwork()); waitingFor( new HttpWaitStrategy() - .forPort(8081) + .forPort(JOB_MANAGER_REST_PORT) .forPath("/taskmanagers") .forResponsePredicate( response -> { @@ -107,8 +116,7 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements public void afterTest(TestDescription description, Optional<Throwable> throwable) { if (throwable.isPresent() && logBackupDir != null) { try { - final Path targetDirectory = - logBackupDir.resolve("flink-" + UUID.randomUUID().toString()); + final Path targetDirectory = logBackupDir.resolve("flink-" + UUID.randomUUID()); copyFileOrDirectoryFromContainer("flink/log/", targetDirectory); LOG.info("Backed up logs to {}.", targetDirectory); } catch (IOException e) { @@ -136,7 +144,7 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements } private static void unTar(TarArchiveInputStream tis, File destFolder) throws IOException { - TarArchiveEntry entry = null; + TarArchiveEntry entry; while ((entry = tis.getNextTarEntry()) != null) { FileOutputStream fos = null; try { @@ -192,6 +200,38 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements } } + /** + * Restart all task managers. + * + * @param afterFailAction Action to do between stopping and restarting task managers + * @throws Exception If anything wrong happens during the restart + */ + public void restartTaskManager(Runnable afterFailAction) throws Exception { + final String[] stopCommand = {FLINK_BIN + "/taskmanager.sh", "stop-all"}; + final ExecResult stopResult = execInContainer(stopCommand); + checkExitCode(stopResult, stopCommand); + + afterFailAction.run(); + + for (int i = 0; i < numTaskManagers; i++) { + final String[] startCommand = {FLINK_BIN + "/taskmanager.sh", "start"}; + final ExecResult startResult = execInContainer(startCommand); + checkExitCode(startResult, startCommand); + } + } + + public void checkExitCode(ExecResult execResult, String... command) { + if (execResult.getExitCode() != 0) { + throw new IllegalStateException( + String.format( + "Command \"%s\" exited with code %d. \nSTDOUT: %s\nSTDERR: %s", + String.join(" ", command), + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr())); + } + } + @Nonnull private String copyAndGetContainerPath(String defaultEnvFile) { Path path = Paths.get(defaultEnvFile); @@ -211,6 +251,7 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements new ParameterProperty<>("logBackupDir", Paths::get); private int numTaskManagers = 1; + private Configuration flinkConfiguration; private String javaVersion; private final TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -224,6 +265,17 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements } /** + * Additional Flink configurations for the cluster. + * + * @param flinkConfiguration Additional Flink configurations + * @return Builder itself + */ + public FlinkContainerBuilder withFlinkConfiguration(Configuration flinkConfiguration) { + this.flinkConfiguration = flinkConfiguration; + return this; + } + + /** * Specifies which OpenJDK version to use. If not provided explicitly, the image version * will be derived based on the version of the java that runs the test. */ @@ -235,8 +287,23 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements public FlinkContainer build() { try { Path flinkDist = FileUtils.findFlinkDist(); + Path flinkConfDirectory = flinkDist.resolve("conf"); + + // Load Flink configurations + final Configuration mergedFlinkConfiguration = + GlobalConfiguration.loadConfiguration( + flinkConfDirectory.toAbsolutePath().toString()); + + // Merge additional Flink configurations + if (flinkConfiguration != null) { + mergedFlinkConfiguration.addAll(flinkConfiguration); + } + + // Create temporary folder for holding configuration files temporaryFolder.create(); Path tmp = temporaryFolder.newFolder().toPath(); + + // Write worker file Path workersFile = tmp.resolve("workers"); Files.write( workersFile, @@ -244,6 +311,14 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements .mapToObj(i -> "localhost") .collect(Collectors.toList())); + // Write merged Flink configuration + Path configurationFile = tmp.resolve(GlobalConfiguration.FLINK_CONF_FILENAME); + final List<String> configurationLines = + mergedFlinkConfiguration.toMap().entrySet().stream() + .map(entry -> entry.getKey() + ": " + entry.getValue()) + .collect(Collectors.toList()); + Files.write(configurationFile, configurationLines); + // Building the docker image is split into two stages: // 1. build a base image with an immutable flink-dist // 2. based on the base image add any mutable files such as e.g. workers files @@ -251,7 +326,8 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements // This lets us save some time for archiving and copying big, immutable files // between tests runs. String baseImage = buildBaseImage(flinkDist); - ImageFromDockerfile configuredImage = buildConfiguredImage(workersFile, baseImage); + ImageFromDockerfile configuredImage = + buildConfiguredImage(workersFile, configurationFile, baseImage); Optional<Path> logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get(); if (!logBackupDirectory.isPresent()) { @@ -267,17 +343,24 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements } } - private ImageFromDockerfile buildConfiguredImage(Path workersFile, String baseImage) { + private ImageFromDockerfile buildConfiguredImage( + Path workersFile, Path configurationFile, String baseImage) { return new ImageFromDockerfile("flink-dist-configured") .withDockerfileFromBuilder( builder -> builder.from(baseImage) .copy("workers", "flink/conf/workers") + .copy( + GlobalConfiguration.FLINK_CONF_FILENAME, + "flink/conf/" + + GlobalConfiguration + .FLINK_CONF_FILENAME) .cmd( FLINK_BIN + "/start-cluster.sh && tail -f /dev/null") .build()) - .withFileFromPath("workers", workersFile); + .withFileFromPath("workers", workersFile) + .withFileFromPath(GlobalConfiguration.FLINK_CONF_FILENAME, configurationFile); } @Nonnull
