fapaul commented on a change in pull request #17892: URL: https://github.com/apache/flink/pull/17892#discussion_r763105121
########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.MountableFile; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** A Flink cluster running JM and TM on different containers. */ +public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { + private static final Logger LOG = LoggerFactory.getLogger(FlinkContainers.class); + + // Hostname of components within container network + public static final String JOB_MANAGER_HOSTNAME = "jobmanager"; + public static final String TASK_MANAGER_HOSTNAME_PREFIX = "taskmanager-"; + public static final String ZOOKEEPER_HOSTNAME = "zookeeper"; + + // Directories for storing states + public static final Path CHECKPOINT_PATH = Paths.get("/flink/checkpoint"); + public static final Path HA_STORAGE_PATH = Paths.get("/flink/recovery"); + + // Default timeout of operations + public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); Review comment: I think we should move these values to the `FlinkContainersBuilder` ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.tests.util.util.FileUtils; + +import com.github.dockerjava.api.exception.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A builder class for constructing Docker image based on flink-dist. 
*/ +public class FlinkImageBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkImageBuilder.class); + private static final String FLINK_BASE_IMAGE_NAME = "flink-dist-base"; + private static final String DEFAULT_IMAGE_NAME = "flink-dist-configured"; + private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5); + private static final String LOG4J_PROPERTIES_FILENAME = "log4j-console.properties"; + + private final Map<Path, Path> filesToCopy = new HashMap<>(); + private final Properties logProperties = new Properties(); + + private String imageName = DEFAULT_IMAGE_NAME; + private String imageNameSuffix; + private Path flinkDist = FileUtils.findFlinkDist(); + private String javaVersion; + private Configuration conf; + private Duration timeout = DEFAULT_TIMEOUT; + private String startupCommand; + + /** + * Sets the name of building image. + * + * <p>If the name is not specified, {@link #DEFAULT_IMAGE_NAME} will be used. + */ + public FlinkImageBuilder setImageName(String imageName) { + this.imageName = imageName; + return this; + } + + /** + * Sets the path of flink-dist directory. + * + * <p>If path is not specified, the dist directory under current project will be used. + */ + public FlinkImageBuilder setFlinkDistPath(Path flinkDist) { + this.flinkDist = flinkDist; + return this; + } + + /** + * Sets JDK version in the image. + * + * <p>This version string will be used as the tag of openjdk image. If version is not specified, + * the JDK version of the current JVM will be used. + * + * @see <a href="https://hub.docker.com/_/openjdk">OpenJDK on Docker Hub</a> for all available + * tags. + */ + public FlinkImageBuilder setJavaVersion(String javaVersion) { + this.javaVersion = javaVersion; + return this; + } + + /** + * Sets Flink configuration. This configuration will be used for generating flink-conf.yaml for + * configuring JobManager and TaskManager. 
+ */ + public FlinkImageBuilder setConfiguration(Configuration conf) { + this.conf = conf; + return this; + } + + /** + * Sets log4j properties. + * + * <p>Containers will use "log4j-console.properties" under flink-dist as the base configuration + * of loggers. Properties specified by this method will be appended to the config file, or + * overwrite the property if already exists in the base config file. + */ + public FlinkImageBuilder setLogProperties(Properties logProperties) { + this.logProperties.putAll(logProperties); + return this; + } + + /** Copies file into the image. */ + public FlinkImageBuilder copyFile(Path localPath, Path containerPath) { + filesToCopy.put(localPath, containerPath); + return this; + } + + /** Sets timeout for building the image. */ + public FlinkImageBuilder setTimeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + /** Use this image for building a JobManager. */ + public FlinkImageBuilder asJobManager() { + checkStartupCommandNotSet(); + this.startupCommand = "flink/bin/jobmanager.sh start-foreground && tail -f /dev/null"; + this.imageNameSuffix = "jobmanager"; + return this; + } + + /** Use this image for building a TaskManager. */ + public FlinkImageBuilder asTaskManager() { + checkStartupCommandNotSet(); + this.startupCommand = "flink/bin/taskmanager.sh start-foreground && tail -f /dev/null"; + this.imageNameSuffix = "taskmanager"; + return this; + } + + /** Use a custom command for starting up the container. */ + public FlinkImageBuilder useCustomStartupCommand(String command) { + checkStartupCommandNotSet(); + this.startupCommand = command; + this.imageNameSuffix = "custom"; + return this; + } + + /** Build the image. 
*/ + public ImageFromDockerfile build() throws ImageBuildException { + sanityCheck(); + final String finalImageName = imageName + "-" + imageNameSuffix; + try { + // Build base image first + buildBaseImage(flinkDist); + final Path tempDirectory = + Files.createTempDirectory("flink-dist-build-" + UUID.randomUUID()); + final Path flinkConfFile = createTemporaryFlinkConfFile(tempDirectory); + final Path log4jPropertiesFile = createTemporaryLog4jPropertiesFile(tempDirectory); + // Copy flink-conf.yaml into image + filesToCopy.put( + flinkConfFile, + Paths.get("flink", "conf", GlobalConfiguration.FLINK_CONF_FILENAME)); + filesToCopy.put( + log4jPropertiesFile, Paths.get("flink", "conf", LOG4J_PROPERTIES_FILENAME)); + + final ImageFromDockerfile image = + new ImageFromDockerfile(finalImageName) + .withDockerfileFromBuilder( + builder -> { + // Build from base image + builder.from(FLINK_BASE_IMAGE_NAME); + // Copy files into image + filesToCopy.forEach( + (from, to) -> + builder.copy(to.toString(), to.toString())); + builder.cmd(startupCommand); + }); + filesToCopy.forEach((from, to) -> image.withFileFromPath(to.toString(), from)); + return image; + } catch (Exception e) { + throw new ImageBuildException( + String.format("Failed to build Flink image \"%s\"", finalImageName), e); + } + } + + // ----------------------- Helper functions ----------------------- + + private void buildBaseImage(Path flinkDist) throws TimeoutException { + if (!baseImageExists()) { Review comment: Nit: I am a fan of handling of all conditional cases explicitly. I think it makes reading the code easier and removes one indentation. ```java if (baseImageExists()) { return; } ... ``` ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.tests.util.util.FileUtils; + +import com.github.dockerjava.api.exception.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A builder class for constructing Docker image based on flink-dist. 
*/ +public class FlinkImageBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkImageBuilder.class); + private static final String FLINK_BASE_IMAGE_NAME = "flink-dist-base"; + private static final String DEFAULT_IMAGE_NAME = "flink-dist-configured"; + private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5); + private static final String LOG4J_PROPERTIES_FILENAME = "log4j-console.properties"; + + private final Map<Path, Path> filesToCopy = new HashMap<>(); + private final Properties logProperties = new Properties(); + + private String imageName = DEFAULT_IMAGE_NAME; + private String imageNameSuffix; + private Path flinkDist = FileUtils.findFlinkDist(); + private String javaVersion; + private Configuration conf; + private Duration timeout = DEFAULT_TIMEOUT; + private String startupCommand; + + /** + * Sets the name of building image. + * + * <p>If the name is not specified, {@link #DEFAULT_IMAGE_NAME} will be used. + */ + public FlinkImageBuilder setImageName(String imageName) { + this.imageName = imageName; + return this; + } + + /** + * Sets the path of flink-dist directory. + * + * <p>If path is not specified, the dist directory under current project will be used. + */ + public FlinkImageBuilder setFlinkDistPath(Path flinkDist) { + this.flinkDist = flinkDist; + return this; + } + + /** + * Sets JDK version in the image. + * + * <p>This version string will be used as the tag of openjdk image. If version is not specified, + * the JDK version of the current JVM will be used. + * + * @see <a href="https://hub.docker.com/_/openjdk">OpenJDK on Docker Hub</a> for all available + * tags. + */ + public FlinkImageBuilder setJavaVersion(String javaVersion) { + this.javaVersion = javaVersion; + return this; + } + + /** + * Sets Flink configuration. This configuration will be used for generating flink-conf.yaml for + * configuring JobManager and TaskManager. 
+ */ + public FlinkImageBuilder setConfiguration(Configuration conf) { + this.conf = conf; + return this; + } + + /** + * Sets log4j properties. + * + * <p>Containers will use "log4j-console.properties" under flink-dist as the base configuration + * of loggers. Properties specified by this method will be appended to the config file, or + * overwrite the property if already exists in the base config file. + */ + public FlinkImageBuilder setLogProperties(Properties logProperties) { + this.logProperties.putAll(logProperties); + return this; + } + + /** Copies file into the image. */ + public FlinkImageBuilder copyFile(Path localPath, Path containerPath) { + filesToCopy.put(localPath, containerPath); + return this; + } + + /** Sets timeout for building the image. */ + public FlinkImageBuilder setTimeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + /** Use this image for building a JobManager. */ + public FlinkImageBuilder asJobManager() { + checkStartupCommandNotSet(); + this.startupCommand = "flink/bin/jobmanager.sh start-foreground && tail -f /dev/null"; + this.imageNameSuffix = "jobmanager"; + return this; + } + + /** Use this image for building a TaskManager. */ + public FlinkImageBuilder asTaskManager() { + checkStartupCommandNotSet(); + this.startupCommand = "flink/bin/taskmanager.sh start-foreground && tail -f /dev/null"; + this.imageNameSuffix = "taskmanager"; + return this; + } + + /** Use a custom command for starting up the container. */ + public FlinkImageBuilder useCustomStartupCommand(String command) { + checkStartupCommandNotSet(); + this.startupCommand = command; + this.imageNameSuffix = "custom"; + return this; + } + + /** Build the image. 
*/ + public ImageFromDockerfile build() throws ImageBuildException { + sanityCheck(); + final String finalImageName = imageName + "-" + imageNameSuffix; + try { + // Build base image first + buildBaseImage(flinkDist); + final Path tempDirectory = + Files.createTempDirectory("flink-dist-build-" + UUID.randomUUID()); + final Path flinkConfFile = createTemporaryFlinkConfFile(tempDirectory); + final Path log4jPropertiesFile = createTemporaryLog4jPropertiesFile(tempDirectory); + // Copy flink-conf.yaml into image + filesToCopy.put( + flinkConfFile, + Paths.get("flink", "conf", GlobalConfiguration.FLINK_CONF_FILENAME)); + filesToCopy.put( + log4jPropertiesFile, Paths.get("flink", "conf", LOG4J_PROPERTIES_FILENAME)); + + final ImageFromDockerfile image = + new ImageFromDockerfile(finalImageName) + .withDockerfileFromBuilder( + builder -> { + // Build from base image + builder.from(FLINK_BASE_IMAGE_NAME); + // Copy files into image + filesToCopy.forEach( + (from, to) -> + builder.copy(to.toString(), to.toString())); + builder.cmd(startupCommand); + }); + filesToCopy.forEach((from, to) -> image.withFileFromPath(to.toString(), from)); + return image; + } catch (Exception e) { + throw new ImageBuildException( + String.format("Failed to build Flink image \"%s\"", finalImageName), e); + } + } + + // ----------------------- Helper functions ----------------------- + + private void buildBaseImage(Path flinkDist) throws TimeoutException { + if (!baseImageExists()) { + LOG.info("Building Flink base image with flink-dist at {}", flinkDist); + new ImageFromDockerfile(FLINK_BASE_IMAGE_NAME) + .withDockerfileFromBuilder( + builder -> + builder.from("openjdk:" + getJavaVersionSuffix()) + .copy("flink", "flink") + .build()) + .withFileFromPath("flink", flinkDist) + .get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + } + + private boolean baseImageExists() { + try { + DockerClientFactory.instance().client().inspectImageCmd(FLINK_BASE_IMAGE_NAME).exec(); + return true; + } catch 
(NotFoundException e) { + return false; + } + } + + private String getJavaVersionSuffix() { + if (this.javaVersion != null) { + return this.javaVersion; + } else { + String javaSpecVersion = System.getProperty("java.vm.specification.version"); + LOG.info("Using JDK version {} of the current VM", javaSpecVersion); + switch (javaSpecVersion) { + case "1.8": + return "8"; + case "11": + return "11"; + default: + throw new IllegalStateException("Unexpected Java version: " + javaSpecVersion); + } + } + } + + private Path createTemporaryFlinkConfFile(Path tempDirectory) throws IOException { + // Load Flink configurations in flink-dist + final Configuration finalConfiguration = + GlobalConfiguration.loadConfiguration( + flinkDist.resolve("conf").toAbsolutePath().toString()); + + if (this.conf != null) { + finalConfiguration.addAll(this.conf); + } + + Path flinkConfFile = tempDirectory.resolve(GlobalConfiguration.FLINK_CONF_FILENAME); + + Files.write( + flinkConfFile, + finalConfiguration.toMap().entrySet().stream() + .map(entry -> entry.getKey() + ": " + entry.getValue()) + .collect(Collectors.toList())); + Review comment: This method looks a bit complicated; can you add a docstring explaining what you are trying to do here? ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.MountableFile; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** A Flink cluster running JM and TM on different containers. 
*/ Review comment: It would be awesome to see a larger docstring here, ideally with some code samples similar to [1]. We want to encourage users to treat this class as the entry point when writing e2e tests. [1] https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java#L48 ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.MountableFile; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** A Flink cluster running JM and TM on different containers. 
*/ +public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { + private static final Logger LOG = LoggerFactory.getLogger(FlinkContainers.class); + + // Hostname of components within container network + public static final String JOB_MANAGER_HOSTNAME = "jobmanager"; + public static final String TASK_MANAGER_HOSTNAME_PREFIX = "taskmanager-"; + public static final String ZOOKEEPER_HOSTNAME = "zookeeper"; + + // Directories for storing states + public static final Path CHECKPOINT_PATH = Paths.get("/flink/checkpoint"); + public static final Path HA_STORAGE_PATH = Paths.get("/flink/recovery"); + + // Default timeout of operations + public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); + + private final GenericContainer<?> jobManager; + private final List<GenericContainer<?>> taskManagers; + private final GenericContainer<?> haService; + private final Configuration conf; + + private RestClusterClient<StandaloneClusterId> restClusterClient; + private boolean isStarted; + + /** Creates a builder for {@link FlinkContainers}. */ + public static FlinkContainersBuilder builder() { + return new FlinkContainersBuilder(); + } + + FlinkContainers( + GenericContainer<?> jobManager, + List<GenericContainer<?>> taskManagers, + @Nullable GenericContainer<?> haService, + Configuration conf) { + this.jobManager = jobManager; + this.taskManagers = taskManagers; + this.haService = haService; + this.conf = conf; + } + + /** Starts all containers. 
*/ + public void start() throws Exception { + if (haService != null) { + LOG.debug("Starting HA service container"); + this.haService.start(); + } + LOG.debug("Starting JobManager container"); + this.jobManager.start(); + LOG.debug("Starting TaskManager containers"); + this.taskManagers.parallelStream().forEach(GenericContainer::start); + LOG.debug("Creating REST cluster client"); + this.restClusterClient = createClusterClient(); Review comment: I think this call can become flaky because it is not guaranteed that the jobmanager is reachable when we try to build the cluster client. We need a similar readiness check to what we use for the taskmanagers. ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; + +import org.slf4j.Logger; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A builder class for {@link FlinkContainers}. */ +public class FlinkContainersBuilder { + + private final List<GenericContainer<?>> dependentContainers = new ArrayList<>(); + private final Configuration conf = new Configuration(); + private final Map<String, String> envVars = new HashMap<>(); + private final Properties logProperties = new Properties(); + + private int numTaskManagers = 1; + private Network network = Network.newNetwork(); + private Logger logger; + private boolean enableZookeeperHA = false; + + /** Sets number of TaskManagers. */ + public FlinkContainersBuilder setNumTaskManagers(int numTaskManagers) { + this.numTaskManagers = numTaskManagers; + return this; + } + + /** Sets configuration of the cluster. */ + public FlinkContainersBuilder setConfiguration(Configuration conf) { + this.conf.addAll(conf); + return this; + } + + /** + * Lets Flink cluster depending on another container, and bind the network of Flink cluster to + * the dependent one. 
+ */ + public FlinkContainersBuilder dependsOn(GenericContainer<?> container) { + container.withNetwork(this.network); + this.dependentContainers.add(container); + return this; + } + + /** Sets environment variable to containers. */ + public FlinkContainersBuilder setEnvironmentVariable(String key, String value) { + this.envVars.put(key, value); + return this; + } + + /** Sets network of the Flink cluster. */ + public FlinkContainersBuilder setNetwork(Network network) { + this.network = network; + return this; + } + + /** Sets a logger to the cluster in order to consume STDOUT of containers to the logger. */ + public FlinkContainersBuilder setLogger(Logger logger) { + this.logger = logger; + return this; + } + + /** + * Sets log4j property. + * + * <p>Containers will use "log4j-console.properties" under flink-dist as the base configuration + * of loggers. Properties specified by this method will be appended to the config file, or + * overwrite the property if already exists in the base config file. + */ + public FlinkContainersBuilder setLogProperty(String key, String value) { + this.logProperties.setProperty(key, value); + return this; + } + + /** + * Sets log4j properties. + * + * <p>Containers will use "log4j-console.properties" under flink-dist as the base configuration + * of loggers. Properties specified by this method will be appended to the config file, or + * overwrite the property if already exists in the base config file. + */ + public FlinkContainersBuilder setLogProperties(Properties logProperties) { + this.logProperties.putAll(logProperties); + return this; + } + + /** Enables high availability service. */ + public FlinkContainersBuilder enableZookeeperHA() { + this.enableZookeeperHA = true; + return this; + } + + /** Builds {@link FlinkContainers}. 
*/ + public FlinkContainers build() { + GenericContainer<?> zookeeper = null; + if (enableZookeeperHA) { + enableZookeeperHAConfigurations(); + zookeeper = buildZookeeperContainer(); + } + + this.conf.set(JobManagerOptions.ADDRESS, FlinkContainers.JOB_MANAGER_HOSTNAME); + this.conf.set( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + FlinkContainers.CHECKPOINT_PATH.toAbsolutePath().toUri().toString()); + + final GenericContainer<?> jobManager = buildJobManagerContainer(); + final List<GenericContainer<?>> taskManagers = buildTaskManagerContainers(); + if (enableZookeeperHA) { + createTempDirAndMountToContainer( + "flink-recovery", FlinkContainers.HA_STORAGE_PATH, jobManager); + } + createTempDirAndMountToContainer( + "flink-checkpoint", FlinkContainers.CHECKPOINT_PATH, jobManager); + return new FlinkContainers(jobManager, taskManagers, zookeeper, conf); + } + + // --------------------------- Helper Functions ------------------------------------- + + private GenericContainer<?> buildJobManagerContainer() { + // Configure JobManager + final Configuration jobManagerConf = new Configuration(); + jobManagerConf.addAll(this.conf); + // Build JobManager container + final ImageFromDockerfile jobManagerImage; + try { + jobManagerImage = + new FlinkImageBuilder() + .setConfiguration(jobManagerConf) + .setLogProperties(logProperties) + .asJobManager() + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to build JobManager image", e); + } + final GenericContainer<?> jobManager = buildContainer(jobManagerImage); + // Setup network for JobManager + jobManager + .withNetworkAliases(FlinkContainers.JOB_MANAGER_HOSTNAME) + .withExposedPorts(jobManagerConf.get(RestOptions.PORT)); + // Setup logger + if (this.logger != null) { + jobManager.withLogConsumer(new Slf4jLogConsumer(this.logger).withPrefix("JobManager")); + } + return jobManager; + } + + private List<GenericContainer<?>> buildTaskManagerContainers() { + List<GenericContainer<?>> taskManagers = new 
ArrayList<>(); + for (int i = 0; i < numTaskManagers; i++) { + // Configure TaskManager + final Configuration taskManagerConf = new Configuration(); + taskManagerConf.addAll(this.conf); + final String taskManagerHostName = FlinkContainers.TASK_MANAGER_HOSTNAME_PREFIX + i; + taskManagerConf.set(TaskManagerOptions.HOST, taskManagerHostName); + // Build TaskManager container + final ImageFromDockerfile taskManagerImage; + try { + taskManagerImage = + new FlinkImageBuilder() + .setConfiguration(taskManagerConf) + .setLogProperties(logProperties) + .asTaskManager() + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to build TaskManager image", e); + } + final GenericContainer<?> taskManager = buildContainer(taskManagerImage); + // Setup network for TaskManager + taskManager.withNetworkAliases(taskManagerHostName); + // Setup logger + if (this.logger != null) { + taskManager.withLogConsumer( + new Slf4jLogConsumer(this.logger).withPrefix("TaskManager-" + i)); + } + taskManagers.add(taskManager); + } + return taskManagers; + } + + private GenericContainer<?> buildZookeeperContainer() { + final ImageFromDockerfile zookeeperImage; + try { + zookeeperImage = + new FlinkImageBuilder() + .setConfiguration(this.conf) + .useCustomStartupCommand( + "flink/bin/zookeeper.sh start-foreground 1 && tail -f /dev/null") + .setImageName("flink-zookeeper") + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to build Zookeeper image", e); + } + final GenericContainer<?> zookeeper = buildContainer(zookeeperImage); + // Setup network for Zookeeper + zookeeper.withNetworkAliases(FlinkContainers.ZOOKEEPER_HOSTNAME); + // Setup logger + if (this.logger != null) { + zookeeper.withLogConsumer(new Slf4jLogConsumer(this.logger).withPrefix("Zookeeper")); + } + return zookeeper; + } + + private GenericContainer<?> buildContainer(ImageFromDockerfile image) { + final GenericContainer<?> container = new GenericContainer<>(image); + for (GenericContainer<?> 
dependentContainer : dependentContainers) { + container.dependsOn(dependentContainer); + } + // Bind network to container + container.withNetwork(this.network); + // Add environment variables + container.withEnv(envVars); + return container; + } + + private void enableZookeeperHAConfigurations() { + // Set HA related configurations + checkNotNull(this.conf, "Configuration should not be null for setting HA service"); + conf.set(HighAvailabilityOptions.HA_MODE, "zookeeper"); + conf.set( + HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, + FlinkContainers.ZOOKEEPER_HOSTNAME + ":" + "2181"); + conf.set(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "/flink"); + conf.set(HighAvailabilityOptions.HA_CLUSTER_ID, "flink-container-" + UUID.randomUUID()); + conf.set( + HighAvailabilityOptions.HA_STORAGE_PATH, + FlinkContainers.HA_STORAGE_PATH.toUri().toString()); + } + + private void createTempDirAndMountToContainer( + String tempDirPrefix, Path containerPath, GenericContainer<?> container) { + try { + Path tempDirPath = Files.createTempDirectory(tempDirPrefix); Review comment: Do we clean up this directory? ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml ########## @@ -146,6 +146,12 @@ under the License. <version>${project.version}</version> <type>test-jar</type> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka</artifactId> + <version>${project.version}</version> Review comment: Making it a separate commit and explaining why we need it should suffice. Did we fix that the tests are now executed on CI? ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; + +import org.slf4j.Logger; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A builder class for {@link FlinkContainers}. 
*/ +public class FlinkContainersBuilder { + + private final List<GenericContainer<?>> dependentContainers = new ArrayList<>(); + private final Configuration conf = new Configuration(); + private final Map<String, String> envVars = new HashMap<>(); + private final Properties logProperties = new Properties(); + + private int numTaskManagers = 1; + private Network network = Network.newNetwork(); + private Logger logger; + private boolean enableZookeeperHA = false; + + /** Sets number of TaskManagers. */ + public FlinkContainersBuilder setNumTaskManagers(int numTaskManagers) { + this.numTaskManagers = numTaskManagers; + return this; + } + + /** Sets configuration of the cluster. */ + public FlinkContainersBuilder setConfiguration(Configuration conf) { + this.conf.addAll(conf); + return this; + } + + /** + * Lets Flink cluster depending on another container, and bind the network of Flink cluster to + * the dependent one. + */ + public FlinkContainersBuilder dependsOn(GenericContainer<?> container) { + container.withNetwork(this.network); + this.dependentContainers.add(container); + return this; + } + + /** Sets environment variable to containers. */ + public FlinkContainersBuilder setEnvironmentVariable(String key, String value) { + this.envVars.put(key, value); + return this; + } + + /** Sets network of the Flink cluster. */ + public FlinkContainersBuilder setNetwork(Network network) { + this.network = network; + return this; + } + + /** Sets a logger to the cluster in order to consume STDOUT of containers to the logger. */ + public FlinkContainersBuilder setLogger(Logger logger) { + this.logger = logger; + return this; + } + + /** + * Sets log4j property. + * + * <p>Containers will use "log4j-console.properties" under flink-dist as the base configuration + * of loggers. Properties specified by this method will be appended to the config file, or + * overwrite the property if already exists in the base config file. 
+ */ + public FlinkContainersBuilder setLogProperty(String key, String value) { + this.logProperties.setProperty(key, value); + return this; + } + + /** + * Sets log4j properties. + * + * <p>Containers will use "log4j-console.properties" under flink-dist as the base configuration + * of loggers. Properties specified by this method will be appended to the config file, or + * overwrite the property if already exists in the base config file. + */ + public FlinkContainersBuilder setLogProperties(Properties logProperties) { + this.logProperties.putAll(logProperties); + return this; + } + + /** Enables high availability service. */ + public FlinkContainersBuilder enableZookeeperHA() { + this.enableZookeeperHA = true; + return this; + } + + /** Builds {@link FlinkContainers}. */ + public FlinkContainers build() { + GenericContainer<?> zookeeper = null; + if (enableZookeeperHA) { + enableZookeeperHAConfigurations(); + zookeeper = buildZookeeperContainer(); + } + + this.conf.set(JobManagerOptions.ADDRESS, FlinkContainers.JOB_MANAGER_HOSTNAME); + this.conf.set( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + FlinkContainers.CHECKPOINT_PATH.toAbsolutePath().toUri().toString()); + + final GenericContainer<?> jobManager = buildJobManagerContainer(); + final List<GenericContainer<?>> taskManagers = buildTaskManagerContainers(); + if (enableZookeeperHA) { + createTempDirAndMountToContainer( + "flink-recovery", FlinkContainers.HA_STORAGE_PATH, jobManager); + } + createTempDirAndMountToContainer( + "flink-checkpoint", FlinkContainers.CHECKPOINT_PATH, jobManager); + return new FlinkContainers(jobManager, taskManagers, zookeeper, conf); + } + + // --------------------------- Helper Functions ------------------------------------- + + private GenericContainer<?> buildJobManagerContainer() { + // Configure JobManager + final Configuration jobManagerConf = new Configuration(); + jobManagerConf.addAll(this.conf); + // Build JobManager container + final ImageFromDockerfile jobManagerImage; 
+ try { + jobManagerImage = + new FlinkImageBuilder() + .setConfiguration(jobManagerConf) + .setLogProperties(logProperties) + .asJobManager() + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to build JobManager image", e); + } + final GenericContainer<?> jobManager = buildContainer(jobManagerImage); + // Setup network for JobManager + jobManager + .withNetworkAliases(FlinkContainers.JOB_MANAGER_HOSTNAME) + .withExposedPorts(jobManagerConf.get(RestOptions.PORT)); + // Setup logger + if (this.logger != null) { + jobManager.withLogConsumer(new Slf4jLogConsumer(this.logger).withPrefix("JobManager")); + } + return jobManager; + } + + private List<GenericContainer<?>> buildTaskManagerContainers() { + List<GenericContainer<?>> taskManagers = new ArrayList<>(); + for (int i = 0; i < numTaskManagers; i++) { + // Configure TaskManager + final Configuration taskManagerConf = new Configuration(); + taskManagerConf.addAll(this.conf); + final String taskManagerHostName = FlinkContainers.TASK_MANAGER_HOSTNAME_PREFIX + i; + taskManagerConf.set(TaskManagerOptions.HOST, taskManagerHostName); + // Build TaskManager container + final ImageFromDockerfile taskManagerImage; + try { + taskManagerImage = + new FlinkImageBuilder() + .setConfiguration(taskManagerConf) + .setLogProperties(logProperties) + .asTaskManager() + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to build TaskManager image", e); + } + final GenericContainer<?> taskManager = buildContainer(taskManagerImage); + // Setup network for TaskManager + taskManager.withNetworkAliases(taskManagerHostName); + // Setup logger + if (this.logger != null) { + taskManager.withLogConsumer( + new Slf4jLogConsumer(this.logger).withPrefix("TaskManager-" + i)); + } + taskManagers.add(taskManager); Review comment: Nit: This is almost identical to the container creation of the JM. Can you extract a common method?
########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; + +import org.slf4j.Logger; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A builder 
class for {@link FlinkContainers}. */ +public class FlinkContainersBuilder { + + private final List<GenericContainer<?>> dependentContainers = new ArrayList<>(); + private final Configuration conf = new Configuration(); + private final Map<String, String> envVars = new HashMap<>(); + private final Properties logProperties = new Properties(); + + private int numTaskManagers = 1; + private Network network = Network.newNetwork(); + private Logger logger; + private boolean enableZookeeperHA = false; + + /** Sets number of TaskManagers. */ + public FlinkContainersBuilder setNumTaskManagers(int numTaskManagers) { + this.numTaskManagers = numTaskManagers; + return this; + } + + /** Sets configuration of the cluster. */ + public FlinkContainersBuilder setConfiguration(Configuration conf) { + this.conf.addAll(conf); + return this; + } + + /** + * Lets Flink cluster depending on another container, and bind the network of Flink cluster to + * the dependent one. + */ + public FlinkContainersBuilder dependsOn(GenericContainer<?> container) { + container.withNetwork(this.network); + this.dependentContainers.add(container); + return this; + } + + /** Sets environment variable to containers. */ + public FlinkContainersBuilder setEnvironmentVariable(String key, String value) { + this.envVars.put(key, value); + return this; + } + + /** Sets network of the Flink cluster. */ + public FlinkContainersBuilder setNetwork(Network network) { + this.network = network; + return this; + } + + /** Sets a logger to the cluster in order to consume STDOUT of containers to the logger. */ + public FlinkContainersBuilder setLogger(Logger logger) { + this.logger = logger; + return this; + } + + /** + * Sets log4j property. + * + * <p>Containers will use "log4j-console.properties" under flink-dist as the base configuration + * of loggers. Properties specified by this method will be appended to the config file, or + * overwrite the property if already exists in the base config file. 
+ */ + public FlinkContainersBuilder setLogProperty(String key, String value) { + this.logProperties.setProperty(key, value); + return this; + } + + /** + * Sets log4j properties. + * + * <p>Containers will use "log4j-console.properties" under flink-dist as the base configuration + * of loggers. Properties specified by this method will be appended to the config file, or + * overwrite the property if already exists in the base config file. + */ + public FlinkContainersBuilder setLogProperties(Properties logProperties) { + this.logProperties.putAll(logProperties); + return this; + } + + /** Enables high availability service. */ + public FlinkContainersBuilder enableZookeeperHA() { + this.enableZookeeperHA = true; + return this; + } + + /** Builds {@link FlinkContainers}. */ + public FlinkContainers build() { + GenericContainer<?> zookeeper = null; + if (enableZookeeperHA) { + enableZookeeperHAConfigurations(); + zookeeper = buildZookeeperContainer(); + } + + this.conf.set(JobManagerOptions.ADDRESS, FlinkContainers.JOB_MANAGER_HOSTNAME); + this.conf.set( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + FlinkContainers.CHECKPOINT_PATH.toAbsolutePath().toUri().toString()); + + final GenericContainer<?> jobManager = buildJobManagerContainer(); + final List<GenericContainer<?>> taskManagers = buildTaskManagerContainers(); + if (enableZookeeperHA) { + createTempDirAndMountToContainer( + "flink-recovery", FlinkContainers.HA_STORAGE_PATH, jobManager); + } + createTempDirAndMountToContainer( + "flink-checkpoint", FlinkContainers.CHECKPOINT_PATH, jobManager); + return new FlinkContainers(jobManager, taskManagers, zookeeper, conf); + } + + // --------------------------- Helper Functions ------------------------------------- + + private GenericContainer<?> buildJobManagerContainer() { + // Configure JobManager + final Configuration jobManagerConf = new Configuration(); + jobManagerConf.addAll(this.conf); + // Build JobManager container + final ImageFromDockerfile jobManagerImage; 
+ try { + jobManagerImage = + new FlinkImageBuilder() + .setConfiguration(jobManagerConf) + .setLogProperties(logProperties) + .asJobManager() + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to build JobManager image", e); + } + final GenericContainer<?> jobManager = buildContainer(jobManagerImage); + // Setup network for JobManager + jobManager + .withNetworkAliases(FlinkContainers.JOB_MANAGER_HOSTNAME) + .withExposedPorts(jobManagerConf.get(RestOptions.PORT)); + // Setup logger + if (this.logger != null) { + jobManager.withLogConsumer(new Slf4jLogConsumer(this.logger).withPrefix("JobManager")); + } + return jobManager; + } + + private List<GenericContainer<?>> buildTaskManagerContainers() { + List<GenericContainer<?>> taskManagers = new ArrayList<>(); + for (int i = 0; i < numTaskManagers; i++) { + // Configure TaskManager + final Configuration taskManagerConf = new Configuration(); + taskManagerConf.addAll(this.conf); + final String taskManagerHostName = FlinkContainers.TASK_MANAGER_HOSTNAME_PREFIX + i; + taskManagerConf.set(TaskManagerOptions.HOST, taskManagerHostName); + // Build TaskManager container + final ImageFromDockerfile taskManagerImage; + try { + taskManagerImage = + new FlinkImageBuilder() + .setConfiguration(taskManagerConf) + .setLogProperties(logProperties) + .asTaskManager() + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to build TaskManager image", e); + } + final GenericContainer<?> taskManager = buildContainer(taskManagerImage); + // Setup network for TaskManager + taskManager.withNetworkAliases(taskManagerHostName); + // Setup logger + if (this.logger != null) { + taskManager.withLogConsumer( + new Slf4jLogConsumer(this.logger).withPrefix("TaskManager-" + i)); + } + taskManagers.add(taskManager); + } + return taskManagers; + } + + private GenericContainer<?> buildZookeeperContainer() { + final ImageFromDockerfile zookeeperImage; + try { + zookeeperImage = + new FlinkImageBuilder() + 
.setConfiguration(this.conf) + .useCustomStartupCommand( + "flink/bin/zookeeper.sh start-foreground 1 && tail -f /dev/null") + .setImageName("flink-zookeeper") + .build(); Review comment: Nit: Can we extract this to its own class and extend the `GenericContainer`? I think we should also use an official zookeeper image and put the image name in `DockerImageVersions`. AFAIK Flink only supports a specific set of zookeeper versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
