hailin0 commented on code in PR #2675:
URL: 
https://github.com/apache/incubator-seatunnel/pull/2675#discussion_r966635958


##########
seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java:
##########
@@ -17,128 +17,49 @@
 
 package org.apache.seatunnel.e2e.flink.sql;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import static org.apache.seatunnel.e2e.ContainerUtil.copyConfigFileToContainer;
+
+import org.apache.seatunnel.e2e.flink.AbstractFlinkContainer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
 
 /**
  * This class is the base class of FlinkEnvironment test.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
- * You can use {@link FlinkContainer#executeSeaTunnelFlinkSqlJob(String)} to 
submit a seatunnel config and run a seatunnel job.
+ * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob(String)} to submit a seatunnel config and run a seatunnel job.
  */
-public abstract class FlinkContainer {
+public abstract class FlinkContainer extends AbstractFlinkContainer {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
 
     private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
-    protected static final Network NETWORK = Network.newNetwork();
-
-    protected GenericContainer<?> jobManager;
-    protected GenericContainer<?> taskManager;
-    private static final Path PROJECT_ROOT_PATH = 
Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
-    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-sql.sh";
-    private static final String SEATUNNEL_FLINK_SQL_JAR = 
"seatunnel-core-flink-sql.jar";
-    private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
-    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, 
"bin", SEATUNNEL_FLINK_BIN).toString();
-    private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, 
"lib", SEATUNNEL_FLINK_SQL_JAR).toString();
-
-    private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
-
-    private static final String FLINK_PROPERTIES = String.join(
-        "\n",
-        Arrays.asList(
-            "jobmanager.rpc.address: jobmanager",
-            "taskmanager.numberOfTaskSlots: 10",
-            "parallelism.default: 4",
-            "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
 
-    @BeforeEach
-    public void before() {
-        jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
-            .withCommand("jobmanager")
-            .withNetwork(NETWORK)
-            .withNetworkAliases("jobmanager")
-            .withExposedPorts()
-            .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
+    private static final String START_SHELL_NAME = "start-seatunnel-sql.sh";
 
-        taskManager =
-            new GenericContainer<>(FLINK_DOCKER_IMAGE)
-                .withCommand("taskmanager")
-                .withNetwork(NETWORK)
-                .withNetworkAliases("taskmanager")
-                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                .dependsOn(jobManager)
-                .withLogConsumer(new Slf4jLogConsumer(LOG));
+    private static final String START_MODULE_NAME = "seatunnel-core-flink-sql";
 
-        Startables.deepStart(Stream.of(jobManager)).join();
-        Startables.deepStart(Stream.of(taskManager)).join();
-        copySeaTunnelFlinkFile();
-        LOG.info("Flink containers are started.");
-    }
-
-    @AfterEach
-    public void close() {
-        if (taskManager != null) {
-            taskManager.stop();
-        }
-        if (jobManager != null) {
-            jobManager.stop();
-        }
-    }
+    private static final String CONNECTORS_ROOT_PATH = "seatunnel-connectors/seatunnel-connectors-flink-sql";
 
-    public Container.ExecResult executeSeaTunnelFlinkSqlJob(String confFile)
-        throws IOException, InterruptedException, URISyntaxException {
-        final String confPath = 
Paths.get(FlinkContainer.class.getResource(confFile).toURI()).toString();
-        if (!new File(confPath).exists()) {
-            throw new IllegalArgumentException(confFile + " doesn't exist");
-        }
-        final String targetConfInContainer = Paths.get("/tmp", 
confFile).toString();
-        jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), 
targetConfInContainer);
+    private static final String CONNECTOR_TYPE = "seatunnel-sql";
 
-        // Running IT use cases under Windows requires replacing \ with /
-        String conf = targetConfInContainer.replaceAll("\\\\", "/");
-        final List<String> command = new ArrayList<>();
-        command.add(Paths.get(SEATUNNEL_HOME, 
"bin/start-seatunnel-sql.sh").toString());
-        command.add("--config " + conf);
+    private static final String CONNECTOR_PREFIX = "flink-sql-connector-";
 
-        Container.ExecResult execResult = jobManager.execInContainer("bash", 
"-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
-        // wait job start
-        Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
-        return execResult;
+    public FlinkContainer() {
+        super(FLINK_DOCKER_IMAGE,
+            START_SHELL_NAME,
+            START_MODULE_NAME,
+            CONNECTORS_ROOT_PATH,
+            CONNECTOR_TYPE,
+            CONNECTOR_PREFIX);
     }
 
-    protected void copySeaTunnelFlinkFile() {
-        // copy lib
-        String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH + 
"/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR;
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
-            FLINK_JAR_PATH);
-
-        // copy bin
-        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + 
"/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh";
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelFlinkBinPath),
-            Paths.get(SEATUNNEL_BIN).toString());
+    @Override
+    public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException {

Review Comment:
   👌



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to