Copilot commented on code in PR #4294:
URL: https://github.com/apache/flink-cdc/pull/4294#discussion_r2916165341


##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/ValuesE2eITCase.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+/** End-to-end tests for values cdc pipeline job. */
+class ValuesE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = LoggerFactory.getLogger(ValuesE2eITCase.class);
+
+    @Test
+    void testValuesSingleSplitSingleTable() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: values\n"
+                                + "  event-set.id: SINGLE_SPLIT_SINGLE_TABLE\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "  print.enabled: true\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d",
+                        parallelism);
+
+        submitPipelineJob(pipelineJob);
+        waitUntilJobFinished(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");

Review Comment:
   This log message says the job is running, but the test just waited for the job to finish. Updating the message will make test logs less confusing when diagnosing failures.
   ```suggestion
           LOG.info("Pipeline job has finished");
   ```



##########
flink-cdc-flink2-compat/src/main/java/org/apache/flink/table/api/ValidationException.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+/**
+ * Compatibility adapter for Flink 2.2. This class is part of the multi-version compatibility layer
+ * that allows Flink CDC to work across different Flink versions.
+ */
+@Internal
+public class ValidationException extends RuntimeException {

Review Comment:
   This compatibility class defines org.apache.flink.table.api.ValidationException, but the parent POM already brings in Flink table dependencies (e.g., flink-table-api-java-bridge). Defining the same FQCN in flink-cdc-flink2-compat can shadow Flink’s real ValidationException at runtime and may break code that relies on constructors/behavior from the Flink-provided class. Prefer using the Flink-provided exception (and adjusting dependencies if needed) or relocate this shim into a non-Flink package to avoid classpath conflicts.
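   As a rough sketch of the relocation option (the package and class names below are purely illustrative, not part of this PR):
   ```java
   // Illustrative shim under a CDC-owned package (e.g. org.apache.flink.cdc.flink2.compat),
   // so its fully qualified name can never collide with Flink's own
   // org.apache.flink.table.api.ValidationException on the runtime classpath.
   public class CompatValidationException extends RuntimeException {

       public CompatValidationException(String message) {
           super(message);
       }

       public CompatValidationException(String message, Throwable cause) {
           super(message, cause);
       }
   }
   ```
   Compat-layer call sites would then throw/catch the CDC-owned type, while code that must interoperate with Flink's table API keeps using the Flink-provided exception directly.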



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java:
##########
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.TestLogger;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
+import com.github.dockerjava.api.model.Volume;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.FrameConsumerResultCallback;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.output.ToStringConsumer;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Test environment running pipeline job on Flink containers. */
+@Testcontainers
+public abstract class PipelineTestEnvironment extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipelineTestEnvironment.class);
+
+    protected Integer parallelism = getParallelism();
+
+    private int getParallelism() {
+        try {
+            return Integer.parseInt(System.getProperty("specifiedParallelism"));
+        } catch (NumberFormatException ex) {
+            LOG.warn(
+                    "Unable to parse specified parallelism configuration ({} provided). Use 4 by default.",
+                    System.getProperty("specifiedParallelism"));
+            return 4;
+        }
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // MySQL Variables (we always use MySQL as the data source for easier verifying)
+    // ------------------------------------------------------------------------------------------
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+    protected static final Duration EVENT_WAITING_TIMEOUT = Duration.ofMinutes(3);
+    protected static final Duration STARTUP_WAITING_TIMEOUT = Duration.ofMinutes(5);
+
+    public static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    protected static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    // ------------------------------------------------------------------------------------------
+    // Flink Variables
+    // ------------------------------------------------------------------------------------------
+    protected static final int JOB_MANAGER_REST_PORT = 8081;
+    protected static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
+    protected static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
+    protected static final List<String> EXTERNAL_PROPS =
+            Arrays.asList(
+                    String.format("jobmanager.rpc.address: %s", INTER_CONTAINER_JM_ALIAS),
+                    "jobmanager.bind-host: 0.0.0.0",
+                    "taskmanager.bind-host: 0.0.0.0",
+                    "rest.bind-address: 0.0.0.0",
+                    "rest.address: 0.0.0.0",
+                    "jobmanager.memory.process.size: 1GB",
+                    "query.server.port: 6125",
+                    "blob.server.port: 6124",
+                    "taskmanager.numberOfTaskSlots: 10",
+                    "parallelism.default: 4",
+                    "execution.checkpointing.interval: 300",
+                    "state.backend.type: hashmap",
+                    "env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/jdk.internal.loader=ALL-UNNAMED --add-opens=java.base/java.security=ALL-UNNAMED --add-exports=java.base/sun.net.www=ALL-UNNAMED -Doracle.jdbc.timezoneAsRegion=false",
+                    "execution.checkpointing.savepoint-dir: file:///opt/flink",
+                    "restart-strategy.type: off",
+                    "pekko.ask.timeout: 60s",
+                    // Set off-heap memory explicitly to avoid "java.lang.OutOfMemoryError: Direct
+                    // buffer memory" error.
+                    "taskmanager.memory.task.off-heap.size: 128mb",
+                    // Fix `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
+                    // has occurred` error.
+                    "taskmanager.memory.jvm-metaspace.size: 512mb");
+    public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS);
+
+    @Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient;
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+    protected Volume sharedVolume = new Volume("/tmp/shared");
+
+    protected ToStringConsumer jobManagerConsumer;
+
+    protected ToStringConsumer taskManagerConsumer;
+
+    protected String flinkVersion = getFlinkVersion();
+
+    public static String getFlinkVersion() {
+        return "2.2.0";
+    }
+
+    protected List<String> copyJarToFlinkLib() {
+        return Collections.emptyList();
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        LOG.info("Starting containers...");
+        jobManagerConsumer = new ToStringConsumer();
+        jobManager =
+                new GenericContainer<>(getFlinkDockerImageTag())
+                        .withCommand("jobmanager")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
+                        .withExposedPorts(JOB_MANAGER_REST_PORT)
+                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume))
+                        .withLogConsumer(jobManagerConsumer);
+
+        List<String> jarToCopy = copyJarToFlinkLib();
+        if (!jarToCopy.isEmpty()) {
+            for (String jar : jarToCopy) {
+                jobManager.withCopyFileToContainer(
+                        MountableFile.forHostPath(TestUtils.getResource(jar)), "/opt/flink/lib/");
+            }
+        }
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString());
+        LOG.info("JobManager is started.");
+
+        taskManagerConsumer = new ToStringConsumer();
+        taskManager =
+                new GenericContainer<>(getFlinkDockerImageTag())
+                        .withCommand("taskmanager")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
+                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .dependsOn(jobManager)
+                        .withVolumesFrom(jobManager, BindMode.READ_WRITE)
+                        .withLogConsumer(taskManagerConsumer);
+        Startables.deepStart(Stream.of(taskManager)).join();
+        runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString());
+        LOG.info("TaskManager is started.");
+
+        TarballFetcher.fetchLatest(jobManager);
+        LOG.info("CDC executables deployed.");
+    }
+
+    @AfterEach
+    public void after() {
+        if (restClusterClient != null) {
+            restClusterClient.close();
+        }
+        if (jobManager != null) {
+            jobManager.stop();
+        }
+        if (taskManager != null) {
+            taskManager.stop();
+        }
+    }
+
+    /**
+     * Submits a YAML job to the running cluster with the latest CDC version, without restoring
+     * from previous savepoint states.
+     */
+    public JobID submitPipelineJob(String pipelineJob, Path... jars) throws Exception {
+        return submitPipelineJob(
+                TarballFetcher.CdcVersion.SNAPSHOT, pipelineJob, null, false, jars);
+    }
+
+    /**
+     * Submits a YAML job to the running cluster with a specific CDC version, without restoring
+     * from previous savepoint states.
+     */
+    public JobID submitPipelineJob(
+            TarballFetcher.CdcVersion version, String pipelineJob, Path... jars) throws Exception {
+        return submitPipelineJob(version, pipelineJob, null, false, jars);
+    }
+
+    /** Submits a YAML job to the running cluster with latest CDC version. */
+    public JobID submitPipelineJob(
+            String pipelineJob,
+            @Nullable String savepointPath,
+            boolean allowNonRestoredState,
+            Path... jars)
+            throws Exception {
+        return submitPipelineJob(
+                TarballFetcher.CdcVersion.SNAPSHOT,
+                pipelineJob,
+                savepointPath,
+                allowNonRestoredState,
+                jars);
+    }
+
+    public JobID submitPipelineJob(
+            TarballFetcher.CdcVersion version,
+            String pipelineJob,
+            @Nullable String savepointPath,
+            boolean allowNonRestoredState,
+            Path... jars)
+            throws Exception {
+
+        // Prepare external JAR dependencies
+        List<Path> paths = new ArrayList<>(Arrays.asList(jars));
+        List<String> containerPaths = new ArrayList<>();
+        paths.add(TestUtils.getResource("mysql-driver.jar"));
+
+        for (Path jar : paths) {
+            String containerPath = version.workDir() + "/lib/" + jar.getFileName();
+            jobManager.copyFileToContainer(MountableFile.forHostPath(jar), containerPath);
+            containerPaths.add(containerPath);
+        }
+
+        containerPaths.add(version.workDir() + "/lib/values-cdc-pipeline-connector.jar");
+
+        StringBuilder sb = new StringBuilder();
+        for (String containerPath : containerPaths) {
+            sb.append(" --jar ").append(containerPath);
+        }
+
+        jobManager.copyFileToContainer(
+                Transferable.of(pipelineJob), version.workDir() + "/conf/pipeline.yaml");
+
+        String commands =
+                version.workDir()
+                        + "/bin/flink-cdc.sh "
+                        + version.workDir()
+                        + "/conf/pipeline.yaml --flink-home /opt/flink"
+                        + sb;
+
+        if (savepointPath != null) {
+            commands += " --from-savepoint " + savepointPath;
+            if (allowNonRestoredState) {
+                commands += " --allow-nonRestored-state";
+            }
+        }
+        LOG.info("Execute command: {}", commands);
+        ExecResult execResult = executeAndCheck(jobManager, commands);
+        return Arrays.stream(execResult.getStdout().split("\n"))
+                .filter(line -> line.startsWith("Job ID: "))
+                .findFirst()
+                .map(line -> line.split(": ")[1])
+                .map(JobID::fromHexString)
+                .orElse(null);

Review Comment:
   This method returns null when it fails to parse the JobID from stdout. Returning null makes later failures harder to diagnose (NPEs) and can hide submission issues; please throw an exception with stdout/stderr context when no Job ID line is found.
   ```suggestion
                   .orElseThrow(
                           () ->
                                   new RuntimeException(
                                           "Failed to parse JobID from stdout.\n"
                                                   + "Stdout:\n"
                                                   + execResult.getStdout()
                                                   + "\nStderr:\n"
                                                   + execResult.getStderr()));
   ```



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java:
##########
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.TestLogger;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
+import com.github.dockerjava.api.model.Volume;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.FrameConsumerResultCallback;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.output.ToStringConsumer;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Test environment running pipeline job on Flink containers. */
+@Testcontainers
+public abstract class PipelineTestEnvironment extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipelineTestEnvironment.class);
+
+    protected Integer parallelism = getParallelism();
+
+    private int getParallelism() {
+        try {
+            return Integer.parseInt(System.getProperty("specifiedParallelism"));
+        } catch (NumberFormatException ex) {
+            LOG.warn(
+                    "Unable to parse specified parallelism configuration ({} provided). Use 4 by default.",
+                    System.getProperty("specifiedParallelism"));
+            return 4;
+        }
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // MySQL Variables (we always use MySQL as the data source for easier verifying)
+    // ------------------------------------------------------------------------------------------
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+    protected static final Duration EVENT_WAITING_TIMEOUT = Duration.ofMinutes(3);
+    protected static final Duration STARTUP_WAITING_TIMEOUT = Duration.ofMinutes(5);
+
+    public static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    protected static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    // ------------------------------------------------------------------------------------------
+    // Flink Variables
+    // ------------------------------------------------------------------------------------------
+    protected static final int JOB_MANAGER_REST_PORT = 8081;
+    protected static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
+    protected static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
+    protected static final List<String> EXTERNAL_PROPS =
+            Arrays.asList(
+                    String.format("jobmanager.rpc.address: %s", INTER_CONTAINER_JM_ALIAS),
+                    "jobmanager.bind-host: 0.0.0.0",
+                    "taskmanager.bind-host: 0.0.0.0",
+                    "rest.bind-address: 0.0.0.0",
+                    "rest.address: 0.0.0.0",
+                    "jobmanager.memory.process.size: 1GB",
+                    "query.server.port: 6125",
+                    "blob.server.port: 6124",
+                    "taskmanager.numberOfTaskSlots: 10",
+                    "parallelism.default: 4",
+                    "execution.checkpointing.interval: 300",
+                    "state.backend.type: hashmap",
+                    "env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/jdk.internal.loader=ALL-UNNAMED --add-opens=java.base/java.security=ALL-UNNAMED --add-exports=java.base/sun.net.www=ALL-UNNAMED -Doracle.jdbc.timezoneAsRegion=false",
+                    "execution.checkpointing.savepoint-dir: file:///opt/flink",
+                    "restart-strategy.type: off",
+                    "pekko.ask.timeout: 60s",
+                    // Set off-heap memory explicitly to avoid "java.lang.OutOfMemoryError: Direct
+                    // buffer memory" error.
+                    "taskmanager.memory.task.off-heap.size: 128mb",
+                    // Fix `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
+                    // has occurred` error.
+                    "taskmanager.memory.jvm-metaspace.size: 512mb");
+    public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS);
+
+    @Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient;
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+    protected Volume sharedVolume = new Volume("/tmp/shared");
+
+    protected ToStringConsumer jobManagerConsumer;
+
+    protected ToStringConsumer taskManagerConsumer;
+
+    protected String flinkVersion = getFlinkVersion();
+
+    public static String getFlinkVersion() {
+        return "2.2.0";
+    }
+
+    protected List<String> copyJarToFlinkLib() {
+        return Collections.emptyList();
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        LOG.info("Starting containers...");
+        jobManagerConsumer = new ToStringConsumer();
+        jobManager =
+                new GenericContainer<>(getFlinkDockerImageTag())
+                        .withCommand("jobmanager")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
+                        .withExposedPorts(JOB_MANAGER_REST_PORT)
+                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume))
+                        .withLogConsumer(jobManagerConsumer);
+
+        List<String> jarToCopy = copyJarToFlinkLib();
+        if (!jarToCopy.isEmpty()) {
+            for (String jar : jarToCopy) {
+                jobManager.withCopyFileToContainer(
+                        MountableFile.forHostPath(TestUtils.getResource(jar)), "/opt/flink/lib/");
+            }
+        }
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString());
+        LOG.info("JobManager is started.");
+
+        taskManagerConsumer = new ToStringConsumer();
+        taskManager =
+                new GenericContainer<>(getFlinkDockerImageTag())
+                        .withCommand("taskmanager")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
+                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .dependsOn(jobManager)
+                        .withVolumesFrom(jobManager, BindMode.READ_WRITE)
+                        .withLogConsumer(taskManagerConsumer);
+        Startables.deepStart(Stream.of(taskManager)).join();
+        runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString());
+        LOG.info("TaskManager is started.");
+
+        TarballFetcher.fetchLatest(jobManager);
+        LOG.info("CDC executables deployed.");
+    }
+
+    @AfterEach
+    public void after() {
+        if (restClusterClient != null) {
+            restClusterClient.close();
+        }
+        if (jobManager != null) {
+            jobManager.stop();
+        }
+        if (taskManager != null) {
+            taskManager.stop();
+        }
+    }
+
+    /**
+     * Submits a YAML job to the running cluster with the latest CDC version, without restoring
+     * from a previous savepoint.
+     */
+    public JobID submitPipelineJob(String pipelineJob, Path... jars) throws Exception {
+        return submitPipelineJob(
+                TarballFetcher.CdcVersion.SNAPSHOT, pipelineJob, null, false, jars);
+    }
+
+    /**
+     * Submits a YAML job to the running cluster with a specific CDC version, without restoring
+     * from a previous savepoint.
+     */
+    public JobID submitPipelineJob(
+            TarballFetcher.CdcVersion version, String pipelineJob, Path... jars) throws Exception {
+        return submitPipelineJob(version, pipelineJob, null, false, jars);
+    }
+
+    /** Submits a YAML job to the running cluster with latest CDC version. */
+    public JobID submitPipelineJob(
+            String pipelineJob,
+            @Nullable String savepointPath,
+            boolean allowNonRestoredState,
+            Path... jars)
+            throws Exception {
+        return submitPipelineJob(
+                TarballFetcher.CdcVersion.SNAPSHOT,
+                pipelineJob,
+                savepointPath,
+                allowNonRestoredState,
+                jars);
+    }
+
+    public JobID submitPipelineJob(
+            TarballFetcher.CdcVersion version,
+            String pipelineJob,
+            @Nullable String savepointPath,
+            boolean allowNonRestoredState,
+            Path... jars)
+            throws Exception {
+
+        // Prepare external JAR dependencies
+        List<Path> paths = new ArrayList<>(Arrays.asList(jars));
+        List<String> containerPaths = new ArrayList<>();
+        paths.add(TestUtils.getResource("mysql-driver.jar"));
+
+        for (Path jar : paths) {
+            String containerPath = version.workDir() + "/lib/" + jar.getFileName();
+            jobManager.copyFileToContainer(MountableFile.forHostPath(jar), containerPath);
+            containerPaths.add(containerPath);
+        }
+
+        containerPaths.add(version.workDir() + "/lib/values-cdc-pipeline-connector.jar");
+
+        StringBuilder sb = new StringBuilder();
+        for (String containerPath : containerPaths) {
+            sb.append(" --jar ").append(containerPath);
+        }
+
+        jobManager.copyFileToContainer(
+                Transferable.of(pipelineJob), version.workDir() + "/conf/pipeline.yaml");
+
+        String commands =
+                version.workDir()
+                        + "/bin/flink-cdc.sh "
+                        + version.workDir()
+                        + "/conf/pipeline.yaml --flink-home /opt/flink"
+                        + sb;
+
+        if (savepointPath != null) {
+            commands += " --from-savepoint " + savepointPath;
+            if (allowNonRestoredState) {
+                commands += " --allow-nonRestored-state";
+            }
+        }
+        LOG.info("Execute command: {}", commands);
+        ExecResult execResult = executeAndCheck(jobManager, commands);
+        return Arrays.stream(execResult.getStdout().split("\n"))
+                .filter(line -> line.startsWith("Job ID: "))
+                .findFirst()
+                .map(line -> line.split(": ")[1])
+                .map(JobID::fromHexString)
+                .orElse(null);
+    }
+
+    public String stopJobWithSavepoint(JobID jobID) {
+        String savepointPath = "/opt/flink/";
+        ExecResult result =
+                executeAndCheck(
+                        jobManager,
+                        "flink",
+                        "stop",
+                        jobID.toHexString(),
+                        "--savepointPath",
+                        savepointPath);
+
+        return Arrays.stream(result.getStdout().split("\n"))
+                .filter(line -> line.startsWith("Savepoint completed."))
+                .findFirst()
+                .map(line -> line.split("Path: file:")[1])
+                .orElseThrow(
+                        () -> new RuntimeException("Failed to parse savepoint path from stdout."));
+    }
+
+    public void cancelJob(JobID jobID) {
+        executeAndCheck(jobManager, "flink", "cancel", jobID.toHexString());
+    }
+
+    /**
+     * Get {@link RestClusterClient} connected to this FlinkContainer.
+     *
+     * <p>This method lazily initializes the REST client on-demand.
+     */
+    public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
+        if (restClusterClient != null) {
+            return restClusterClient;
+        }
+        checkState(
+                jobManager.isRunning(),
+                "Cluster client should only be retrieved for a running 
cluster");
+        try {
+            final Configuration clientConfiguration = new Configuration();
+            clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
+            clientConfiguration.set(
+                    RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
+            this.restClusterClient =
+                    new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Failed to create client for Flink container cluster", e);
+        }
+        return restClusterClient;
+    }
+
+    public void waitUntilJobRunning(Duration timeout) {
+        waitUntilJobState(timeout, JobStatus.RUNNING);
+    }
+
+    public void waitUntilJobFinished(Duration timeout) {
+        waitUntilJobState(timeout, JobStatus.FINISHED);
+    }
+
+    public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) {
+        RestClusterClient<?> clusterClient = getRestClusterClient();
+        Deadline deadline = Deadline.fromNow(timeout);
+        while (deadline.hasTimeLeft()) {
+            Collection<JobStatusMessage> jobStatusMessages;
+            try {
+                jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                LOG.warn("Error when fetching job status.", e);
+                continue;
+            }
+            if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
+                JobStatusMessage message = jobStatusMessages.iterator().next();
+                JobStatus jobStatus = message.getJobState();
+                if (!expectedStatus.isTerminalState() && jobStatus.isTerminalState()) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Job has been terminated! JobName: %s, JobID: %s, Status: %s",
+                                    message.getJobName(),
+                                    message.getJobId(),
+                                    message.getJobState()));
+                } else if (jobStatus == expectedStatus) {
+                    return;
+                }
+            }
+        }
+    }
+
+    protected String getFlinkDockerImageTag() {
+        if (System.getProperty("java.specification.version").equals("17")) {
+            return String.format("flink:%s-scala_2.12-java17", flinkVersion);
+        }
+        return String.format("flink:%s-scala_2.12-java11", flinkVersion);
+    }
+
+    private ExecResult executeAndCheck(GenericContainer<?> container, String... command) {
+        String joinedCommand = String.join(" ", command);
+        try {
+            LOG.info("Executing command {}", joinedCommand);
+            ExecResult execResult =
+                    container.execInContainer("bash", "-c", String.join(" ", command));
+            LOG.info(execResult.getStdout());
+            if (execResult.getExitCode() == 0) {
+                LOG.info("Command executed successfully.");
+                return execResult;
+            } else {
+                LOG.error(execResult.getStderr());
+                throw new AssertionError(
+                        "Failed when submitting the pipeline job.\n"
+                                + "Exit code: "
+                                + execResult.getExitCode()
+                                + "\n"
+                                + "StdOut: "
+                                + execResult.getStdout()
+                                + "\n"
+                                + "StdErr: "
+                                + execResult.getStderr());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to execute command " + joinedCommand + " in container " + container);

Review Comment:
   The caught exception is dropped here, which loses the root cause (and makes 
CI failures much harder to debug). Please include the original exception as the 
cause when rethrowing (and ideally include stderr/stdout when available).
   ```suggestion
                       "Failed to execute command " + joinedCommand + " in container " + container,
                       e);
   ```
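   Beyond the one-line suggestion, here is a minimal, self-contained sketch (hypothetical class and method names) of the pattern: pass the caught exception as the constructor's cause argument so the root stack trace survives into CI logs.

   ```java
   // Stdlib-only sketch; `execute` stands in for the container exec call and
   // always fails here just to demonstrate the rethrow path.
   public class RethrowWithCause {

       static void execute(String command) {
           try {
               throw new java.io.IOException("container exec failed (simulated)");
           } catch (Exception e) {
               // Pass `e` as the second constructor argument instead of dropping it.
               throw new RuntimeException("Failed to execute command " + command, e);
           }
       }

       public static void main(String[] args) {
           try {
               execute("bin/flink-cdc.sh");
           } catch (RuntimeException ex) {
               // The root cause is preserved and reachable via getCause().
               ex.printStackTrace();
           }
       }
   }
   ```

   With the cause attached, `printStackTrace()` (and any log framework) emits the full `Caused by:` chain, which is exactly what is lost in the current code.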



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java:
##########
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.TestLogger;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
+import com.github.dockerjava.api.model.Volume;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.FrameConsumerResultCallback;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.output.ToStringConsumer;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Test environment running pipeline job on Flink containers. */
+@Testcontainers
+public abstract class PipelineTestEnvironment extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipelineTestEnvironment.class);
+
+    protected Integer parallelism = getParallelism();
+
+    private int getParallelism() {
+        try {
+            return Integer.parseInt(System.getProperty("specifiedParallelism"));
+        } catch (NumberFormatException ex) {
+            LOG.warn(
+                    "Unable to parse specified parallelism configuration ({} provided). Use 4 by default.",
+                    System.getProperty("specifiedParallelism"));
+            return 4;
+        }
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // MySQL Variables (we always use MySQL as the data source for easier 
verifying)
+    // ------------------------------------------------------------------------------------------
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+    protected static final Duration EVENT_WAITING_TIMEOUT = Duration.ofMinutes(3);
+    protected static final Duration STARTUP_WAITING_TIMEOUT = Duration.ofMinutes(5);
+
+    public static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    protected static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    // ------------------------------------------------------------------------------------------
+    // Flink Variables
+    // ------------------------------------------------------------------------------------------
+    protected static final int JOB_MANAGER_REST_PORT = 8081;
+    protected static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
+    protected static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
+    protected static final List<String> EXTERNAL_PROPS =
+            Arrays.asList(
+                    String.format("jobmanager.rpc.address: %s", INTER_CONTAINER_JM_ALIAS),
+                    "jobmanager.bind-host: 0.0.0.0",
+                    "taskmanager.bind-host: 0.0.0.0",
+                    "rest.bind-address: 0.0.0.0",
+                    "rest.address: 0.0.0.0",
+                    "jobmanager.memory.process.size: 1GB",
+                    "query.server.port: 6125",
+                    "blob.server.port: 6124",
+                    "taskmanager.numberOfTaskSlots: 10",
+                    "parallelism.default: 4",
+                    "execution.checkpointing.interval: 300",
+                    "state.backend.type: hashmap",
+                    "env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/jdk.internal.loader=ALL-UNNAMED --add-opens=java.base/java.security=ALL-UNNAMED --add-exports=java.base/sun.net.www=ALL-UNNAMED -Doracle.jdbc.timezoneAsRegion=false",
+                    "execution.checkpointing.savepoint-dir: file:///opt/flink",
+                    "restart-strategy.type: off",
+                    "pekko.ask.timeout: 60s",
+                    // Set off-heap memory explicitly to avoid "java.lang.OutOfMemoryError: Direct
+                    // buffer memory" error.
+                    "taskmanager.memory.task.off-heap.size: 128mb",
+                    // Fix `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
+                    // has occurred` error.
+                    "taskmanager.memory.jvm-metaspace.size: 512mb");
+    public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS);
+
+    @Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient;
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+    protected Volume sharedVolume = new Volume("/tmp/shared");
+
+    protected ToStringConsumer jobManagerConsumer;
+
+    protected ToStringConsumer taskManagerConsumer;
+
+    protected String flinkVersion = getFlinkVersion();
+
+    public static String getFlinkVersion() {
+        return "2.2.0";
+    }
+
+    protected List<String> copyJarToFlinkLib() {
+        return Collections.emptyList();
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        LOG.info("Starting containers...");
+        jobManagerConsumer = new ToStringConsumer();
+        jobManager =
+                new GenericContainer<>(getFlinkDockerImageTag())
+                        .withCommand("jobmanager")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
+                        .withExposedPorts(JOB_MANAGER_REST_PORT)
+                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume))
+                        .withLogConsumer(jobManagerConsumer);
+
+        List<String> jarToCopy = copyJarToFlinkLib();
+        if (!jarToCopy.isEmpty()) {
+            for (String jar : jarToCopy) {
+                jobManager.withCopyFileToContainer(
+                        MountableFile.forHostPath(TestUtils.getResource(jar)), "/opt/flink/lib/");
+            }
+        }
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString());
+        LOG.info("JobManager is started.");
+
+        taskManagerConsumer = new ToStringConsumer();
+        taskManager =
+                new GenericContainer<>(getFlinkDockerImageTag())
+                        .withCommand("taskmanager")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
+                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .dependsOn(jobManager)
+                        .withVolumesFrom(jobManager, BindMode.READ_WRITE)
+                        .withLogConsumer(taskManagerConsumer);
+        Startables.deepStart(Stream.of(taskManager)).join();
+        runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString());
+        LOG.info("TaskManager is started.");
+
+        TarballFetcher.fetchLatest(jobManager);
+        LOG.info("CDC executables deployed.");
+    }
+
+    @AfterEach
+    public void after() {
+        if (restClusterClient != null) {
+            restClusterClient.close();
+        }
+        if (jobManager != null) {
+            jobManager.stop();
+        }
+        if (taskManager != null) {
+            taskManager.stop();
+        }
+    }
+
+    /**
+     * Submits a YAML job to the running cluster with the latest CDC version, without restoring
+     * from a previous savepoint.
+     */
+    public JobID submitPipelineJob(String pipelineJob, Path... jars) throws Exception {
+        return submitPipelineJob(
+                TarballFetcher.CdcVersion.SNAPSHOT, pipelineJob, null, false, jars);
+    }
+
+    /**
+     * Submits a YAML job to the running cluster with a specific CDC version, without restoring
+     * from a previous savepoint.
+     */
+    public JobID submitPipelineJob(
+            TarballFetcher.CdcVersion version, String pipelineJob, Path... jars) throws Exception {
+        return submitPipelineJob(version, pipelineJob, null, false, jars);
+    }
+
+    /** Submits a YAML job to the running cluster with latest CDC version. */
+    public JobID submitPipelineJob(
+            String pipelineJob,
+            @Nullable String savepointPath,
+            boolean allowNonRestoredState,
+            Path... jars)
+            throws Exception {
+        return submitPipelineJob(
+                TarballFetcher.CdcVersion.SNAPSHOT,
+                pipelineJob,
+                savepointPath,
+                allowNonRestoredState,
+                jars);
+    }
+
+    public JobID submitPipelineJob(
+            TarballFetcher.CdcVersion version,
+            String pipelineJob,
+            @Nullable String savepointPath,
+            boolean allowNonRestoredState,
+            Path... jars)
+            throws Exception {
+
+        // Prepare external JAR dependencies
+        List<Path> paths = new ArrayList<>(Arrays.asList(jars));
+        List<String> containerPaths = new ArrayList<>();
+        paths.add(TestUtils.getResource("mysql-driver.jar"));
+
+        for (Path jar : paths) {
+            String containerPath = version.workDir() + "/lib/" + jar.getFileName();
+            jobManager.copyFileToContainer(MountableFile.forHostPath(jar), containerPath);
+            containerPaths.add(containerPath);
+        }
+
+        containerPaths.add(version.workDir() + "/lib/values-cdc-pipeline-connector.jar");
+
+        StringBuilder sb = new StringBuilder();
+        for (String containerPath : containerPaths) {
+            sb.append(" --jar ").append(containerPath);
+        }
+
+        jobManager.copyFileToContainer(
+                Transferable.of(pipelineJob), version.workDir() + "/conf/pipeline.yaml");
+
+        String commands =
+                version.workDir()
+                        + "/bin/flink-cdc.sh "
+                        + version.workDir()
+                        + "/conf/pipeline.yaml --flink-home /opt/flink"
+                        + sb;
+
+        if (savepointPath != null) {
+            commands += " --from-savepoint " + savepointPath;
+            if (allowNonRestoredState) {
+                commands += " --allow-nonRestored-state";
+            }
+        }
+        LOG.info("Execute command: {}", commands);
+        ExecResult execResult = executeAndCheck(jobManager, commands);
+        return Arrays.stream(execResult.getStdout().split("\n"))
+                .filter(line -> line.startsWith("Job ID: "))
+                .findFirst()
+                .map(line -> line.split(": ")[1])
+                .map(JobID::fromHexString)
+                .orElse(null);
+    }
+
+    public String stopJobWithSavepoint(JobID jobID) {
+        String savepointPath = "/opt/flink/";
+        ExecResult result =
+                executeAndCheck(
+                        jobManager,
+                        "flink",
+                        "stop",
+                        jobID.toHexString(),
+                        "--savepointPath",
+                        savepointPath);
+
+        return Arrays.stream(result.getStdout().split("\n"))
+                .filter(line -> line.startsWith("Savepoint completed."))
+                .findFirst()
+                .map(line -> line.split("Path: file:")[1])
+                .orElseThrow(
+                        () -> new RuntimeException("Failed to parse savepoint path from stdout."));
+    }
+
+    public void cancelJob(JobID jobID) {
+        executeAndCheck(jobManager, "flink", "cancel", jobID.toHexString());
+    }
+
+    /**
+     * Get {@link RestClusterClient} connected to this FlinkContainer.
+     *
+     * <p>This method lazily initializes the REST client on-demand.
+     */
+    public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
+        if (restClusterClient != null) {
+            return restClusterClient;
+        }
+        checkState(
+                jobManager.isRunning(),
+                "Cluster client should only be retrieved for a running cluster");
+        try {
+            final Configuration clientConfiguration = new Configuration();
+            clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
+            clientConfiguration.set(
+                    RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
+            this.restClusterClient =
+                    new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Failed to create client for Flink container cluster", e);
+        }
+        return restClusterClient;
+    }
+
+    public void waitUntilJobRunning(Duration timeout) {
+        waitUntilJobState(timeout, JobStatus.RUNNING);
+    }
+
+    public void waitUntilJobFinished(Duration timeout) {
+        waitUntilJobState(timeout, JobStatus.FINISHED);
+    }
+
+    public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) {
+        RestClusterClient<?> clusterClient = getRestClusterClient();
+        Deadline deadline = Deadline.fromNow(timeout);
+        while (deadline.hasTimeLeft()) {
+            Collection<JobStatusMessage> jobStatusMessages;
+            try {
+                jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                LOG.warn("Error when fetching job status.", e);
+                continue;
+            }
+            if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
+                JobStatusMessage message = jobStatusMessages.iterator().next();
+                JobStatus jobStatus = message.getJobState();
+                if (!expectedStatus.isTerminalState() && jobStatus.isTerminalState()) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Job has been terminated! JobName: %s, JobID: %s, Status: %s",
+                                    message.getJobName(),
+                                    message.getJobId(),
+                                    message.getJobState()));
+                } else if (jobStatus == expectedStatus) {
+                    return;
+                }
+            }
+        }
+    }

Review Comment:
   If the job never reaches the expected state (or reaches a different terminal 
state, e.g. FAILED when expecting FINISHED), this method falls out of the loop 
and returns without failing the test. Please throw a 
TimeoutException/AssertionError after the deadline, and consider failing fast 
when the job reaches any terminal state that doesn't match expectedStatus.
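   A minimal, self-contained sketch of the suggested behavior (hypothetical names; a simplified enum stands in for Flink's JobStatus): poll until the expected state appears, fail fast on a mismatching terminal state, and throw TimeoutException once the deadline elapses instead of returning silently.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.concurrent.TimeoutException;
   import java.util.function.Supplier;

   public class WaitForState {

       // Simplified stand-in for Flink's JobStatus (hypothetical).
       enum State {
           RUNNING, FINISHED, FAILED;

           boolean isTerminal() {
               return this != RUNNING;
           }
       }

       static void waitUntil(Supplier<State> poll, State expected, Duration timeout)
               throws TimeoutException {
           Instant deadline = Instant.now().plus(timeout);
           while (Instant.now().isBefore(deadline)) {
               State current = poll.get();
               if (current == expected) {
                   return; // reached the expected state
               }
               if (current.isTerminal()) {
                   // Fail fast: a terminal state other than the expected one can never
                   // transition to it, so waiting out the deadline is pointless.
                   throw new AssertionError(
                           "Job reached terminal state " + current + " while waiting for " + expected);
               }
           }
           // Surface the timeout instead of silently falling out of the loop.
           throw new TimeoutException("Job did not reach " + expected + " within " + timeout);
       }
   }
   ```

   The key difference from the code above is that every exit path is explicit: success returns, a wrong terminal state asserts immediately, and an exhausted deadline throws.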



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.utils;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+/** Obtains and downloads the corresponding Flink CDC tarball files. */
+public abstract class TarballFetcher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TarballFetcher.class);
+
+    public static void fetchAll(GenericContainer<?> container) throws Exception {
+        fetch(container, CdcVersion.values());
+    }
+
+    public static void fetchLatest(GenericContainer<?> container) throws Exception {
+        fetch(container, CdcVersion.SNAPSHOT);
+    }
+
+    public static void fetch(GenericContainer<?> container, CdcVersion... versions)
+            throws Exception {
+        for (CdcVersion version : versions) {
+            TarballFetcher.fetchInternal(container, version);
+        }
+    }
+
+    private static void fetchInternal(GenericContainer<?> container, CdcVersion version)
+            throws Exception {
+        LOG.info("Trying to download CDC tarball @ {}...", version);
+        if (CdcVersion.SNAPSHOT.equals(version)) {
+            LOG.info("CDC {} is a snapshot version, we should fetch it locally...", version);
+
+            container.copyFileToContainer(
+                    MountableFile.forHostPath(
+                            TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src"), 0755),
+                    version.workDir() + "/bin/flink-cdc.sh");
+            container.copyFileToContainer(
+                    MountableFile.forHostPath(
+                            TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src"), 0755),
+                    version.workDir() + "/conf/flink-cdc.yaml");
+            container.copyFileToContainer(
+                    MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")),
+                    version.workDir() + "/lib/flink-cdc-dist.jar");
+            container.copyFileToContainer(
+                    MountableFile.forHostPath(
+                            TestUtils.getResource("values-cdc-pipeline-connector.jar")),
+                    version.workDir() + "/lib/values-cdc-pipeline-connector.jar");
+
+        } else {
+            LOG.info("CDC {} is a released version, download it from the Internet...", version);
+
+            String containerPath = "/tmp/tarball/" + version.getVersion() + ".tar.gz";
+            downloadAndCopyToContainer(container, version.tarballUrl(), containerPath);
+            container.execInContainer("mkdir", "-p", version.workDir());
+            container.execInContainer(
+                    "tar", "-xzvf", containerPath, "-C", version.workDir(), "--strip-components=1");
+
+            downloadAndCopyToContainer(
+                    container,
+                    version.connectorJarUrl("values"),
+                    version.workDir() + "/lib/values-cdc-pipeline-connector.jar");
+        }
+    }
+
+    private static void downloadAndCopyToContainer(
+            GenericContainer<?> container, String url, String containerPath) throws Exception {
+        Path tempFile = Files.createTempFile("download-", ".tmp");
+        FileUtils.copyURLToFile(

Review Comment:
   Temporary file created for downloading artifacts is never deleted. This can 
leak files across repeated test runs/CI jobs; please delete the temp file in a 
finally block (or use try-with-resources style cleanup) after copying it into 
the container.
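   A stdlib-only sketch of the suggested cleanup (hypothetical names; `Files.write` stands in for `FileUtils.copyURLToFile`, and the container copy is elided): deleting the temp file in a finally block guarantees it is removed even when the download or copy fails.

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   public class TempFileCleanup {

       // `payload` stands in for the downloaded artifact bytes (hypothetical).
       static Path downloadAndCopy(byte[] payload) throws IOException {
           Path tempFile = Files.createTempFile("download-", ".tmp");
           try {
               Files.write(tempFile, payload);
               // ... container.copyFileToContainer(...) would happen here ...
               return tempFile; // returned only so callers can observe the path
           } finally {
               // Runs on success and on failure, so no temp files accumulate across runs.
               Files.deleteIfExists(tempFile);
           }
       }
   }
   ```

   The finally block runs after the return value is computed, so the caller gets the path while the file itself is already gone from the temp directory.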



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
