lvyanquan commented on code in PR #4294: URL: https://github.com/apache/flink-cdc/pull/4294#discussion_r2916597799
########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java: ########## @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests.utils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.TestLogger; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.ExecCreateCmdResponse; +import com.github.dockerjava.api.model.Volume; +import org.junit.jupiter.api.AfterEach; +import 
org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.Container.ExecResult; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.FrameConsumerResultCallback; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.output.ToStringConsumer; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Test environment running pipeline job on Flink containers. */ +@Testcontainers +public abstract class PipelineTestEnvironment extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineTestEnvironment.class); + + protected Integer parallelism = getParallelism(); + + private int getParallelism() { + try { + return Integer.parseInt(System.getProperty("specifiedParallelism")); + } catch (NumberFormatException ex) { + LOG.warn( + "Unable to parse specified parallelism configuration ({} provided). 
Use 4 by default.", + System.getProperty("specifiedParallelism")); + return 4; + } + } + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; + protected static final Duration EVENT_WAITING_TIMEOUT = Duration.ofMinutes(3); + protected static final Duration STARTUP_WAITING_TIMEOUT = Duration.ofMinutes(5); + + public static final Network NETWORK = Network.newNetwork(); + + @Container + protected static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer(MySqlVersion.V8_0) + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + // ------------------------------------------------------------------------------------------ + // Flink Variables + // ------------------------------------------------------------------------------------------ + protected static final int JOB_MANAGER_REST_PORT = 8081; + protected static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; + protected static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; + protected static final List<String> EXTERNAL_PROPS = + Arrays.asList( + String.format("jobmanager.rpc.address: %s", INTER_CONTAINER_JM_ALIAS), + "jobmanager.bind-host: 0.0.0.0", + "taskmanager.bind-host: 0.0.0.0", + "rest.bind-address: 0.0.0.0", + "rest.address: 0.0.0.0", + "jobmanager.memory.process.size: 1GB", + "query.server.port: 6125", + 
"blob.server.port: 6124", + "taskmanager.numberOfTaskSlots: 10", + "parallelism.default: 4", + "execution.checkpointing.interval: 300", + "state.backend.type: hashmap", + "env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-op ens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/jdk.internal.loader=ALL-UNNAMED --add-opens=java.base/java.security=ALL-UNNAMED --add-exports=java.base/sun.net.www=ALL-UNNAMED -Doracle.jdbc.timezoneAsRegion=false", + "execution.checkpointing.savepoint-dir: file:///opt/flink", + "restart-strategy.type: off", + "pekko.ask.timeout: 60s", + // Set off-heap memory explicitly to avoid "java.lang.OutOfMemoryError: Direct + // buffer memory" error. + "taskmanager.memory.task.off-heap.size: 128mb", + // Fix `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error + // has occurred` error. 
+ "taskmanager.memory.jvm-metaspace.size: 512mb"); + public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS); + + @Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient; + + protected GenericContainer<?> jobManager; + protected GenericContainer<?> taskManager; + protected Volume sharedVolume = new Volume("/tmp/shared"); + + protected ToStringConsumer jobManagerConsumer; + + protected ToStringConsumer taskManagerConsumer; + + protected String flinkVersion = getFlinkVersion(); + + public static String getFlinkVersion() { + return "2.2.0"; + } + + protected List<String> copyJarToFlinkLib() { + return Collections.emptyList(); + } + + @BeforeEach + public void before() throws Exception { + LOG.info("Starting containers..."); + jobManagerConsumer = new ToStringConsumer(); + jobManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume)) + .withLogConsumer(jobManagerConsumer); + + List<String> jarToCopy = copyJarToFlinkLib(); + if (!jarToCopy.isEmpty()) { + for (String jar : jarToCopy) { + jobManager.withCopyFileToContainer( + MountableFile.forHostPath(TestUtils.getResource(jar)), "/opt/flink/lib/"); + } + } + + Startables.deepStart(Stream.of(jobManager)).join(); + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString()); + LOG.info("JobManager is started."); + + taskManagerConsumer = new ToStringConsumer(); + taskManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withVolumesFrom(jobManager, BindMode.READ_WRITE) + .withLogConsumer(taskManagerConsumer); + 
Startables.deepStart(Stream.of(taskManager)).join(); + runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString()); + LOG.info("TaskManager is started."); + + TarballFetcher.fetchLatest(jobManager); + LOG.info("CDC executables deployed."); + } + + @AfterEach + public void after() { + if (restClusterClient != null) { + restClusterClient.close(); + } + if (jobManager != null) { + jobManager.stop(); + } + if (taskManager != null) { + taskManager.stop(); + } + } + + /** + * Submits a YAML job to the running cluster with latest CDC version, without from previous + * savepoint states. + */ + public JobID submitPipelineJob(String pipelineJob, Path... jars) throws Exception { + return submitPipelineJob( + TarballFetcher.CdcVersion.SNAPSHOT, pipelineJob, null, false, jars); + } + + /** + * Submits a YAML job to the running cluster with specific CDC version, without from previous + * savepoint states. + */ + public JobID submitPipelineJob( + TarballFetcher.CdcVersion version, String pipelineJob, Path... jars) throws Exception { + return submitPipelineJob(version, pipelineJob, null, false, jars); + } + + /** Submits a YAML job to the running cluster with latest CDC version. */ + public JobID submitPipelineJob( + String pipelineJob, + @Nullable String savepointPath, + boolean allowNonRestoredState, + Path... jars) + throws Exception { + return submitPipelineJob( + TarballFetcher.CdcVersion.SNAPSHOT, + pipelineJob, + savepointPath, + allowNonRestoredState, + jars); + } + + public JobID submitPipelineJob( + TarballFetcher.CdcVersion version, + String pipelineJob, + @Nullable String savepointPath, + boolean allowNonRestoredState, + Path... 
jars) + throws Exception { + + // Prepare external JAR dependencies + List<Path> paths = new ArrayList<>(Arrays.asList(jars)); + List<String> containerPaths = new ArrayList<>(); + paths.add(TestUtils.getResource("mysql-driver.jar")); + + for (Path jar : paths) { + String containerPath = version.workDir() + "/lib/" + jar.getFileName(); + jobManager.copyFileToContainer(MountableFile.forHostPath(jar), containerPath); + containerPaths.add(containerPath); + } + + containerPaths.add(version.workDir() + "/lib/values-cdc-pipeline-connector.jar"); + + StringBuilder sb = new StringBuilder(); + for (String containerPath : containerPaths) { + sb.append(" --jar ").append(containerPath); + } + + jobManager.copyFileToContainer( + Transferable.of(pipelineJob), version.workDir() + "/conf/pipeline.yaml"); + + String commands = + version.workDir() + + "/bin/flink-cdc.sh " + + version.workDir() + + "/conf/pipeline.yaml --flink-home /opt/flink" + + sb; + + if (savepointPath != null) { + commands += " --from-savepoint " + savepointPath; + if (allowNonRestoredState) { + commands += " --allow-nonRestored-state"; + } + } + LOG.info("Execute command: {}", commands); + ExecResult execResult = executeAndCheck(jobManager, commands); + return Arrays.stream(execResult.getStdout().split("\n")) + .filter(line -> line.startsWith("Job ID: ")) + .findFirst() + .map(line -> line.split(": ")[1]) + .map(JobID::fromHexString) + .orElse(null); Review Comment: This is almost the same content with https://github.com/apache/flink-cdc/blob/91ae6776e97c7ae0f1d0f5a26813e9fdb46651e8/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java#L79, will make modifications together in the subsequent PR to keep the two files close to consistency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
