This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 196e7f722 Add test-container utils for integration test. (#3002)
196e7f722 is described below

commit 196e7f7222019f0e9fc0a783e8ba7794cdafcc74
Author: Yuepeng Pan <[email protected]>
AuthorDate: Wed Aug 30 21:59:09 2023 +0800

    Add test-container utils for integration test. (#3002)
---
 .github/workflows/maven.yml                        |   4 +-
 pom.xml                                            |   1 +
 .../streampark-console-service/pom.xml             |  10 +-
 .../src/main/assembly/assembly.xml                 |   1 +
 .../core/service/impl/ApplicationServiceImpl.java  |   2 +-
 .../core/service/impl/FlinkClusterServiceImpl.java |   5 +
 .../console/core/task/FlinkHttpWatcher.java        |  11 +-
 .../console/SpringIntegrationTestBase.java         | 123 ++++++++++++++
 ...SpringTestBase.java => SpringUnitTestBase.java} |  10 +-
 .../core/service/AccessTokenServiceTest.java       |   4 +-
 .../core/service/ApplicationServiceITest.java      | 153 +++++++++++++++++
 .../core/service/ApplicationServiceTest.java       |   6 +-
 .../core/service/FlinkClusterServiceTest.java      |   4 +-
 .../console/core/service/ResourceServiceTest.java  |   4 +-
 .../console/core/service/SavePointServiceTest.java |   4 +-
 .../console/core/service/UserServiceTest.java      |  19 ++-
 .../console/core/service/VariableServiceTest.java  |   4 +-
 .../console/core/service/YarnQueueServiceTest.java |   8 +-
 .../console/system/authentication/JWTTest.java     |   4 +-
 .../resources/application-integration-test.yml     |  41 +++++
 streampark-test-utils/pom.xml                      |  40 +++++
 .../streampark-testcontainer/pom.xml               | 105 ++++++++++++
 .../testcontainer/flink/FlinkComponent.java        |  36 ++++
 .../testcontainer/flink/FlinkContainer.java        |  64 +++++++
 .../flink/FlinkStandaloneSessionCluster.java       | 185 +++++++++++++++++++++
 .../testcontainer/hadoop/HadoopContainer.java      |  86 ++++++++++
 .../flink/FlinkStandaloneSessionClusterITest.java  |  56 +++++++
 .../testcontainer/hadoop/HadoopContainerTest.java  |  63 +++++++
 28 files changed, 1015 insertions(+), 38 deletions(-)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 23b6cfce5..6c7432de6 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -88,7 +88,7 @@ jobs:
           distribution: "adopt"
           cache: "maven"
       - name: Build with Maven
-        run: ./mvnw -B clean install -Pshaded -DskipTests
+        run: ./mvnw -B clean install -Pshaded,dist -DskipTests
       - name: Test with Maven
-        run: ./mvnw -B test
+        run:   wget -c 
https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz -P 
/tmp/ && tar -zxvf /tmp/flink-1.17.1-bin-scala_2.12.tgz && ./mvnw -B test
 
diff --git a/pom.xml b/pom.xml
index 624783ff5..fef4ec416 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,7 @@
         <module>streampark-common</module>
         <module>streampark-flink</module>
         <module>streampark-console</module>
+        <module>streampark-test-utils</module>
     </modules>
 
     <properties>
diff --git a/streampark-console/streampark-console-service/pom.xml 
b/streampark-console/streampark-console-service/pom.xml
index 4202d7c96..28d0c11ae 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -380,7 +380,6 @@
             <version>${project.version}</version>
         </dependency>
 
-
         <dependency>
             <groupId>com.fasterxml.jackson.module</groupId>
             
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
@@ -412,6 +411,13 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-testcontainer</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!--Test dependencies end.-->
 
         <!--log4j -->
@@ -447,7 +453,7 @@
             <groupId>dev.zio</groupId>
             <artifactId>zio-logging_${scala.binary.version}</artifactId>
         </dependency>
-      
+
         <dependency>
             <groupId>dev.zio</groupId>
             <artifactId>zio-streams_${scala.binary.version}</artifactId>
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml 
b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
index 92c73dac4..aa4677c4a 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
@@ -17,6 +17,7 @@
 <assembly>
     <id>bin</id>
     <formats>
+        <format>dir</format>
         <format>tar.gz</format>
     </formats>
     <includeBaseDirectory>true</includeBaseDirectory>
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index a94879640..99a28a87c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1676,7 +1676,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   }
 
   private Map<String, Object> getProperties(Application application) {
-    Map<String, Object> properties = application.getOptionMap();
+    Map<String, Object> properties = new HashMap<>(application.getOptionMap());
     if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
       FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
       ApiAlertException.throwIfNull(
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 3f6f3dd55..636caac80 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -139,6 +139,11 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   @Override
   public Boolean create(FlinkCluster flinkCluster) {
     flinkCluster.setUserId(commonService.getUserId());
+    return internalCreate(flinkCluster);
+  }
+
+  @VisibleForTesting
+  public boolean internalCreate(FlinkCluster flinkCluster) {
     boolean successful = validateQueueIfNeeded(flinkCluster);
     ApiAlertException.throwIfFalse(
         successful, String.format(ERROR_CLUSTER_QUEUE_HINT, 
flinkCluster.getYarnQueue()));
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index 26302387a..ce271da59 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -38,6 +38,7 @@ import 
org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.hc.client5.http.config.RequestConfig;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -49,6 +50,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
@@ -84,7 +87,7 @@ public class FlinkHttpWatcher {
   @Autowired private FlinkClusterWatcher flinkClusterWatcher;
 
   // track interval  every 5 seconds
-  private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+  public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
 
   // option interval within 10 seconds
   private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
@@ -197,6 +200,12 @@ public class FlinkHttpWatcher {
     }
   }
 
+  @VisibleForTesting
+  public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) {
+    Application app = WATCHING_APPS.get(appId);
+    return (app == null || app.getState() == null) ? null : 
FlinkAppState.of(app.getState());
+  }
+
   private void watch(Long id, Application application) {
     EXECUTOR.execute(
         () -> {
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
new file mode 100644
index 000000000..8780e221e
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console;
+
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.util.SystemPropertyUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import 
org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
+import 
org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Integration base tester. Note: The all children classes of the base must 
run after the
+ * project-level package phrase.
+ */
+@Slf4j
+@EnableScheduling
+@ActiveProfiles("integration-test")
+@AutoConfigureTestEntityManager
+@AutoConfigureWebTestClient(timeout = "60000")
+@TestPropertySource(locations = {"classpath:application-integration-test.yml"})
+@ExtendWith({MockitoExtension.class, SpringExtension.class})
+@SpringBootTest(
+    classes = StreamParkConsoleBootstrap.class,
+    webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
+public abstract class SpringIntegrationTestBase {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(SpringIntegrationTestBase.class);
+
+  protected static final String RUN_PKG_SCRIPT_HINT =
+      "Please run package script before running the test case.";
+
+  protected static final String DEFAULT_APP_HOME_DIR_NAME = 
"apache-streampark";
+  protected static final String DEFAULT_LOCAL_WORKSPACE_DIR_NAME = 
"localWorkspace";
+  protected static final String DEFAULT_FLINK_VERSION = "1.17.1";
+  protected static final FileFilter PKG_NAME_FILTER =
+      file -> file.getName().startsWith(DEFAULT_APP_HOME_DIR_NAME) && 
file.isDirectory();
+  protected static String defaultFlinkHome = "/tmp/flink-1.17.1";
+  protected static String appHome;
+
+  @BeforeAll
+  public static void init(@TempDir File tempPath) throws IOException {
+
+    LOG.info("Start prepare the real running env.");
+    String tempAbsPath = tempPath.getAbsolutePath();
+    LOG.info("Integration test base tmp dir: {}", tempAbsPath);
+
+    FileUtils.copyDirectory(
+        tryFindStreamParkPackagedDirFile(), new File(tempAbsPath, 
DEFAULT_APP_HOME_DIR_NAME));
+
+    Path localWorkspace =
+        Files.createDirectories(new File(tempAbsPath, 
DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath());
+
+    appHome = new File(tempAbsPath, 
DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath();
+    System.setProperty(ConfigConst.KEY_APP_HOME(), appHome);
+    System.setProperty(
+        CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
+        localWorkspace.toAbsolutePath().toString());
+
+    LOG.info(
+        "Complete mock EnvInitializer init, app home: {}, {}: {}",
+        appHome,
+        CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
+        localWorkspace.toAbsolutePath());
+  }
+
+  private static File tryFindStreamParkPackagedDirFile() {
+    String userDir = 
Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir"));
+    File pkgTargetDirFile = new File(userDir, "target");
+    Preconditions.checkState(
+        pkgTargetDirFile.exists(),
+        "The target directory of %s doesn't exist. %s",
+        userDir,
+        RUN_PKG_SCRIPT_HINT);
+    Optional<File> availablePkgParentFileOpt =
+        
Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst();
+    final File availablePkgParentFile =
+        availablePkgParentFileOpt.orElseThrow(() -> new 
RuntimeException(RUN_PKG_SCRIPT_HINT));
+    Optional<File> targetDirFile =
+        
Arrays.stream(requireNonNull(availablePkgParentFile.listFiles(PKG_NAME_FILTER)))
+            .findFirst();
+    return targetDirFile.orElseThrow(() -> new 
RuntimeException(RUN_PKG_SCRIPT_HINT));
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
similarity index 95%
rename from 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
rename to 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
index 417a117ce..490df51c0 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.YarnQueue;
 
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
@@ -33,10 +34,10 @@ import org.slf4j.LoggerFactory;
 import 
org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
 import 
org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,7 +45,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 /** base tester. */
-@Transactional
+@Slf4j
+@EnableScheduling
 @ActiveProfiles("test")
 @AutoConfigureTestEntityManager
 @AutoConfigureWebTestClient(timeout = "60000")
@@ -53,9 +55,9 @@ import java.nio.file.Path;
 @SpringBootTest(
     classes = StreamParkConsoleBootstrap.class,
     webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
-public abstract class SpringTestBase {
+public abstract class SpringUnitTestBase {
 
-  protected static final Logger LOG = 
LoggerFactory.getLogger(SpringTestBase.class);
+  protected static final Logger LOG = 
LoggerFactory.getLogger(SpringUnitTestBase.class);
 
   @BeforeAll
   public static void init(@TempDir File tempPath) throws IOException {
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
index f97823ce5..f20d7fb80 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service;
 
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.util.WebUtils;
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
-public class AccessTokenServiceTest extends SpringTestBase {
+public class AccessTokenServiceTest extends SpringUnitTestBase {
 
   @Autowired private AccessTokenService accessTokenService;
 
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java
new file mode 100644
index 000000000..812f5da37
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java
@@ -0,0 +1,153 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.console.SpringIntegrationTestBase;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkSql;
+import org.apache.streampark.console.core.enums.FlinkAppState;
+import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.testcontainer.flink.FlinkStandaloneSessionCluster;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.Base64;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.streampark.console.core.task.FlinkHttpWatcher.WATCHING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for {@link
+ * org.apache.streampark.console.core.service.impl.ApplicationServiceImpl}.
+ */
+class ApplicationServiceITest extends SpringIntegrationTestBase {
+
+  static FlinkStandaloneSessionCluster cluster =
+      
FlinkStandaloneSessionCluster.builder().slotsNumPerTm(4).slf4jLogConsumer(null).build();
+
+  @Autowired private ApplicationService appService;
+
+  @Autowired private FlinkClusterService clusterService;
+
+  @Autowired private FlinkEnvService envService;
+
+  @Autowired private AppBuildPipeService appBuildPipeService;
+
+  @Autowired private FlinkSqlService sqlService;
+
+  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+
+  @BeforeAll
+  static void setup() {
+    cluster.start();
+  }
+
+  @AfterAll
+  static void teardown() {
+    cluster.stop();
+  }
+
+  @AfterEach
+  void clear() {
+    appService.getBaseMapper().delete(new QueryWrapper<>());
+    clusterService.getBaseMapper().delete(new QueryWrapper<>());
+    envService.getBaseMapper().delete(new QueryWrapper<>());
+    appBuildPipeService.getBaseMapper().delete(new QueryWrapper<>());
+    sqlService.getBaseMapper().delete(new QueryWrapper<>());
+  }
+
+  @Test
+  @Timeout(value = 180)
+  void testStartAppOnRemoteSessionMode() throws Exception {
+    FlinkEnv flinkEnv = new FlinkEnv();
+    flinkEnv.setFlinkHome(defaultFlinkHome);
+    flinkEnv.setFlinkName(DEFAULT_FLINK_VERSION);
+    flinkEnv.setId(1L);
+    envService.create(flinkEnv);
+    FlinkCluster flinkCluster = new FlinkCluster();
+    flinkCluster.setId(1L);
+    flinkCluster.setAddress(cluster.getFlinkJobManagerUrl());
+    flinkCluster.setExecutionMode(ExecutionMode.REMOTE.getMode());
+    flinkCluster.setClusterName("docker-Cluster-1.17.1");
+    flinkCluster.setVersionId(1L);
+    flinkCluster.setUserId(100000L);
+    ((FlinkClusterServiceImpl) clusterService).internalCreate(flinkCluster);
+    Application appParam = new Application();
+    appParam.setId(100000L);
+    appParam.setTeamId(100000L);
+    Application application = appService.getApp(appParam);
+    application.setFlinkClusterId(1L);
+    application.setSqlId(100000L);
+    application.setVersionId(1L);
+    application.setExecutionMode(ExecutionMode.REMOTE.getMode());
+
+    // Avoid exceptional error.
+    application.setFlinkSql(
+        new 
String(Base64.getDecoder().decode(application.getFlinkSql().getBytes())));
+    FlinkSql flinkSql = sqlService.getEffective(application.getId(), false);
+    flinkSql.setSql(DeflaterUtils.zipString(flinkSql.getSql()));
+    sqlService.getBaseMapper().updateById(flinkSql);
+
+    // Continue operations link.
+    appService.update(application);
+    appBuildPipeService.buildApplication(100000L, false);
+
+    CompletableFuture<Boolean> buildCompletableFuture =
+        CompletableFuture.supplyAsync(
+            () -> {
+              while (true) {
+                Application app = appService.getById(100000L);
+                if (app != null && app.getReleaseState() == ReleaseState.DONE) 
{
+                  break;
+                }
+              }
+              return true;
+            });
+    buildCompletableFuture.get();
+
+    appService.start(appService.getById(100000L), false);
+    CompletableFuture<Boolean> completableFuture =
+        CompletableFuture.supplyAsync(
+            () -> {
+              while (true) {
+                if (flinkHttpWatcher.tryQueryFlinkAppState(application.getId())
+                    == FlinkAppState.RUNNING) {
+                  break;
+                }
+              }
+              return true;
+            });
+
+    assertThat(completableFuture.get(WATCHING_INTERVAL.toMillis() * 4, 
TimeUnit.MILLISECONDS))
+        .isEqualTo(true);
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
index ea5c34560..f6ce39a54 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.YarnQueue;
 import org.apache.streampark.console.core.service.impl.ApplicationServiceImpl;
@@ -34,8 +34,8 @@ import java.util.Date;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** org.apache.streampark.console.core.service.ApplicationServiceTest. */
-class ApplicationServiceTest extends SpringTestBase {
+/** org.apache.streampark.console.core.service.ApplicationServiceUnitTest. */
+class ApplicationServiceTest extends SpringUnitTestBase {
 
   @Autowired private ApplicationService applicationService;
 
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
index 6073556e8..c1fef933c 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.YarnQueue;
 import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;
@@ -31,7 +31,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The unit test class for {@link FlinkClusterService}. */
-class FlinkClusterServiceTest extends SpringTestBase {
+class FlinkClusterServiceTest extends SpringUnitTestBase {
 
   @Autowired private FlinkClusterService flinkClusterService;
 
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
index e6c00ec71..042615368 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service;
 
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 
 import org.apache.hc.core5.http.ContentType;
 
@@ -35,7 +35,7 @@ import java.nio.file.Path;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** org.apache.streampark.console.core.service.ResourceServiceTest. */
-class ResourceServiceTest extends SpringTestBase {
+class ResourceServiceTest extends SpringUnitTestBase {
 
   @Autowired private ResourceService resourceService;
 
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
index 53133143b..93968dd6f 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
@@ -21,7 +21,7 @@ import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.DevelopmentMode;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationConfig;
 import org.apache.streampark.console.core.entity.Effective;
@@ -46,7 +46,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
  * org.apache.streampark.console.core.service.impl.SavePointServiceImpl} of 
{@link
  * SavePointService}.
  */
-class SavePointServiceTest extends SpringTestBase {
+class SavePointServiceTest extends SpringUnitTestBase {
 
   @Autowired private SavePointService savePointService;
 
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
index b3157193e..1c0592463 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service;
 
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.Resource;
@@ -27,16 +27,17 @@ import org.apache.streampark.console.core.enums.UserType;
 import org.apache.streampark.console.system.entity.User;
 import org.apache.streampark.console.system.service.UserService;
 
-import com.baomidou.mybatisplus.extension.toolkit.Db;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.util.Collections;
 import java.util.Map;
 
 /** org.apache.streampark.console.core.service.UserServiceTest. */
-class UserServiceTest extends SpringTestBase {
+@Transactional
+class UserServiceTest extends SpringUnitTestBase {
   @Autowired private UserService userService;
   @Autowired private ApplicationService applicationService;
   @Autowired private ResourceService resourceService;
@@ -50,7 +51,7 @@ class UserServiceTest extends SpringTestBase {
     user.setPassword("test");
     user.setUserType(UserType.USER);
     user.setStatus(User.STATUS_VALID);
-    Db.save(user);
+    userService.createUser(user);
     // lock user
     user.setStatus(User.STATUS_LOCK);
     Map<String, Object> data =
@@ -74,7 +75,7 @@ class UserServiceTest extends SpringTestBase {
     resource.setEngineType(EngineType.FLINK);
     resource.setTeamId(1L);
     resource.setCreatorId(user.getUserId());
-    Db.save(resource);
+    resourceService.save(resource);
     // lock user when has resource
     user.setStatus(User.STATUS_LOCK);
     Map<String, Object> data2 =
@@ -93,7 +94,7 @@ class UserServiceTest extends SpringTestBase {
     user.setPassword("test");
     user.setUserType(UserType.USER);
     user.setStatus(User.STATUS_VALID);
-    Db.save(user);
+    userService.save(user);
 
     Resource resource = new Resource();
     resource.setResourceName("test");
@@ -101,12 +102,12 @@ class UserServiceTest extends SpringTestBase {
     resource.setEngineType(EngineType.FLINK);
     resource.setTeamId(1L);
     resource.setCreatorId(user.getUserId());
-    Db.save(resource);
+    resourceService.save(resource);
 
     Application app = new Application();
     app.setUserId(user.getUserId());
     app.setTeamId(1L);
-    Db.save(app);
+    applicationService.save(app);
 
     User targetUser = new User();
     targetUser.setUsername("test0");
@@ -114,7 +115,7 @@ class UserServiceTest extends SpringTestBase {
     targetUser.setPassword("test0");
     targetUser.setUserType(UserType.USER);
     targetUser.setStatus(User.STATUS_VALID);
-    Db.save(targetUser);
+    userService.save(targetUser);
 
     Assertions.assertTrue(applicationService.existsByUserId(user.getUserId()));
     Assertions.assertTrue(resourceService.existsByUserId(user.getUserId()));
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
index d1c0436d4..8ac985948 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service;
 
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 import org.apache.streampark.console.core.entity.Variable;
 
 import org.junit.jupiter.api.Assertions;
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /** org.apache.streampark.console.core.service.VariableServiceTest */
-class VariableServiceTest extends SpringTestBase {
+class VariableServiceTest extends SpringUnitTestBase {
 
   @Autowired private VariableService variableService;
 
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
index 317ff4b2a..9a91d7b68 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.core.bean.ResponseResult;
@@ -47,7 +47,7 @@ import static 
org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
  * avoid noisy data form h2 database.
  */
 @Execution(SAME_THREAD)
-class YarnQueueServiceTest extends SpringTestBase {
+class YarnQueueServiceTest extends SpringUnitTestBase {
 
   @Autowired private FlinkClusterService flinkClusterService;
 
@@ -91,7 +91,7 @@ class YarnQueueServiceTest extends SpringTestBase {
             yarnQueues.getRecords().stream()
                 .map(YarnQueue::getQueueLabel)
                 .collect(Collectors.toList()))
-        .containsExactly(q3AtL3, q3AtL1);
+        .containsExactlyInAnyOrder(q3AtL3, q3AtL1);
 
     // Test for 1st page, size = 2, order by create time with queue_label
     queryParams.setQueueLabel("q3");
@@ -101,7 +101,7 @@ class YarnQueueServiceTest extends SpringTestBase {
             yarnQueuesWithQueueLabelLikeQuery.getRecords().stream()
                 .map(YarnQueue::getQueueLabel)
                 .collect(Collectors.toList()))
-        .containsExactly(q3AtL3, q3AtL1);
+        .containsExactlyInAnyOrder(q3AtL3, q3AtL1);
   }
 
   @Test
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
index 4acc63e8b..e3501841f 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.system.authentication;
 
 import org.apache.streampark.common.util.DateUtils;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
 import org.apache.streampark.console.system.entity.AccessToken;
 
 import com.auth0.jwt.JWT;
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Date;
 import java.util.TimeZone;
 
-class JWTTest extends SpringTestBase {
+class JWTTest extends SpringUnitTestBase {
 
   @Test
   void testExpireTime() {
diff --git 
a/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
 
b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
new file mode 100644
index 000000000..aa6f18449
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+logging:
+  level:
+    root: info
+
+spring:
+  datasource:
+    driver-class-name: org.h2.Driver
+    url: 
jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript
 from 'classpath:db/schema-h2.sql'
+    username: sa
+    password: sa
+  sql:
+    init:
+      data-locations: classpath:db/data-h2.sql
+      continue-on-error: true
+      username: sa
+      password: sa
+      mode: always
+server:
+  port: 6666
+
+streampark:
+  workspace:
+    local: /tmp
+    # remote: hdfs://hdfscluster/streampark
+  # hadoop-user-name: root
diff --git a/streampark-test-utils/pom.xml b/streampark-test-utils/pom.xml
new file mode 100644
index 000000000..90c3b252f
--- /dev/null
+++ b/streampark-test-utils/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampark-test-utils</artifactId>
+    <packaging>pom</packaging>
+
+    <name>StreamPark : Test Utils</name>
+    <url>http://maven.apache.org</url>
+    <modules>
+        <module>streampark-testcontainer</module>
+    </modules>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+</project>
diff --git a/streampark-test-utils/streampark-testcontainer/pom.xml 
b/streampark-test-utils/streampark-testcontainer/pom.xml
new file mode 100644
index 000000000..f666932d3
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-test-utils</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampark-testcontainer</artifactId>
+    <packaging>jar</packaging>
+
+    <name>StreamPark : Test Container</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!--<kotlin-reflect.version>1.6.21</kotlin-reflect.version>-->
+        <findbugs.version>3.0.2</findbugs.version>
+        <testcontainer.version>1.16.2</testcontainer.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers-bom</artifactId>
+                <version>${testcontainer.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents.client5</groupId>
+            <artifactId>httpclient5</artifactId>
+            <version>${httpclient5.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!--<dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-reflect</artifactId>
+            <version>${kotlin-reflect.version}</version>
+        </dependency>-->
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-common_2.12</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>${findbugs.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java
 
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java
new file mode 100644
index 000000000..13a62e01f
--- /dev/null
+++ 
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import javax.annotation.Nonnull;
+
+enum FlinkComponent {
+  JOBMANAGER("jobmanager"),
+  TASKMANAGER("taskmanager");
+
+  private final String name;
+
+  FlinkComponent(@Nonnull String name) {
+    this.name = name;
+  }
+
+  @Nonnull
+  public String getName() {
+    return name;
+  }
+}
diff --git 
a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
 
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
new file mode 100644
index 000000000..d9814f0d2
--- /dev/null
+++ 
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER;
+import static 
org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER;
+
+class FlinkContainer extends GenericContainer<FlinkContainer> {
+
+  public static final AtomicInteger TM_COUNT = new AtomicInteger(0);
+
+  public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES";
+
+  private final @Nonnull FlinkComponent component;
+
+  FlinkContainer(
+      @Nonnull DockerImageName dockerImageName,
+      @Nonnull FlinkComponent component,
+      @Nonnull Network network,
+      @Nonnull String yamlPropStr,
+      @Nullable Slf4jLogConsumer slf4jLogConsumer) {
+    super(dockerImageName);
+    this.component = component;
+    this.withCommand("/docker-entrypoint.sh", component.getName());
+    this.withCreateContainerCmdModifier(
+        createContainerCmd -> 
createContainerCmd.withName(getFlinkContainerName()));
+    this.withNetwork(network);
+    this.withEnv(FLINK_PROPS_KEY, yamlPropStr);
+    Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer);
+  }
+
+  protected String getFlinkContainerName() {
+    if (component == JOBMANAGER) {
+      return JOBMANAGER.getName();
+    }
+    return String.format("%s_%s", TASKMANAGER.getName(), 
TM_COUNT.incrementAndGet());
+  }
+}
diff --git 
a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
 
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
new file mode 100644
index 000000000..e5f01dea6
--- /dev/null
+++ 
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.apache.streampark.common.util.Utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startable;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER;
+import static 
org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER;
+
+/**
+ * Class to start a couple of flink 1-jobmanager & n-taskmanagers. The 
priority of flinkYamlConfStr
+ * is the highest. But: The 'jobmanager.rpc.address' is always 'jobmanager'. 
The 'rest.port' always
+ * is 8081.
+ */
+public class FlinkStandaloneSessionCluster implements Startable {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(FlinkStandaloneSessionCluster.class);
+
+  public static final Network NETWORK = Network.newNetwork();
+
+  public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address";
+  public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots";
+  public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", 
SLOT_CONF_KEY);
+
+  public static final int BLOB_SERVER_PORT = 6123;
+  public static final int WEB_PORT = 8081;
+
+  private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY, 
JOBMANAGER.getName());
+
+  private final FlinkContainer jobManagerContainer;
+
+  private final List<FlinkContainer> taskManagerContainers = new ArrayList<>();
+
+  private FlinkStandaloneSessionCluster(
+      DockerImageName dockerImageName,
+      int taskManagerNum,
+      int slotsNumPerTm,
+      @Nullable String yamlConfStr,
+      Slf4jLogConsumer slf4jLogConsumer) {
+
+    renderJmRpcConfIfNeeded(yamlConfStr);
+
+    renderSlotNumIfNeeded(slotsNumPerTm);
+
+    // Set for JM.
+    this.jobManagerContainer =
+        new FlinkContainer(
+            dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr, 
slf4jLogConsumer);
+    this.jobManagerContainer.addExposedPort(BLOB_SERVER_PORT);
+    this.jobManagerContainer.addExposedPort(WEB_PORT);
+
+    this.jobManagerContainer.setWaitStrategy(
+        Wait.forHttp("/config")
+            .forStatusCode(200)
+            .forPort(WEB_PORT)
+            .withStartupTimeout(Duration.ofMinutes(8)));
+
+    // Set for TMs.
+    for (int i = 0; i < taskManagerNum; i++) {
+      FlinkContainer taskManager =
+          new FlinkContainer(
+              dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr, 
slf4jLogConsumer);
+      this.taskManagerContainers.add(taskManager);
+    }
+  }
+
+  public String getFlinkJobManagerUrl() {
+    return String.format(
+        "http://%s:%s";, jobManagerContainer.getHost(), 
jobManagerContainer.getMappedPort(WEB_PORT));
+  }
+
+  @Override
+  public void start() {
+    Utils.notNull(jobManagerContainer);
+    jobManagerContainer.start();
+    Utils.notNull(taskManagerContainers);
+    for (FlinkContainer taskManagerContainer : taskManagerContainers) {
+      taskManagerContainer.start();
+    }
+  }
+
+  @Override
+  public void stop() {
+    Utils.notNull(taskManagerContainers);
+    for (FlinkContainer taskManagerContainer : taskManagerContainers) {
+      taskManagerContainer.stop();
+    }
+    Utils.notNull(jobManagerContainer);
+    jobManagerContainer.stop();
+  }
+
+  private void renderSlotNumIfNeeded(int slotsNumPerTm) {
+    if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) {
+      this.yamlConfStr =
+          String.format(
+              "%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT, 
slotsNumPerTm));
+    }
+  }
+
+  private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) {
+    this.yamlConfStr =
+        yamlConfStr == null
+            ? this.yamlConfStr
+            : (yamlConfStr.contains(JM_RPC_ADDR_KEY)
+                ? yamlConfStr
+                : String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr));
+  }
+
+  public static class Builder {
+
+    private DockerImageName dockerImageName =
+        DockerImageName.parse("apache/flink:1.17.1-scala_2.12");
+    private int taskManagerNum = 1;
+    private int slotsNumPerTm = 8;
+    private @Nullable String yamlConfStr;
+    private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG, 
false);
+
+    private Builder() {}
+
+    public Builder dockerImageName(DockerImageName dockerImageName) {
+      this.dockerImageName = dockerImageName;
+      return this;
+    }
+
+    public Builder taskManagerNum(int taskManagerNum) {
+      Utils.required(taskManagerNum >= 0, "taskManagerNum must be greater than 
-1.");
+      this.taskManagerNum = taskManagerNum;
+      return this;
+    }
+
+    public Builder slotsNumPerTm(int slotsNumPerTm) {
+      Utils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 
0.");
+      this.slotsNumPerTm = slotsNumPerTm;
+      return this;
+    }
+
+    public Builder yamlConfStr(@Nullable String yamlConfStr) {
+      this.yamlConfStr = yamlConfStr;
+      return this;
+    }
+
+    public Builder slf4jLogConsumer(Slf4jLogConsumer slf4jLogConsumer) {
+      this.slf4jLogConsumer = slf4jLogConsumer;
+      return this;
+    }
+
+    public FlinkStandaloneSessionCluster build() {
+      return new FlinkStandaloneSessionCluster(
+          dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr, 
slf4jLogConsumer);
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+}
diff --git 
a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java
 
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java
new file mode 100644
index 000000000..93e34cd43
--- /dev/null
+++ 
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.hadoop;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Hadoop Container for integration test. Note: It's experimental now. */
+public class HadoopContainer extends GenericContainer<HadoopContainer> {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(HadoopContainer.class);
+
+  // Hadoop version is 2.7.0
+  public static final DockerImageName DOCKER_IMAGE_NAME =
+      DockerImageName.parse("sequenceiq/hadoop-docker:latest");
+
+  public static final Map<Integer, Integer> MAPPED_PORTS =
+      new HashMap<Integer, Integer>() {
+        {
+          put(50070, 50070);
+          put(8088, 8088);
+          put(9000, 9000);
+        }
+      };
+
+  public HadoopContainer() {
+    this(DOCKER_IMAGE_NAME);
+  }
+
+  public HadoopContainer(@Nonnull DockerImageName dockerImageName) {
+    super(dockerImageName);
+    MAPPED_PORTS.forEach(this::addFixedExposedPort);
+    this.setPrivilegedMode(true);
+    this.withCreateContainerCmdModifier(
+        createContainerCmd -> 
createContainerCmd.withName("one-container-hadoop-cluster"));
+    WaitAllStrategy waitAllStrategy =
+        new WaitAllStrategy()
+            .withStrategy(
+                Wait.forHttp("/ws/v1/cluster/info")
+                    .forPort(8088)
+                    .forStatusCode(200)
+                    .withStartupTimeout(Duration.ofMinutes(8)))
+            .withStrategy(
+                Wait.forHttp("")
+                    .forPort(50070)
+                    .forStatusCode(200)
+                    .withStartupTimeout(Duration.ofMinutes(8)))
+            
.withStrategy(Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(8)))
+            .withStartupTimeout(Duration.ofMinutes(8));
+    this.waitingFor(waitAllStrategy);
+    this.withLogConsumer(new Slf4jLogConsumer(LOG));
+    this.withCommand("/etc/bootstrap.sh", "-d");
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    this.waitUntilContainerStarted();
+  }
+}
diff --git 
a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
 
b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
new file mode 100644
index 000000000..6f34e720f
--- /dev/null
+++ 
b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for flink standalone session cluster env available. */
+class FlinkStandaloneSessionClusterITest {
+
+  private final FlinkStandaloneSessionCluster cluster =
+      FlinkStandaloneSessionCluster.builder().build();
+
+  @BeforeEach
+  void up() {
+    cluster.start();
+  }
+
+  @AfterEach
+  void down() {
+    cluster.stop();
+  }
+
+  @Test
+  void testRestApiAvailable() throws IOException {
+    String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(), 
"config");
+    CloseableHttpClient httpClient = HttpClients.createDefault();
+    CloseableHttpResponse response = httpClient.execute(new HttpGet(url));
+    assertThat(response.getCode()).isEqualTo(200);
+  }
+}
diff --git 
a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java
 
b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java
new file mode 100644
index 000000000..961d76c43
--- /dev/null
+++ 
b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.hadoop;
+
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HadoopContainer}. */
+class HadoopContainerTest {
+  static final HadoopContainer HADOOP_CONTAINER = new HadoopContainer();
+
+  @BeforeAll
+  static void setup() {
+    HADOOP_CONTAINER.start();
+  }
+
+  @AfterAll
+  static void teardown() {
+    HADOOP_CONTAINER.stop();
+  }
+
+  @Test
+  @Timeout(value = 8, unit = TimeUnit.MINUTES)
+  void testOverview() throws IOException {
+
+    String url = String.format("http://%s:%s/ws/v1/cluster/info";, 
HADOOP_CONTAINER.getHost(), 8088);
+    CloseableHttpClient httpClient = HttpClients.createDefault();
+    CloseableHttpResponse response = httpClient.execute(new HttpGet(url));
+    assertThat(response.getCode()).isEqualTo(200);
+
+    url = String.format("http://%s:%s";, HADOOP_CONTAINER.getHost(), 50070);
+    httpClient = HttpClients.createDefault();
+    response = httpClient.execute(new HttpGet(url));
+    assertThat(response.getCode()).isEqualTo(200);
+  }
+}

Reply via email to