This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch exp_github_actions in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b28af2783903d853b1547b6cc90ebaca5cd198a Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Sun Apr 3 09:52:56 2022 +0200 port gcp test to testcontainers --- .../pom.xml | 15 +- .../gcp/pubsub/emulator/GCloudEmulatorManager.java | 297 --------------------- .../gcp/pubsub/emulator/GCloudUnitTestBase.java | 25 +- 3 files changed, 18 insertions(+), 319 deletions(-) diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml index 1f8dccee879..b60113fc9ec 100644 --- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml +++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml @@ -59,14 +59,6 @@ under the License. <scope>test</scope> </dependency> - <!-- This is used to run the local PubSub --> - <dependency> - <groupId>com.spotify</groupId> - <artifactId>docker-client</artifactId> - <version>8.16.0</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> @@ -88,6 +80,13 @@ under the License. <artifactId>flink-test-utils-junit</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>gcloud</artifactId> + <version>1.16.3</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudEmulatorManager.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudEmulatorManager.java deleted file mode 100644 index 9f37102f318..00000000000 --- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudEmulatorManager.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.gcp.pubsub.emulator; - -import com.spotify.docker.client.DefaultDockerClient; -import com.spotify.docker.client.DockerClient; -import com.spotify.docker.client.LogStream; -import com.spotify.docker.client.exceptions.ContainerNotFoundException; -import com.spotify.docker.client.exceptions.DockerCertificateException; -import com.spotify.docker.client.exceptions.DockerException; -import com.spotify.docker.client.exceptions.ImageNotFoundException; -import com.spotify.docker.client.messages.ContainerConfig; -import com.spotify.docker.client.messages.ContainerCreation; -import com.spotify.docker.client.messages.ContainerInfo; -import com.spotify.docker.client.messages.HostConfig; -import com.spotify.docker.client.messages.PortBinding; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -/** The class that handles the starting and stopping of the emulator docker image. */ -public class GCloudEmulatorManager { - - private static final Logger LOG = LoggerFactory.getLogger(GCloudEmulatorManager.class); - - private static DockerClient docker; - - private static String dockerIpAddress = "127.0.0.1"; - - public static final String INTERNAL_PUBSUB_PORT = "22222"; - // TODO: use :latest again once - // https://github.com/GoogleCloudPlatform/cloud-sdk-docker/issues/225 is resolved. - public static final String DOCKER_IMAGE_NAME = "google/cloud-sdk:313.0.0"; - - private static String pubsubPort; - - public static String getDockerIpAddress() { - if (dockerIpAddress == null) { - throw new IllegalStateException( - "The docker has not yet been started (yet) so you cannot get the IP address yet."); - } - return dockerIpAddress; - } - - public static String getDockerPubSubPort() { - if (pubsubPort == null) { - throw new IllegalStateException( - "The docker has not yet been started (yet) so you cannot get the port information yet."); - } - return pubsubPort; - } - - public static final String UNITTEST_PROJECT_ID = "running-from-junit-for-flink"; - private static final String CONTAINER_NAME_JUNIT = - (DOCKER_IMAGE_NAME + "_" + UNITTEST_PROJECT_ID).replaceAll("[^a-zA-Z0-9_]", "_"); - - public static void launchDocker() - throws DockerException, InterruptedException, DockerCertificateException { - // Create a client based on DOCKER_HOST and DOCKER_CERT_PATH env vars - docker = DefaultDockerClient.fromEnv().build(); - - terminateAndDiscardAnyExistingContainers(true); - - LOG.info(""); - LOG.info("/==========================================="); - LOG.info("| GCloud Emulator"); - - ContainerInfo containerInfo; - String id; - - try { - docker.inspectImage(DOCKER_IMAGE_NAME); - } catch (ImageNotFoundException e) { - // No such image so we must download it first. - LOG.info("| - Getting docker image \"{}\"", DOCKER_IMAGE_NAME); - docker.pull( - DOCKER_IMAGE_NAME, - message -> { - if (message.id() != null && message.progress() != null) { - LOG.info("| - Downloading > {} : {}", message.id(), message.progress()); - } - }); - } - - // No such container. Good, we create one! - LOG.info("| - Creating new container"); - - // Bind container ports to host ports - final Map<String, List<PortBinding>> portBindings = new HashMap<>(); - portBindings.put( - INTERNAL_PUBSUB_PORT, Collections.singletonList(PortBinding.randomPort("0.0.0.0"))); - - final HostConfig hostConfig = HostConfig.builder().portBindings(portBindings).build(); - - // Create new container with exposed ports - final ContainerConfig containerConfig = - ContainerConfig.builder() - .hostConfig(hostConfig) - .exposedPorts(INTERNAL_PUBSUB_PORT) - .image(DOCKER_IMAGE_NAME) - .cmd( - "sh", - "-c", - "mkdir -p /opt/data/pubsub ; gcloud beta emulators pubsub start --data-dir=/opt/data/pubsub --host-port=0.0.0.0:" - + INTERNAL_PUBSUB_PORT) - .build(); - - LOG.debug("Launching container with configuration {}", containerConfig); - final ContainerCreation creation = - docker.createContainer(containerConfig, CONTAINER_NAME_JUNIT); - id = creation.id(); - - containerInfo = docker.inspectContainer(id); - - if (!containerInfo.state().running()) { - LOG.warn("| - Starting it up ...."); - docker.startContainer(id); - Thread.sleep(1000); - } - - containerInfo = docker.inspectContainer(id); - - dockerIpAddress = "127.0.0.1"; - - Map<String, List<PortBinding>> ports = containerInfo.networkSettings().ports(); - - assertNotNull("Unable to retrieve the ports where to connect to the emulators", ports); - assertEquals("We expect 1 port to be mapped", 1, ports.size()); - - pubsubPort = getPort(ports, INTERNAL_PUBSUB_PORT, "PubSub"); - - LOG.info("| Waiting for the emulators to be running"); - - // PubSub exposes an "Ok" at the root url when running. - if (!waitForOkStatus("PubSub", pubsubPort)) { - // Oops, we did not get an "Ok" within 10 seconds - startHasFailedKillEverything(); - } - LOG.info("\\==========================================="); - LOG.info(""); - } - - private static void startHasFailedKillEverything() - throws DockerException, InterruptedException { - LOG.error("|"); - LOG.error("| ==================== "); - LOG.error("| YOUR TESTS WILL FAIL "); - LOG.error("| ==================== "); - LOG.error("|"); - - // Kill this container and wipe all connection information - dockerIpAddress = null; - pubsubPort = null; - terminateAndDiscardAnyExistingContainers(false); - } - - private static final long MAX_RETRY_TIMEOUT = 10000; // Milliseconds - - private static boolean waitForOkStatus(String label, String port) { - long start = System.currentTimeMillis(); - while (true) { - try { - URL url = new URL("http://" + dockerIpAddress + ":" + port + "/"); - HttpURLConnection con = (HttpURLConnection) url.openConnection(); - con.setRequestMethod("GET"); - con.setConnectTimeout(50); - con.setReadTimeout(50); - - BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); - String inputLine; - StringBuilder content = new StringBuilder(); - while ((inputLine = in.readLine()) != null) { - content.append(inputLine); - } - in.close(); - con.disconnect(); - if (content.toString().contains("Ok")) { - LOG.info("| - {} Emulator is running at {}:{}", label, dockerIpAddress, port); - return true; - } - } catch (IOException e) { - long now = System.currentTimeMillis(); - if (now - start > MAX_RETRY_TIMEOUT) { - LOG.error( - "| - PubSub Emulator at {}:{} FAILED to return an Ok status within {} ms ", - dockerIpAddress, - port, - MAX_RETRY_TIMEOUT); - return false; - } - try { - Thread.sleep(100); // Sleep a very short time - } catch (InterruptedException e1) { - // Ignore - } - } - } - } - - private static String getPort( - Map<String, List<PortBinding>> ports, String internalTCPPort, String label) { - List<PortBinding> portMappings = ports.get(internalTCPPort + "/tcp"); - if (portMappings == null || portMappings.isEmpty()) { - LOG.info("| {} Emulator --> NOTHING CONNECTED TO {}/tcp", label, internalTCPPort); - return null; - } - - return portMappings.get(0).hostPort(); - } - - private static void terminateAndDiscardAnyExistingContainers(boolean warnAboutExisting) - throws DockerException, InterruptedException { - ContainerInfo containerInfo; - try { - containerInfo = docker.inspectContainer(CONTAINER_NAME_JUNIT); - // Already have this container running. - - assertNotNull( - "We should either get a containerInfo or we get an exception", containerInfo); - - LOG.info(""); - LOG.info("/==========================================="); - if (warnAboutExisting) { - LOG.warn("| >>> FOUND OLD EMULATOR INSTANCE RUNNING <<< "); - LOG.warn("| Destroying that one to keep tests running smoothly."); - } - LOG.info("| Cleanup of GCloud Emulator. Log output of container: "); - - if (LOG.isInfoEnabled()) { - try (LogStream stream = - docker.logs( - containerInfo.id(), - DockerClient.LogsParam.stdout(), - DockerClient.LogsParam.stderr())) { - LOG.info("| > {}", stream.readFully()); - } - } - - // We REQUIRE 100% accurate side effect free unit tests - // So we completely discard this one. - - String id = containerInfo.id(); - // Kill container - if (containerInfo.state().running()) { - docker.killContainer(id); - LOG.info("| - Killed"); - } - - // Remove container - docker.removeContainer(id); - - LOG.info("| - Removed"); - LOG.info("\\==========================================="); - LOG.info(""); - - } catch (ContainerNotFoundException cnfe) { - // No such container. Good ! - } - } - - public static void terminateDocker() throws DockerException, InterruptedException { - terminateAndDiscardAnyExistingContainers(false); - - // Close the docker client - docker.close(); - } - - // ==================================================================================== - -} diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java index 92eb6df21d5..bb780aa81a8 100644 --- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java +++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java @@ -22,36 +22,33 @@ import org.apache.flink.util.TestLogger; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.TransportChannelProvider; -import com.spotify.docker.client.exceptions.DockerException; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.utility.DockerImageName; -import java.io.Serializable; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudEmulatorManager.getDockerIpAddress; -import static org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudEmulatorManager.getDockerPubSubPort; /** * The base class from which unit tests should inherit if they need to use the Google cloud * emulators. */ -public class GCloudUnitTestBase extends TestLogger implements Serializable { - @BeforeClass - public static void launchGCloudEmulator() throws Exception { - // Separated out into separate class so the entire test class to be serializable - GCloudEmulatorManager.launchDocker(); - } +public class GCloudUnitTestBase extends TestLogger { + + @ClassRule + public static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = + new PubSubEmulatorContainer( + DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:379.0.0")); @AfterClass - public static void terminateGCloudEmulator() throws DockerException, InterruptedException { + public static void terminateGCloudEmulator() throws InterruptedException { channel.shutdownNow(); channel.awaitTermination(1, TimeUnit.MINUTES); channel = null; - GCloudEmulatorManager.terminateDocker(); } // ==================================================================================== @@ -71,7 +68,7 @@ public class GCloudUnitTestBase extends TestLogger implements Serializable { } public static String getPubSubHostPort() { - return getDockerIpAddress() + ":" + getDockerPubSubPort(); + return PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint(); } @AfterClass