fapaul commented on a change in pull request #17892:
URL: https://github.com/apache/flink/pull/17892#discussion_r756728229



##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
##########
@@ -146,6 +146,12 @@ under the License.
                        <version>${project.version}</version>
                        <type>test-jar</type>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka</artifactId>
+                       <version>${project.version}</version>

Review comment:
       Why do we need this dependency now?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainerBuilder.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import org.slf4j.Logger;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A builder class for {@link FlinkContainer}. */
+public class FlinkContainerBuilder {
+
+    private final List<GenericContainer<?>> dependentContainers = new 
ArrayList<>();
+    private final Configuration conf = new Configuration();
+    private final Map<String, String> envVars = new HashMap<>();
+
+    private int numTaskManagers = 1;
+    private Network network = Network.newNetwork();
+    private Logger logger;
+    private boolean enableHAService = false;
+
+    /** Sets number of TaskManagers. */
+    public FlinkContainerBuilder numTaskManagers(int numTaskManagers) {
+        this.numTaskManagers = numTaskManagers;
+        return this;
+    }
+
+    /** Sets configuration of the cluster. */
+    public FlinkContainerBuilder setConfiguration(Configuration conf) {
+        this.conf.addAll(conf);
+        return this;
+    }
+
+    /**
+     * Lets Flink cluster depending on another container, and bind the network 
of Flink cluster to
+     * the dependent one.
+     */
+    public FlinkContainerBuilder dependsOn(GenericContainer<?> container) {
+        container.withNetwork(this.network);
+        this.dependentContainers.add(container);
+        return this;
+    }
+
+    /** Sets environment variable to containers. */
+    public FlinkContainerBuilder setEnvironmentVariable(String key, String 
value) {
+        this.envVars.put(key, value);
+        return this;
+    }
+
+    /** Sets network of the Flink cluster. */
+    public FlinkContainerBuilder setNetwork(Network network) {
+        this.network = network;
+        return this;
+    }
+
+    /** Sets a logger to the cluster in order to consume STDOUT of containers 
to the logger. */
+    public FlinkContainerBuilder setLogger(Logger logger) {
+        this.logger = logger;
+        return this;
+    }
+
+    /** Enables high availability service. */
+    public FlinkContainerBuilder enableHAService() {
+        this.enableHAService = true;
+        return this;
+    }
+
+    /** Builds {@link FlinkContainer}. */
+    public FlinkContainer build() {
+        GenericContainer<?> haService = null;
+        if (enableHAService) {
+            enableHAServiceConfigurations();
+            haService = buildZookeeperContainer();
+        }
+
+        this.conf.set(JobManagerOptions.ADDRESS, 
FlinkContainer.JOB_MANAGER_HOSTNAME);
+        this.conf.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                
FlinkContainer.CHECKPOINT_PATH.toAbsolutePath().toUri().toString());
+
+        final GenericContainer<?> jobManager = buildJobManagerContainer();
+        final List<GenericContainer<?>> taskManagers = 
buildTaskManagerContainers();
+
+        if (enableHAService) {
+            try {
+                Path recoveryPath = 
Files.createTempDirectory("flink-recovery");
+                jobManager.withFileSystemBind(
+                        recoveryPath.toAbsolutePath().toString(),
+                        
FlinkContainer.HA_STORAGE_PATH.toAbsolutePath().toString());
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to create temporary 
recovery directory", e);
+            }
+        }
+
+        try {
+            Path checkpointPath = 
Files.createTempDirectory("flink-checkpoint");
+            jobManager.withFileSystemBind(
+                    checkpointPath.toAbsolutePath().toString(),
+                    
FlinkContainer.CHECKPOINT_PATH.toAbsolutePath().toString());
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to create temporary 
checkpoint directory", e);
+        }
+
+        return new FlinkContainer(jobManager, taskManagers, haService, conf);
+    }
+
+    // --------------------------- Helper Functions 
-------------------------------------
+
+    private GenericContainer<?> buildJobManagerContainer() {
+        // Configure JobManager
+        final Configuration jobManagerConf = new Configuration();
+        jobManagerConf.addAll(this.conf);
+        // Build JobManager container
+        final ImageFromDockerfile jobManagerImage;
+        try {
+            jobManagerImage =
+                    new 
FlinkImageBuilder().setConfiguration(jobManagerConf).asJobManager().build();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to build JobManager image", e);
+        }
+        final GenericContainer<?> jobManager = buildContainer(jobManagerImage);
+        // Setup network for JobManager
+        jobManager
+                .withNetworkAliases(FlinkContainer.JOB_MANAGER_HOSTNAME)
+                .withExposedPorts(jobManagerConf.get(RestOptions.PORT));
+        // Setup logger
+        if (this.logger != null) {
+            jobManager.withLogConsumer(new 
Slf4jLogConsumer(this.logger).withPrefix("JobManager"));

Review comment:
       We recently experienced excessive logging from the Kafka container 
because the log level of the capturing logger is not propagated to the 
containers. I think we need to do something similar here. [1]
   
   [1] 
https://github.com/apache/flink/blob/249683b25655e1c8c17f3ac01e7739e558df1f9b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java#L57
   
   

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainerBuilder.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import org.slf4j.Logger;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A builder class for {@link FlinkContainer}. */
+public class FlinkContainerBuilder {
+
+    private final List<GenericContainer<?>> dependentContainers = new 
ArrayList<>();
+    private final Configuration conf = new Configuration();
+    private final Map<String, String> envVars = new HashMap<>();
+
+    private int numTaskManagers = 1;
+    private Network network = Network.newNetwork();
+    private Logger logger;
+    private boolean enableHAService = false;
+
+    /** Sets number of TaskManagers. */
+    public FlinkContainerBuilder numTaskManagers(int numTaskManagers) {
+        this.numTaskManagers = numTaskManagers;
+        return this;
+    }
+
+    /** Sets configuration of the cluster. */
+    public FlinkContainerBuilder setConfiguration(Configuration conf) {
+        this.conf.addAll(conf);
+        return this;
+    }
+
+    /**
+     * Lets Flink cluster depending on another container, and bind the network 
of Flink cluster to
+     * the dependent one.
+     */
+    public FlinkContainerBuilder dependsOn(GenericContainer<?> container) {
+        container.withNetwork(this.network);
+        this.dependentContainers.add(container);
+        return this;
+    }
+
+    /** Sets environment variable to containers. */
+    public FlinkContainerBuilder setEnvironmentVariable(String key, String 
value) {
+        this.envVars.put(key, value);
+        return this;
+    }
+
+    /** Sets network of the Flink cluster. */
+    public FlinkContainerBuilder setNetwork(Network network) {
+        this.network = network;
+        return this;
+    }
+
+    /** Sets a logger to the cluster in order to consume STDOUT of containers 
to the logger. */
+    public FlinkContainerBuilder setLogger(Logger logger) {
+        this.logger = logger;
+        return this;
+    }
+
+    /** Enables high availability service. */
+    public FlinkContainerBuilder enableHAService() {

Review comment:
       Nit: Should we make it apparent (e.g. in the method name) that we use `ZooKeeper HA`?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.tests.util.util.FileUtils;
+
+import com.github.dockerjava.api.exception.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A builder class for constructing Docker image based on flink-dist. */
+public class FlinkImageBuilder {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkImageBuilder.class);
+    private static final String FLINK_BASE_IMAGE_NAME = "flink-dist-base";
+    private static final String DEFAULT_IMAGE_NAME = "flink-dist-configured";
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5);
+
+    private final Map<Path, Path> filesToCopy = new HashMap<>();
+
+    private String imageName = DEFAULT_IMAGE_NAME;
+    private String imageNameSuffix;
+    private Path flinkDist = FileUtils.findFlinkDist();
+    private String javaVersion;
+    private Configuration conf;
+    private Duration timeout = DEFAULT_TIMEOUT;
+    private String startupCommand;
+
+    /**
+     * Sets the name of building image.
+     *
+     * <p>If the name is not specified, {@link #DEFAULT_IMAGE_NAME} will be 
used.
+     */
+    public FlinkImageBuilder setImageName(String imageName) {
+        this.imageName = imageName;
+        return this;
+    }
+
+    /**
+     * Sets the path of flink-dist directory.
+     *
+     * <p>If path is not specified, the dist directory under current project 
will be used.
+     */
+    public FlinkImageBuilder setFlinkDistPath(Path flinkDist) {
+        this.flinkDist = flinkDist;
+        return this;
+    }
+
+    /**
+     * Sets JDK version in the image.
+     *
+     * <p>If version is not specified, the JDK version of the current JVM will 
be used.
+     */
+    public FlinkImageBuilder setJavaVersion(String javaVersion) {
+        this.javaVersion = javaVersion;
+        return this;
+    }
+
+    /**
+     * Sets Flink configuration. This configuration will be used for 
generating flink-conf.yaml for
+     * configuring JobManager and TaskManager.
+     */
+    public FlinkImageBuilder setConfiguration(Configuration conf) {
+        this.conf = conf;
+        return this;
+    }
+
+    /** Copies file into the image. */
+    public FlinkImageBuilder copyFile(Path localPath, Path containerPath) {
+        filesToCopy.put(localPath, containerPath);
+        return this;
+    }
+
+    /** Sets timeout for building the image. */
+    public FlinkImageBuilder setTimeout(Duration timeout) {
+        this.timeout = timeout;
+        return this;
+    }
+
+    /** Use this image for building a JobManager. */
+    public FlinkImageBuilder asJobManager() {
+        checkStartupCommandNotSet();
+        this.startupCommand = "flink/bin/jobmanager.sh start-foreground && 
tail -f /dev/null";
+        this.imageNameSuffix = "jobmanager";
+        return this;
+    }
+
+    /** Use this image for building a TaskManager. */
+    public FlinkImageBuilder asTaskManager() {
+        checkStartupCommandNotSet();
+        this.startupCommand = "flink/bin/taskmanager.sh start-foreground && 
tail -f /dev/null";
+        this.imageNameSuffix = "taskmanager";
+        return this;
+    }
+
+    /** Use a custom command for starting up the container. */
+    public FlinkImageBuilder useCustomStartupCommand(String command) {
+        checkStartupCommandNotSet();
+        this.startupCommand = command;
+        this.imageNameSuffix = "custom";
+        return this;
+    }
+
+    /** Build the image. */
+    public ImageFromDockerfile build() throws TimeoutException, IOException {

Review comment:
       WDYT about simplifying the exception handling by only throwing an 
unchecked `ImageBuildException` from this method?
   
   The exception can already state whether it is the TaskManager or JobManager 
image that failed to build, so none of the callers needs to handle and wrap it again.

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
##########
@@ -70,13 +78,16 @@ public void tearDown() {
     @Override
     public StreamExecutionEnvironment createExecutionEnvironment() {
         return StreamExecutionEnvironment.createRemoteEnvironment(
-                this.flinkContainer.getHost(),
-                
this.flinkContainer.getMappedPort(FlinkContainer.JOB_MANAGER_REST_PORT),
+                this.flinkContainer.getJobManagerHost(),
+                this.flinkContainer.getJobManagerPort(),
                 this.jarPath);
     }
 
     @Override
-    public void triggerJobManagerFailover(JobClient jobClient, Runnable 
afterFailAction) {}
+    public void triggerJobManagerFailover(JobClient jobClient, Runnable 
afterFailAction)
+            throws Exception {
+        flinkContainer.restartJobManager(afterFailAction::run);

Review comment:
       In the `triggerTaskManagerFailover` method we wait until the TaskManager 
has restarted; does it make sense to do the same here?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainerBuilder.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import org.slf4j.Logger;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A builder class for {@link FlinkContainer}. */
+public class FlinkContainerBuilder {
+
+    private final List<GenericContainer<?>> dependentContainers = new 
ArrayList<>();
+    private final Configuration conf = new Configuration();
+    private final Map<String, String> envVars = new HashMap<>();
+
+    private int numTaskManagers = 1;
+    private Network network = Network.newNetwork();
+    private Logger logger;
+    private boolean enableHAService = false;
+
+    /** Sets number of TaskManagers. */
+    public FlinkContainerBuilder numTaskManagers(int numTaskManagers) {
+        this.numTaskManagers = numTaskManagers;
+        return this;
+    }
+
+    /** Sets configuration of the cluster. */
+    public FlinkContainerBuilder setConfiguration(Configuration conf) {
+        this.conf.addAll(conf);
+        return this;
+    }
+
+    /**
+     * Lets Flink cluster depending on another container, and bind the network 
of Flink cluster to
+     * the dependent one.
+     */
+    public FlinkContainerBuilder dependsOn(GenericContainer<?> container) {
+        container.withNetwork(this.network);
+        this.dependentContainers.add(container);
+        return this;
+    }
+
+    /** Sets environment variable to containers. */
+    public FlinkContainerBuilder setEnvironmentVariable(String key, String 
value) {
+        this.envVars.put(key, value);
+        return this;
+    }
+
+    /** Sets network of the Flink cluster. */
+    public FlinkContainerBuilder setNetwork(Network network) {
+        this.network = network;
+        return this;
+    }
+
+    /** Sets a logger to the cluster in order to consume STDOUT of containers 
to the logger. */
+    public FlinkContainerBuilder setLogger(Logger logger) {
+        this.logger = logger;
+        return this;
+    }
+
+    /** Enables high availability service. */
+    public FlinkContainerBuilder enableHAService() {
+        this.enableHAService = true;
+        return this;
+    }
+
+    /** Builds {@link FlinkContainer}. */
+    public FlinkContainer build() {
+        GenericContainer<?> haService = null;
+        if (enableHAService) {
+            enableHAServiceConfigurations();
+            haService = buildZookeeperContainer();
+        }
+
+        this.conf.set(JobManagerOptions.ADDRESS, 
FlinkContainer.JOB_MANAGER_HOSTNAME);
+        this.conf.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                
FlinkContainer.CHECKPOINT_PATH.toAbsolutePath().toUri().toString());
+
+        final GenericContainer<?> jobManager = buildJobManagerContainer();
+        final List<GenericContainer<?>> taskManagers = 
buildTaskManagerContainers();
+
+        if (enableHAService) {
+            try {
+                Path recoveryPath = 
Files.createTempDirectory("flink-recovery");
+                jobManager.withFileSystemBind(
+                        recoveryPath.toAbsolutePath().toString(),
+                        
FlinkContainer.HA_STORAGE_PATH.toAbsolutePath().toString());
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to create temporary 
recovery directory", e);
+            }
+        }
+
+        try {
+            Path checkpointPath = 
Files.createTempDirectory("flink-checkpoint");
+            jobManager.withFileSystemBind(
+                    checkpointPath.toAbsolutePath().toString(),
+                    
FlinkContainer.CHECKPOINT_PATH.toAbsolutePath().toString());
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to create temporary 
checkpoint directory", e);
+        }

Review comment:
       Please extract a helper method (e.g. one that creates a temporary directory and binds it into the JobManager container) to avoid the code duplication.

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainer.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.tests.util.flink.SQLJobSubmission;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A Flink cluster running JM and TM on different containers. */
+public class FlinkContainer implements BeforeAllCallback, AfterAllCallback {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkContainer.class);
+
+    // Hostname of components within container network
+    public static final String JOB_MANAGER_HOSTNAME = "jobmanager";
+    public static final String TASK_MANAGER_HOSTNAME_PREFIX = "taskmanager-";
+    public static final String ZOOKEEPER_HOSTNAME = "zookeeper";
+
+    // Directories for storing states
+    public static final Path CHECKPOINT_PATH = Paths.get("/flink/checkpoint");
+    public static final Path HA_STORAGE_PATH = Paths.get("/flink/recovery");
+
+    private final GenericContainer<?> jobManager;
+    private final List<GenericContainer<?>> taskManagers;
+    private final GenericContainer<?> haService;
+    private final Configuration conf;
+
+    private RestClusterClient<StandaloneClusterId> restClusterClient;
+    private boolean isRunning;
+
+    /** Creates a builder for {@link FlinkContainer}. */
+    public static FlinkContainerBuilder builder() {
+        return new FlinkContainerBuilder();
+    }
+
+    FlinkContainer(
+            GenericContainer<?> jobManager,
+            List<GenericContainer<?>> taskManagers,
+            @Nullable GenericContainer<?> haService,
+            Configuration conf) {
+        this.jobManager = jobManager;
+        this.taskManagers = taskManagers;
+        this.haService = haService;
+        this.conf = conf;
+    }
+
+    /** Starts all containers. */
+    public void start() throws Exception {
+        if (haService != null) {
+            LOG.debug("Starting HA service container");
+            this.haService.start();
+        }
+        LOG.debug("Starting JobManager container");
+        this.jobManager.start();
+        LOG.debug("Starting TaskManager containers");
+        this.taskManagers.parallelStream().forEach(GenericContainer::start);
+        LOG.debug("Creating REST cluster client");
+        this.restClusterClient = createClusterClient();
+        waitUntilAllTaskManagerConnected();
+        isRunning = true;
+    }
+
+    /** Stops all containers. */
+    public void stop() {
+        isRunning = false;
+        if (restClusterClient != null) {
+            restClusterClient.close();
+        }
+        this.taskManagers.forEach(GenericContainer::stop);
+        this.jobManager.stop();
+        if (this.haService != null) {
+            this.haService.stop();
+        }
+    }
+
+    /** Gets the running state of the cluster. */
+    public boolean isRunning() {

Review comment:
       Nit: This method can be misleading because it only indicates that the 
containers were started at some point; it does not reflect their current state. I 
guess we do not need to change it now and can treat it as a follow-up improvement.

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainer.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.tests.util.flink.SQLJobSubmission;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A Flink cluster running JM and TM on different containers. */
+public class FlinkContainer implements BeforeAllCallback, AfterAllCallback {

Review comment:
       I wonder whether `FlinkContainer` is still the correct name because it 
now consists of multiple containers. WDYT of renaming it to `FlinkService`?
   
   

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.tests.util.util.FileUtils;
+
+import com.github.dockerjava.api.exception.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A builder class for constructing Docker image based on flink-dist. */
+public class FlinkImageBuilder {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkImageBuilder.class);
+    private static final String FLINK_BASE_IMAGE_NAME = "flink-dist-base";
+    private static final String DEFAULT_IMAGE_NAME = "flink-dist-configured";
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5);
+
+    private final Map<Path, Path> filesToCopy = new HashMap<>();
+
+    private String imageName = DEFAULT_IMAGE_NAME;
+    private String imageNameSuffix;
+    private Path flinkDist = FileUtils.findFlinkDist();
+    private String javaVersion;
+    private Configuration conf;
+    private Duration timeout = DEFAULT_TIMEOUT;
+    private String startupCommand;
+
+    /**
+     * Sets the name of building image.
+     *
+     * <p>If the name is not specified, {@link #DEFAULT_IMAGE_NAME} will be 
used.
+     */
+    public FlinkImageBuilder setImageName(String imageName) {
+        this.imageName = imageName;
+        return this;
+    }
+
+    /**
+     * Sets the path of flink-dist directory.
+     *
+     * <p>If path is not specified, the dist directory under current project 
will be used.
+     */
+    public FlinkImageBuilder setFlinkDistPath(Path flinkDist) {
+        this.flinkDist = flinkDist;
+        return this;
+    }
+
+    /**
+     * Sets JDK version in the image.
+     *
+     * <p>If version is not specified, the JDK version of the current JVM will 
be used.
+     */
+    public FlinkImageBuilder setJavaVersion(String javaVersion) {
+        this.javaVersion = javaVersion;
+        return this;
+    }
+
+    /**
+     * Sets Flink configuration. This configuration will be used for 
generating flink-conf.yaml for
+     * configuring JobManager and TaskManager.
+     */
+    public FlinkImageBuilder setConfiguration(Configuration conf) {
+        this.conf = conf;
+        return this;
+    }
+
+    /** Copies file into the image. */
+    public FlinkImageBuilder copyFile(Path localPath, Path containerPath) {
+        filesToCopy.put(localPath, containerPath);
+        return this;
+    }
+
+    /** Sets timeout for building the image. */
+    public FlinkImageBuilder setTimeout(Duration timeout) {
+        this.timeout = timeout;
+        return this;
+    }
+
+    /** Use this image for building a JobManager. */
+    public FlinkImageBuilder asJobManager() {
+        checkStartupCommandNotSet();
+        this.startupCommand = "flink/bin/jobmanager.sh start-foreground && 
tail -f /dev/null";
+        this.imageNameSuffix = "jobmanager";
+        return this;
+    }
+
+    /** Use this image for building a TaskManager. */
+    public FlinkImageBuilder asTaskManager() {
+        checkStartupCommandNotSet();
+        this.startupCommand = "flink/bin/taskmanager.sh start-foreground && 
tail -f /dev/null";
+        this.imageNameSuffix = "taskmanager";
+        return this;
+    }
+
+    /** Use a custom command for starting up the container. */
+    public FlinkImageBuilder useCustomStartupCommand(String command) {
+        checkStartupCommandNotSet();
+        this.startupCommand = command;
+        this.imageNameSuffix = "custom";
+        return this;
+    }
+
+    /** Build the image. */
+    public ImageFromDockerfile build() throws TimeoutException, IOException {
+        sanityCheck();
+        // Build base image first
+        buildBaseImage(flinkDist);
+        final Path flinkConfFile = createTemporaryFlinkConfFile();
+        // Copy flink-conf.yaml into image
+        filesToCopy.put(
+                flinkConfFile, Paths.get("flink", "conf", 
GlobalConfiguration.FLINK_CONF_FILENAME));
+
+        final ImageFromDockerfile image =
+                new ImageFromDockerfile(imageName + "-" + imageNameSuffix)
+                        .withDockerfileFromBuilder(
+                                builder -> {
+                                    // Build from base image
+                                    builder.from(FLINK_BASE_IMAGE_NAME);
+                                    // Copy files into image
+                                    filesToCopy.forEach(
+                                            (from, to) ->
+                                                    
builder.copy(to.toString(), to.toString()));
+                                    builder.cmd(startupCommand);
+                                });
+        filesToCopy.forEach((from, to) -> 
image.withFileFromPath(to.toString(), from));
+        return image;
+    }
+
+    // ----------------------- Helper functions -----------------------
+
+    private void buildBaseImage(Path flinkDist) throws TimeoutException {
+        if (!baseImageExists()) {
+            LOG.info("Building Flink base image with flink-dist at {}", 
flinkDist);
+            new ImageFromDockerfile(FLINK_BASE_IMAGE_NAME)
+                    .withDockerfileFromBuilder(
+                            builder ->
+                                    builder.from("openjdk:" + 
getJavaVersionSuffix())
+                                            .copy("flink", "flink")
+                                            .build())
+                    .withFileFromPath("flink", flinkDist)
+                    .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private boolean baseImageExists() {
+        try {
+            
DockerClientFactory.instance().client().inspectImageCmd(FLINK_BASE_IMAGE_NAME).exec();
+            return true;
+        } catch (NotFoundException e) {
+            return false;
+        }
+    }
+
+    private String getJavaVersionSuffix() {
+        if (this.javaVersion != null) {
+            return this.javaVersion;
+        } else {
+            String javaSpecVersion = 
System.getProperty("java.vm.specification.version");
+            LOG.info("Using JDK version {} of the current VM", 
javaSpecVersion);
+            switch (javaSpecVersion) {
+                case "1.8":
+                    return "8";
+                case "11":
+                    return "11";
+                default:
+                    throw new IllegalStateException("Unexpected Java version: 
" + javaSpecVersion);
+            }
+        }
+    }
+
+    private Path createTemporaryFlinkConfFile() throws IOException {
+        final Path tempDirectory =
+                Files.createTempDirectory("flink-dist-build-" + 
UUID.randomUUID());
+
+        // Load Flink configurations in flink-dist
+        final Configuration finalConfiguration =
+                GlobalConfiguration.loadConfiguration(
+                        flinkDist.resolve("conf").toAbsolutePath().toString());
+
+        if (this.conf != null) {
+            finalConfiguration.addAll(this.conf);
+        }
+
+        Path flinkConfFile = 
tempDirectory.resolve(GlobalConfiguration.FLINK_CONF_FILENAME);
+
+        Files.write(
+                flinkConfFile,
+                finalConfiguration.toMap().entrySet().stream()
+                        .map(entry -> entry.getKey() + ": " + entry.getValue())
+                        .collect(Collectors.toList()));
+
+        return flinkConfFile;
+    }
+
+    private void checkStartupCommandNotSet() {
+        if (this.startupCommand != null) {
+            throw new IllegalStateException("Cannot set startup command for 
multiple times");

Review comment:
       Nit: 
   ```suggestion
               throw new IllegalStateException("Cannot set startup command of 
container multiple times");
   ```

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.tests.util.util.FileUtils;
+
+import com.github.dockerjava.api.exception.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A builder class for constructing Docker image based on flink-dist. */
+public class FlinkImageBuilder {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkImageBuilder.class);
+    private static final String FLINK_BASE_IMAGE_NAME = "flink-dist-base";
+    private static final String DEFAULT_IMAGE_NAME = "flink-dist-configured";
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5);
+
+    private final Map<Path, Path> filesToCopy = new HashMap<>();
+
+    private String imageName = DEFAULT_IMAGE_NAME;
+    private String imageNameSuffix;
+    private Path flinkDist = FileUtils.findFlinkDist();
+    private String javaVersion;
+    private Configuration conf;
+    private Duration timeout = DEFAULT_TIMEOUT;
+    private String startupCommand;
+
+    /**
+     * Sets the name of building image.
+     *
+     * <p>If the name is not specified, {@link #DEFAULT_IMAGE_NAME} will be 
used.
+     */
+    public FlinkImageBuilder setImageName(String imageName) {
+        this.imageName = imageName;
+        return this;
+    }
+
+    /**
+     * Sets the path of flink-dist directory.
+     *
+     * <p>If path is not specified, the dist directory under current project 
will be used.
+     */
+    public FlinkImageBuilder setFlinkDistPath(Path flinkDist) {
+        this.flinkDist = flinkDist;
+        return this;
+    }
+
+    /**
+     * Sets JDK version in the image.
+     *
+     * <p>If version is not specified, the JDK version of the current JVM will 
be used.

Review comment:
       Nit: you could add a link to the `openjdk` image page on Docker Hub, because 
the version string set here is used directly as the tag of the OpenJDK base image (e.g. `8` or `11`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to