fapaul commented on a change in pull request #17892: URL: https://github.com/apache/flink/pull/17892#discussion_r764972218
########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.Extension; +import org.junit.jupiter.api.extension.ExtensionContext; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.MountableFile; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A Flink cluster running JM and TMs on containers. + * + * <p>This containerized Flink cluster is based on <a + * href="https://www.testcontainers.org/">Testcontainers</a>, which simulates a truly distributed + * environment for E2E tests. This class can also be used as an {@link Extension} of JUnit 5 so that + * the lifecycle of the cluster can be easily managed by JUnit Jupiter engine. + * + * <h2>Example usage</h2> + * + * <pre>{@code + * public class E2ETest { + * // Create a Flink cluster using default configurations. + * // Remember to declare it as "static" as required by JUnit 5. + * @RegisterExtension + * static FlinkContainers flink = FlinkContainers.builder().build(); + * + * // To work together with other containers + * @RegisterExtension + * static FlinkContainers flink = + * FlinkContainers.builder() + * .dependsOn(kafkaContainer) + * .build(); + * + * // Customize a Flink cluster + * // Remember to declare it as "static" as required by JUnit 5. + * @RegisterExtension + * static FlinkContainers flink = + * FlinkContainers.builder() + * .setNumTaskManagers(3) + * .setConfiguration(TaskManagerOptions.NUM_TASK_SLOTS, 6) + * .setLogger(LoggerFactory.getLogger(E2ETest.class)) + * .enableZookeeperHA() + * .build(); + * ... + * } + * }</pre> + * + * <p>Detailed usages can be found in the JavaDoc of {@link FlinkContainersBuilder}. 
+ * + * <h2>Prerequisites</h2> + * + * <p>Docker environment is required in the running machine since this class is based on + * Testcontainers. + * + * <p>Make sure you have an already-built flink-dist either under the current project, or specify + * the path manually in builder. + */ +public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { + private static final Logger LOG = LoggerFactory.getLogger(FlinkContainers.class); + + // Default timeout of operations + public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); + + private final GenericContainer<?> jobManager; + private final List<GenericContainer<?>> taskManagers; + private final GenericContainer<?> haService; + private final Configuration conf; + private final Runnable cleanupHook; + + private RestClusterClient<StandaloneClusterId> restClusterClient; + private boolean isStarted; + + /** Creates a builder for {@link FlinkContainers}. */ + public static FlinkContainersBuilder builder() { + return new FlinkContainersBuilder(); + } + + FlinkContainers( + GenericContainer<?> jobManager, + List<GenericContainer<?>> taskManagers, + @Nullable GenericContainer<?> haService, + Configuration conf, + Runnable cleanupHook) { + this.jobManager = jobManager; + this.taskManagers = taskManagers; + this.haService = haService; + this.conf = conf; + this.cleanupHook = cleanupHook; + } + + /** Starts all containers. */ + public void start() throws Exception { + if (haService != null) { + LOG.debug("Starting HA service container"); + this.haService.start(); + } + LOG.debug("Starting JobManager container"); + this.jobManager.start(); + waitUntilJobManagerRESTReachable(jobManager); + LOG.debug("Starting TaskManager containers"); + this.taskManagers.parallelStream().forEach(GenericContainer::start); + LOG.debug("Creating REST cluster client"); + this.restClusterClient = createClusterClient(); + waitUntilAllTaskManagerConnected(); + isStarted = true; + } + + /** Stops all containers. 
*/ + public void stop() { + isStarted = false; + if (restClusterClient != null) { + restClusterClient.close(); + } + this.taskManagers.forEach(GenericContainer::stop); + this.jobManager.stop(); + if (this.haService != null) { + this.haService.stop(); + } + cleanupHook.run(); + } + + /** Gets the running state of the cluster. */ + public boolean isStarted() { + return isStarted; + } + + /** Gets JobManager container. */ + public GenericContainer<?> getJobManager() { + return this.jobManager; + } + + /** Gets JobManager's hostname on the host machine. */ + public String getJobManagerHost() { + return jobManager.getHost(); + } + + /** Gets JobManager's port on the host machine. */ + public int getJobManagerPort() { + return jobManager.getMappedPort(this.conf.get(RestOptions.PORT)); + } + + /** Gets REST client connected to JobManager. */ + public RestClusterClient<StandaloneClusterId> getRestClusterClient() { + return this.restClusterClient; + } + + /** + * Restarts JobManager container. + * + * <p>Note that the REST port will be changed because the new JM container will be mapped to + * another random port. Please make sure to get the REST cluster client again after this method + * is invoked. + */ + public void restartJobManager(RunnableWithException afterFailAction) throws Exception { + if (this.haService == null) { + LOG.warn( + "Restarting JobManager without HA service. This might drop all your running jobs"); + } + jobManager.stop(); + afterFailAction.run(); + jobManager.start(); + // Recreate client because JobManager REST port might have been changed in new container + waitUntilJobManagerRESTReachable(jobManager); + this.restClusterClient = createClusterClient(); Review comment: AFAIK you need to close the old `restClusterClient` to prevent leaking resources. 
########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.Extension; +import org.junit.jupiter.api.extension.ExtensionContext; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.MountableFile; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A Flink cluster running JM and TMs on containers. + * + * <p>This containerized Flink cluster is based on <a + * href="https://www.testcontainers.org/">Testcontainers</a>, which simulates a truly distributed + * environment for E2E tests. This class can also be used as an {@link Extension} of JUnit 5 so that + * the lifecycle of the cluster can be easily managed by JUnit Jupiter engine. + * + * <h2>Example usage</h2> + * + * <pre>{@code + * public class E2ETest { + * // Create a Flink cluster using default configurations. + * // Remember to declare it as "static" as required by JUnit 5. + * @RegisterExtension + * static FlinkContainers flink = FlinkContainers.builder().build(); + * + * // To work together with other containers + * @RegisterExtension + * static FlinkContainers flink = + * FlinkContainers.builder() + * .dependsOn(kafkaContainer) + * .build(); + * + * // Customize a Flink cluster + * // Remember to declare it as "static" as required by JUnit 5. + * @RegisterExtension + * static FlinkContainers flink = + * FlinkContainers.builder() + * .setNumTaskManagers(3) + * .setConfiguration(TaskManagerOptions.NUM_TASK_SLOTS, 6) + * .setLogger(LoggerFactory.getLogger(E2ETest.class)) + * .enableZookeeperHA() + * .build(); + * ... + * } + * }</pre> + * + * <p>Detailed usages can be found in the JavaDoc of {@link FlinkContainersBuilder}. 
+ * + * <h2>Prerequisites</h2> + * + * <p>Docker environment is required in the running machine since this class is based on + * Testcontainers. + * + * <p>Make sure you have an already-built flink-dist either under the current project, or specify + * the path manually in builder. + */ +public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { + private static final Logger LOG = LoggerFactory.getLogger(FlinkContainers.class); + + // Default timeout of operations + public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); + + private final GenericContainer<?> jobManager; + private final List<GenericContainer<?>> taskManagers; + private final GenericContainer<?> haService; + private final Configuration conf; + private final Runnable cleanupHook; + + private RestClusterClient<StandaloneClusterId> restClusterClient; + private boolean isStarted; Review comment: Nit: Make default to false; ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/ImageBuildException.java ########## @@ -0,0 +1,18 @@ +package org.apache.flink.tests.util.flink.container; + +/** Exception indicating errors when building Flink image. */ +public class ImageBuildException extends Exception { + public ImageBuildException() {} + + public ImageBuildException(String message) { + super(message); + } + + public ImageBuildException(String message, Throwable cause) { + super(message, cause); + } + + public ImageBuildException(Throwable cause) { + super(cause); + } Review comment: Do we need all these constructors? ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.Extension; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.MountableFile; + +import 
javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A Flink cluster running JM and TMs on containers. + * + * <p>This containerized Flink cluster is based on <a + * href="https://www.testcontainers.org/">Testcontainers</a>, which simulates a truly distributed + * environment for E2E tests. This class can also be used as an {@link Extension} of JUnit 5 so that + * the lifecycle of the cluster can be easily managed by JUnit Jupiter engine. + * + * <h2>Example usage</h2> + * + * <pre>{@code + * public class E2ETest { + * // Create a Flink cluster using default configurations. + * // Remember to declare it as "static" as required by JUnit 5. + * @RegisterExtension + * static FlinkContainers flink = FlinkContainers.builder().build(); + * + * // To work together with other containers + * @RegisterExtension + * static FlinkContainers flink = + * FlinkContainers.builder() + * .dependsOn(kafkaContainer) + * .build(); + * + * // Customize a Flink cluster + * // Remember to declare it as "static" as required by JUnit 5. + * @RegisterExtension + * static FlinkContainers flink = + * FlinkContainers.builder() + * .setNumTaskManagers(3) + * .setConfiguration(TaskManagerOptions.NUM_TASK_SLOTS, 6) + * .setLogger(LoggerFactory.getLogger(E2ETest.class)) + * .enableZookeeperHA() + * .build(); + * ... + * } + * }</pre> + * + * <p>Detailed usages can be found in the JavaDoc of {@link FlinkContainersBuilder}. + * + * <h2>Prerequisites</h2> + * + * <p>Docker environment is required in the running machine since this class is based on + * Testcontainers. + * + * <p>Make sure you have an already-built flink-dist either under the current project, or specify + * the path manually in builder. 
+ */ +public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { + private static final Logger LOG = LoggerFactory.getLogger(FlinkContainers.class); + + // Default timeout of operations + public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); + + private final GenericContainer<?> jobManager; + private final List<GenericContainer<?>> taskManagers; + private final GenericContainer<?> haService; + private final Configuration conf; + private final Runnable cleanupHook; + + private RestClusterClient<StandaloneClusterId> restClusterClient; Review comment: Add `@Nullable` ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util.flink.container; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.images.builder.ImageFromDockerfile; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A builder class for {@link FlinkContainers}. */ +public class FlinkContainersBuilder { + + // Hostname of components within container network + public static final String JOB_MANAGER_HOSTNAME = "jobmanager"; + public static final String TASK_MANAGER_HOSTNAME_PREFIX = "taskmanager-"; + public static final String ZOOKEEPER_HOSTNAME = "zookeeper"; + + // Directories for storing states + public static final Path CHECKPOINT_PATH = Paths.get("/flink/checkpoint"); + public static final Path HA_STORAGE_PATH = Paths.get("/flink/recovery"); Review comment: Can all be private -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
