This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 196e7f722 Add test-container utils for integration test. (#3002)
196e7f722 is described below
commit 196e7f7222019f0e9fc0a783e8ba7794cdafcc74
Author: Yuepeng Pan <[email protected]>
AuthorDate: Wed Aug 30 21:59:09 2023 +0800
Add test-container utils for integration test. (#3002)
---
.github/workflows/maven.yml | 4 +-
pom.xml | 1 +
.../streampark-console-service/pom.xml | 10 +-
.../src/main/assembly/assembly.xml | 1 +
.../core/service/impl/ApplicationServiceImpl.java | 2 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 5 +
.../console/core/task/FlinkHttpWatcher.java | 11 +-
.../console/SpringIntegrationTestBase.java | 123 ++++++++++++++
...SpringTestBase.java => SpringUnitTestBase.java} | 10 +-
.../core/service/AccessTokenServiceTest.java | 4 +-
.../core/service/ApplicationServiceITest.java | 153 +++++++++++++++++
.../core/service/ApplicationServiceTest.java | 6 +-
.../core/service/FlinkClusterServiceTest.java | 4 +-
.../console/core/service/ResourceServiceTest.java | 4 +-
.../console/core/service/SavePointServiceTest.java | 4 +-
.../console/core/service/UserServiceTest.java | 19 ++-
.../console/core/service/VariableServiceTest.java | 4 +-
.../console/core/service/YarnQueueServiceTest.java | 8 +-
.../console/system/authentication/JWTTest.java | 4 +-
.../resources/application-integration-test.yml | 41 +++++
streampark-test-utils/pom.xml | 40 +++++
.../streampark-testcontainer/pom.xml | 105 ++++++++++++
.../testcontainer/flink/FlinkComponent.java | 36 ++++
.../testcontainer/flink/FlinkContainer.java | 64 +++++++
.../flink/FlinkStandaloneSessionCluster.java | 185 +++++++++++++++++++++
.../testcontainer/hadoop/HadoopContainer.java | 86 ++++++++++
.../flink/FlinkStandaloneSessionClusterITest.java | 56 +++++++
.../testcontainer/hadoop/HadoopContainerTest.java | 63 +++++++
28 files changed, 1015 insertions(+), 38 deletions(-)
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 23b6cfce5..6c7432de6 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -88,7 +88,7 @@ jobs:
distribution: "adopt"
cache: "maven"
- name: Build with Maven
- run: ./mvnw -B clean install -Pshaded -DskipTests
+ run: ./mvnw -B clean install -Pshaded,dist -DskipTests
- name: Test with Maven
- run: ./mvnw -B test
+ run: wget -c https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz -P /tmp/ && tar -zxvf /tmp/flink-1.17.1-bin-scala_2.12.tgz && ./mvnw -B test
diff --git a/pom.xml b/pom.xml
index 624783ff5..fef4ec416 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,7 @@
<module>streampark-common</module>
<module>streampark-flink</module>
<module>streampark-console</module>
+ <module>streampark-test-utils</module>
</modules>
<properties>
diff --git a/streampark-console/streampark-console-service/pom.xml
b/streampark-console/streampark-console-service/pom.xml
index 4202d7c96..28d0c11ae 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -380,7 +380,6 @@
<version>${project.version}</version>
</dependency>
-
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
@@ -412,6 +411,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-testcontainer</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!--Test dependencies end.-->
<!--log4j -->
@@ -447,7 +453,7 @@
<groupId>dev.zio</groupId>
<artifactId>zio-logging_${scala.binary.version}</artifactId>
</dependency>
-
+
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_${scala.binary.version}</artifactId>
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
index 92c73dac4..aa4677c4a 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
+++
b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
@@ -17,6 +17,7 @@
<assembly>
<id>bin</id>
<formats>
+ <format>dir</format>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index a94879640..99a28a87c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1676,7 +1676,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
private Map<String, Object> getProperties(Application application) {
- Map<String, Object> properties = application.getOptionMap();
+ Map<String, Object> properties = new HashMap<>(application.getOptionMap());
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 3f6f3dd55..636caac80 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -139,6 +139,11 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Override
public Boolean create(FlinkCluster flinkCluster) {
flinkCluster.setUserId(commonService.getUserId());
+ return internalCreate(flinkCluster);
+ }
+
+ @VisibleForTesting
+ public boolean internalCreate(FlinkCluster flinkCluster) {
boolean successful = validateQueueIfNeeded(flinkCluster);
ApiAlertException.throwIfFalse(
successful, String.format(ERROR_CLUSTER_QUEUE_HINT,
flinkCluster.getYarnQueue()));
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index 26302387a..ce271da59 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -38,6 +38,7 @@ import
org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hc.client5.http.config.RequestConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -49,6 +50,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -84,7 +87,7 @@ public class FlinkHttpWatcher {
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
// track interval every 5 seconds
- private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+ public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
// option interval within 10 seconds
private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
@@ -197,6 +200,12 @@ public class FlinkHttpWatcher {
}
}
+ @VisibleForTesting
+ public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) {
+ Application app = WATCHING_APPS.get(appId);
+ return (app == null || app.getState() == null) ? null :
FlinkAppState.of(app.getState());
+ }
+
private void watch(Long id, Application application) {
EXECUTOR.execute(
() -> {
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
new file mode 100644
index 000000000..8780e221e
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console;
+
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.util.SystemPropertyUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import
org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
+import
org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for integration tests. Note: all subclasses of this base class must run after
+ * the project-level package phase.
+ */
+@Slf4j
+@EnableScheduling
+@ActiveProfiles("integration-test")
+@AutoConfigureTestEntityManager
+@AutoConfigureWebTestClient(timeout = "60000")
+@TestPropertySource(locations = {"classpath:application-integration-test.yml"})
+@ExtendWith({MockitoExtension.class, SpringExtension.class})
+@SpringBootTest(
+ classes = StreamParkConsoleBootstrap.class,
+ webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
+public abstract class SpringIntegrationTestBase {
+ protected static final Logger LOG =
LoggerFactory.getLogger(SpringIntegrationTestBase.class);
+
+ protected static final String RUN_PKG_SCRIPT_HINT =
+ "Please run package script before running the test case.";
+
+ protected static final String DEFAULT_APP_HOME_DIR_NAME =
"apache-streampark";
+ protected static final String DEFAULT_LOCAL_WORKSPACE_DIR_NAME =
"localWorkspace";
+ protected static final String DEFAULT_FLINK_VERSION = "1.17.1";
+ protected static final FileFilter PKG_NAME_FILTER =
+ file -> file.getName().startsWith(DEFAULT_APP_HOME_DIR_NAME) &&
file.isDirectory();
+ protected static String defaultFlinkHome = "/tmp/flink-1.17.1";
+ protected static String appHome;
+
+ @BeforeAll
+ public static void init(@TempDir File tempPath) throws IOException {
+
+ LOG.info("Start prepare the real running env.");
+ String tempAbsPath = tempPath.getAbsolutePath();
+ LOG.info("Integration test base tmp dir: {}", tempAbsPath);
+
+ FileUtils.copyDirectory(
+ tryFindStreamParkPackagedDirFile(), new File(tempAbsPath,
DEFAULT_APP_HOME_DIR_NAME));
+
+ Path localWorkspace =
+ Files.createDirectories(new File(tempAbsPath,
DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath());
+
+ appHome = new File(tempAbsPath,
DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath();
+ System.setProperty(ConfigConst.KEY_APP_HOME(), appHome);
+ System.setProperty(
+ CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
+ localWorkspace.toAbsolutePath().toString());
+
+ LOG.info(
+ "Complete mock EnvInitializer init, app home: {}, {}: {}",
+ appHome,
+ CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
+ localWorkspace.toAbsolutePath());
+ }
+
+ private static File tryFindStreamParkPackagedDirFile() {
+ String userDir =
Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir"));
+ File pkgTargetDirFile = new File(userDir, "target");
+ Preconditions.checkState(
+ pkgTargetDirFile.exists(),
+ "The target directory of %s doesn't exist. %s",
+ userDir,
+ RUN_PKG_SCRIPT_HINT);
+ Optional<File> availablePkgParentFileOpt =
+
Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst();
+ final File availablePkgParentFile =
+ availablePkgParentFileOpt.orElseThrow(() -> new
RuntimeException(RUN_PKG_SCRIPT_HINT));
+ Optional<File> targetDirFile =
+
Arrays.stream(requireNonNull(availablePkgParentFile.listFiles(PKG_NAME_FILTER)))
+ .findFirst();
+ return targetDirFile.orElseThrow(() -> new
RuntimeException(RUN_PKG_SCRIPT_HINT));
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
similarity index 95%
rename from
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
rename to
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
index 417a117ce..490df51c0 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.YarnQueue;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -33,10 +34,10 @@ import org.slf4j.LoggerFactory;
import
org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import
org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.io.IOException;
@@ -44,7 +45,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
/** base tester. */
-@Transactional
+@Slf4j
+@EnableScheduling
@ActiveProfiles("test")
@AutoConfigureTestEntityManager
@AutoConfigureWebTestClient(timeout = "60000")
@@ -53,9 +55,9 @@ import java.nio.file.Path;
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
-public abstract class SpringTestBase {
+public abstract class SpringUnitTestBase {
- protected static final Logger LOG =
LoggerFactory.getLogger(SpringTestBase.class);
+ protected static final Logger LOG =
LoggerFactory.getLogger(SpringUnitTestBase.class);
@BeforeAll
public static void init(@TempDir File tempPath) throws IOException {
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
index f97823ce5..f20d7fb80 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.util.WebUtils;
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-public class AccessTokenServiceTest extends SpringTestBase {
+public class AccessTokenServiceTest extends SpringUnitTestBase {
@Autowired private AccessTokenService accessTokenService;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java
new file mode 100644
index 000000000..812f5da37
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.console.SpringIntegrationTestBase;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkSql;
+import org.apache.streampark.console.core.enums.FlinkAppState;
+import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.testcontainer.flink.FlinkStandaloneSessionCluster;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.Base64;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.streampark.console.core.task.FlinkHttpWatcher.WATCHING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for {@link
+ * org.apache.streampark.console.core.service.impl.ApplicationServiceImpl}.
+ */
+class ApplicationServiceITest extends SpringIntegrationTestBase {
+
+ static FlinkStandaloneSessionCluster cluster =
+
FlinkStandaloneSessionCluster.builder().slotsNumPerTm(4).slf4jLogConsumer(null).build();
+
+ @Autowired private ApplicationService appService;
+
+ @Autowired private FlinkClusterService clusterService;
+
+ @Autowired private FlinkEnvService envService;
+
+ @Autowired private AppBuildPipeService appBuildPipeService;
+
+ @Autowired private FlinkSqlService sqlService;
+
+ @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+
+ @BeforeAll
+ static void setup() {
+ cluster.start();
+ }
+
+ @AfterAll
+ static void teardown() {
+ cluster.stop();
+ }
+
+ @AfterEach
+ void clear() {
+ appService.getBaseMapper().delete(new QueryWrapper<>());
+ clusterService.getBaseMapper().delete(new QueryWrapper<>());
+ envService.getBaseMapper().delete(new QueryWrapper<>());
+ appBuildPipeService.getBaseMapper().delete(new QueryWrapper<>());
+ sqlService.getBaseMapper().delete(new QueryWrapper<>());
+ }
+
+ @Test
+ @Timeout(value = 180)
+ void testStartAppOnRemoteSessionMode() throws Exception {
+ FlinkEnv flinkEnv = new FlinkEnv();
+ flinkEnv.setFlinkHome(defaultFlinkHome);
+ flinkEnv.setFlinkName(DEFAULT_FLINK_VERSION);
+ flinkEnv.setId(1L);
+ envService.create(flinkEnv);
+ FlinkCluster flinkCluster = new FlinkCluster();
+ flinkCluster.setId(1L);
+ flinkCluster.setAddress(cluster.getFlinkJobManagerUrl());
+ flinkCluster.setExecutionMode(ExecutionMode.REMOTE.getMode());
+ flinkCluster.setClusterName("docker-Cluster-1.17.1");
+ flinkCluster.setVersionId(1L);
+ flinkCluster.setUserId(100000L);
+ ((FlinkClusterServiceImpl) clusterService).internalCreate(flinkCluster);
+ Application appParam = new Application();
+ appParam.setId(100000L);
+ appParam.setTeamId(100000L);
+ Application application = appService.getApp(appParam);
+ application.setFlinkClusterId(1L);
+ application.setSqlId(100000L);
+ application.setVersionId(1L);
+ application.setExecutionMode(ExecutionMode.REMOTE.getMode());
+
+ // Avoid errors in the steps below: Base64-decode the application's SQL and persist the effective SQL in zipped form.
+ application.setFlinkSql(
+ new
String(Base64.getDecoder().decode(application.getFlinkSql().getBytes())));
+ FlinkSql flinkSql = sqlService.getEffective(application.getId(), false);
+ flinkSql.setSql(DeflaterUtils.zipString(flinkSql.getSql()));
+ sqlService.getBaseMapper().updateById(flinkSql);
+
+ // Continue with the remaining operations: update the application, then build it.
+ appService.update(application);
+ appBuildPipeService.buildApplication(100000L, false);
+
+ CompletableFuture<Boolean> buildCompletableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ while (true) {
+ Application app = appService.getById(100000L);
+ if (app != null && app.getReleaseState() == ReleaseState.DONE)
{
+ break;
+ }
+ }
+ return true;
+ });
+ buildCompletableFuture.get();
+
+ appService.start(appService.getById(100000L), false);
+ CompletableFuture<Boolean> completableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ while (true) {
+ if (flinkHttpWatcher.tryQueryFlinkAppState(application.getId())
+ == FlinkAppState.RUNNING) {
+ break;
+ }
+ }
+ return true;
+ });
+
+ assertThat(completableFuture.get(WATCHING_INTERVAL.toMillis() * 4,
TimeUnit.MILLISECONDS))
+ .isEqualTo(true);
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
index ea5c34560..f6ce39a54 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.YarnQueue;
import org.apache.streampark.console.core.service.impl.ApplicationServiceImpl;
@@ -34,8 +34,8 @@ import java.util.Date;
import static org.assertj.core.api.Assertions.assertThat;
-/** org.apache.streampark.console.core.service.ApplicationServiceTest. */
-class ApplicationServiceTest extends SpringTestBase {
+/** org.apache.streampark.console.core.service.ApplicationServiceUnitTest. */
+class ApplicationServiceTest extends SpringUnitTestBase {
@Autowired private ApplicationService applicationService;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
index 6073556e8..c1fef933c 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.YarnQueue;
import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;
@@ -31,7 +31,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import static org.assertj.core.api.Assertions.assertThat;
/** The unit test class for {@link FlinkClusterService}. */
-class FlinkClusterServiceTest extends SpringTestBase {
+class FlinkClusterServiceTest extends SpringUnitTestBase {
@Autowired private FlinkClusterService flinkClusterService;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
index e6c00ec71..042615368 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.hc.core5.http.ContentType;
@@ -35,7 +35,7 @@ import java.nio.file.Path;
import static org.assertj.core.api.Assertions.assertThat;
/** org.apache.streampark.console.core.service.ResourceServiceTest. */
-class ResourceServiceTest extends SpringTestBase {
+class ResourceServiceTest extends SpringUnitTestBase {
@Autowired private ResourceService resourceService;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
index 53133143b..93968dd6f 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
@@ -21,7 +21,7 @@ import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.Effective;
@@ -46,7 +46,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
* org.apache.streampark.console.core.service.impl.SavePointServiceImpl} of
{@link
* SavePointService}.
*/
-class SavePointServiceTest extends SpringTestBase {
+class SavePointServiceTest extends SpringUnitTestBase {
@Autowired private SavePointService savePointService;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
index b3157193e..1c0592463 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.Resource;
@@ -27,16 +27,17 @@ import org.apache.streampark.console.core.enums.UserType;
import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.UserService;
-import com.baomidou.mybatisplus.extension.toolkit.Db;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.Map;
/** org.apache.streampark.console.core.service.UserServiceTest. */
-class UserServiceTest extends SpringTestBase {
+@Transactional
+class UserServiceTest extends SpringUnitTestBase {
@Autowired private UserService userService;
@Autowired private ApplicationService applicationService;
@Autowired private ResourceService resourceService;
@@ -50,7 +51,7 @@ class UserServiceTest extends SpringTestBase {
user.setPassword("test");
user.setUserType(UserType.USER);
user.setStatus(User.STATUS_VALID);
- Db.save(user);
+ userService.createUser(user);
// lock user
user.setStatus(User.STATUS_LOCK);
Map<String, Object> data =
@@ -74,7 +75,7 @@ class UserServiceTest extends SpringTestBase {
resource.setEngineType(EngineType.FLINK);
resource.setTeamId(1L);
resource.setCreatorId(user.getUserId());
- Db.save(resource);
+ resourceService.save(resource);
// lock user when has resource
user.setStatus(User.STATUS_LOCK);
Map<String, Object> data2 =
@@ -93,7 +94,7 @@ class UserServiceTest extends SpringTestBase {
user.setPassword("test");
user.setUserType(UserType.USER);
user.setStatus(User.STATUS_VALID);
- Db.save(user);
+ userService.save(user);
Resource resource = new Resource();
resource.setResourceName("test");
@@ -101,12 +102,12 @@ class UserServiceTest extends SpringTestBase {
resource.setEngineType(EngineType.FLINK);
resource.setTeamId(1L);
resource.setCreatorId(user.getUserId());
- Db.save(resource);
+ resourceService.save(resource);
Application app = new Application();
app.setUserId(user.getUserId());
app.setTeamId(1L);
- Db.save(app);
+ applicationService.save(app);
User targetUser = new User();
targetUser.setUsername("test0");
@@ -114,7 +115,7 @@ class UserServiceTest extends SpringTestBase {
targetUser.setPassword("test0");
targetUser.setUserType(UserType.USER);
targetUser.setStatus(User.STATUS_VALID);
- Db.save(targetUser);
+ userService.save(targetUser);
Assertions.assertTrue(applicationService.existsByUserId(user.getUserId()));
Assertions.assertTrue(resourceService.existsByUserId(user.getUserId()));
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
index d1c0436d4..8ac985948 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.Variable;
import org.junit.jupiter.api.Assertions;
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
/** org.apache.streampark.console.core.service.VariableServiceTest */
-class VariableServiceTest extends SpringTestBase {
+class VariableServiceTest extends SpringUnitTestBase {
@Autowired private VariableService variableService;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
index 317ff4b2a..9a91d7b68 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.bean.ResponseResult;
@@ -47,7 +47,7 @@ import static
org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
* avoid noisy data form h2 database.
*/
@Execution(SAME_THREAD)
-class YarnQueueServiceTest extends SpringTestBase {
+class YarnQueueServiceTest extends SpringUnitTestBase {
@Autowired private FlinkClusterService flinkClusterService;
@@ -91,7 +91,7 @@ class YarnQueueServiceTest extends SpringTestBase {
yarnQueues.getRecords().stream()
.map(YarnQueue::getQueueLabel)
.collect(Collectors.toList()))
- .containsExactly(q3AtL3, q3AtL1);
+ .containsExactlyInAnyOrder(q3AtL3, q3AtL1);
// Test for 1st page, size = 2, order by create time with queue_label
queryParams.setQueueLabel("q3");
@@ -101,7 +101,7 @@ class YarnQueueServiceTest extends SpringTestBase {
yarnQueuesWithQueueLabelLikeQuery.getRecords().stream()
.map(YarnQueue::getQueueLabel)
.collect(Collectors.toList()))
- .containsExactly(q3AtL3, q3AtL1);
+ .containsExactlyInAnyOrder(q3AtL3, q3AtL1);
}
@Test
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
index 4acc63e8b..e3501841f 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.system.authentication;
import org.apache.streampark.common.util.DateUtils;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.system.entity.AccessToken;
import com.auth0.jwt.JWT;
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Date;
import java.util.TimeZone;
-class JWTTest extends SpringTestBase {
+class JWTTest extends SpringUnitTestBase {
@Test
void testExpireTime() {
diff --git
a/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
new file mode 100644
index 000000000..aa6f18449
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+logging:
+ level:
+ root: info
+
+spring:
+ datasource:
+ driver-class-name: org.h2.Driver
+ url:
jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript
from 'classpath:db/schema-h2.sql'
+ username: sa
+ password: sa
+ sql:
+ init:
+ data-locations: classpath:db/data-h2.sql
+ continue-on-error: true
+ username: sa
+ password: sa
+ mode: always
+server:
+ port: 6666
+
+streampark:
+ workspace:
+ local: /tmp
+ # remote: hdfs://hdfscluster/streampark
+ # hadoop-user-name: root
diff --git a/streampark-test-utils/pom.xml b/streampark-test-utils/pom.xml
new file mode 100644
index 000000000..90c3b252f
--- /dev/null
+++ b/streampark-test-utils/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampark-test-utils</artifactId>
+ <packaging>pom</packaging>
+
+ <name>StreamPark : Test Utils</name>
+ <url>http://maven.apache.org</url>
+ <modules>
+ <module>streampark-testcontainer</module>
+ </modules>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+</project>
diff --git a/streampark-test-utils/streampark-testcontainer/pom.xml
b/streampark-test-utils/streampark-testcontainer/pom.xml
new file mode 100644
index 000000000..f666932d3
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-test-utils</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampark-testcontainer</artifactId>
+ <packaging>jar</packaging>
+
+ <name>StreamPark : Test Container</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!--<kotlin-reflect.version>1.6.21</kotlin-reflect.version>-->
+ <findbugs.version>3.0.2</findbugs.version>
+ <testcontainer.version>1.16.2</testcontainer.version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers-bom</artifactId>
+ <version>${testcontainer.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents.client5</groupId>
+ <artifactId>httpclient5</artifactId>
+ <version>${httpclient5.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!--<dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-reflect</artifactId>
+ <version>${kotlin-reflect.version}</version>
+ </dependency>-->
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>${jupiter.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-common_2.12</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>${findbugs.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java
new file mode 100644
index 000000000..13a62e01f
--- /dev/null
+++
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import javax.annotation.Nonnull;
+
+/** The roles a Flink container can be started as; each constant carries its lower-case name. */
+enum FlinkComponent {
+  /** The jobmanager role, named {@code "jobmanager"}. */
+  JOBMANAGER("jobmanager"),
+  /** The taskmanager role, named {@code "taskmanager"}. */
+  TASKMANAGER("taskmanager");
+
+  /** Lower-case name handed to the constructor of this constant. */
+  private final String componentName;
+
+  FlinkComponent(@Nonnull String componentName) {
+    this.componentName = componentName;
+  }
+
+  /** Returns the lower-case name of this component. */
+  @Nonnull
+  public String getName() {
+    return this.componentName;
+  }
+}
diff --git
a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
new file mode 100644
index 000000000..d9814f0d2
--- /dev/null
+++
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER;
+import static
org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER;
+
+/**
+ * A {@link GenericContainer} that launches a single Flink component. The component name is passed
+ * as the argument of {@code /docker-entrypoint.sh}, and the Flink configuration is handed over
+ * through the {@code FLINK_PROPERTIES} environment variable.
+ */
+class FlinkContainer extends GenericContainer<FlinkContainer> {
+
+  /** Counter used to number the taskmanager container names. */
+  public static final AtomicInteger TM_COUNT = new AtomicInteger(0);
+
+  /** Name of the environment variable that carries the YAML configuration string. */
+  public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES";
+
+  private final @Nonnull FlinkComponent component;
+
+  /**
+   * Configures the container for the given component.
+   *
+   * @param dockerImageName image to run
+   * @param component the Flink role this container is started as
+   * @param network network the container is attached to
+   * @param yamlPropStr YAML configuration string exported as {@code FLINK_PROPERTIES}
+   * @param slf4jLogConsumer optional log consumer; nothing is registered when it is {@code null}
+   */
+  FlinkContainer(
+      @Nonnull DockerImageName dockerImageName,
+      @Nonnull FlinkComponent component,
+      @Nonnull Network network,
+      @Nonnull String yamlPropStr,
+      @Nullable Slf4jLogConsumer slf4jLogConsumer) {
+    super(dockerImageName);
+    this.component = component;
+    withCommand("/docker-entrypoint.sh", component.getName());
+    // The name is resolved inside the modifier, i.e. when the create command is being built.
+    withCreateContainerCmdModifier(cmd -> cmd.withName(getFlinkContainerName()));
+    withNetwork(network);
+    withEnv(FLINK_PROPS_KEY, yamlPropStr);
+    Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer);
+  }
+
+  /**
+   * Resolves the container name: {@code jobmanager} for the jobmanager, otherwise {@code
+   * taskmanager_<n>}. Every call made for a taskmanager increments {@link #TM_COUNT}.
+   */
+  protected String getFlinkContainerName() {
+    return component == JOBMANAGER
+        ? JOBMANAGER.getName()
+        : String.format("%s_%s", TASKMANAGER.getName(), TM_COUNT.incrementAndGet());
+  }
+}
diff --git
a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
new file mode 100644
index 000000000..e5f01dea6
--- /dev/null
+++
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.apache.streampark.common.util.Utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startable;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER;
+import static
org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER;
+
+/**
+ * Starts a Flink standalone session cluster made of one jobmanager container and n taskmanager
+ * containers. A supplied YAML configuration string is kept as given; defaults are only added for
+ * the keys it does not contain. The default 'jobmanager.rpc.address' is 'jobmanager', and the
+ * jobmanager exposes port 8081 for its web endpoint.
+ *
+ * <p>NOTE(review): when the supplied configuration already contains 'jobmanager.rpc.address', it
+ * is used unchanged instead of being forced to 'jobmanager' - confirm that this is intended.
+ */
+public class FlinkStandaloneSessionCluster implements Startable {
+
+  public static final Logger LOG =
LoggerFactory.getLogger(FlinkStandaloneSessionCluster.class);
+
+  /** Network shared by every container created by this class. */
+  public static final Network NETWORK = Network.newNetwork();
+
+  public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address";
+  public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots";
+  /** Format with a single placeholder for the slot number: "taskmanager.numberOfTaskSlots: %s". */
+  public static final String SLOT_CONF_FORMAT = String.format("%s: %%s",
SLOT_CONF_KEY);
+
+  // NOTE(review): 6123 looks like Flink's default jobmanager RPC port rather than the blob server
+  // port - confirm the constant name.
+  public static final int BLOB_SERVER_PORT = 6123;
+  public static final int WEB_PORT = 8081;
+
+  /** Configuration handed to the containers; starts as "jobmanager.rpc.address: jobmanager". */
+  private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY,
JOBMANAGER.getName());
+
+  private final FlinkContainer jobManagerContainer;
+
+  private final List<FlinkContainer> taskManagerContainers = new ArrayList<>();
+
+  /**
+   * Builds the jobmanager container and {@code taskManagerNum} taskmanager containers; nothing is
+   * started here.
+   *
+   * @param dockerImageName image used for all containers
+   * @param taskManagerNum number of taskmanager containers to create
+   * @param slotsNumPerTm slot number rendered into the configuration when it is not already set
+   * @param yamlConfStr optional user configuration, may be {@code null}
+   * @param slf4jLogConsumer log consumer passed to every container
+   */
+  private FlinkStandaloneSessionCluster(
+      DockerImageName dockerImageName,
+      int taskManagerNum,
+      int slotsNumPerTm,
+      @Nullable String yamlConfStr,
+      Slf4jLogConsumer slf4jLogConsumer) {
+
+    // Order matters: the user configuration is merged first, then the slot number is appended
+    // only if the merged configuration still lacks it.
+    renderJmRpcConfIfNeeded(yamlConfStr);
+
+    renderSlotNumIfNeeded(slotsNumPerTm);
+
+    // Set for JM.
+    this.jobManagerContainer =
+        new FlinkContainer(
+            dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr,
slf4jLogConsumer);
+    this.jobManagerContainer.addExposedPort(BLOB_SERVER_PORT);
+    this.jobManagerContainer.addExposedPort(WEB_PORT);
+
+    // The jobmanager counts as started once GET /config on the web port answers 200.
+    this.jobManagerContainer.setWaitStrategy(
+        Wait.forHttp("/config")
+            .forStatusCode(200)
+            .forPort(WEB_PORT)
+            .withStartupTimeout(Duration.ofMinutes(8)));
+
+    // Set for TMs.
+    for (int i = 0; i < taskManagerNum; i++) {
+      FlinkContainer taskManager =
+          new FlinkContainer(
+              dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr,
slf4jLogConsumer);
+      this.taskManagerContainers.add(taskManager);
+    }
+  }
+
+  /**
+   * Returns "http://host:port" built from the jobmanager container's host and the port mapped to
+   * {@link #WEB_PORT}.
+   */
+  public String getFlinkJobManagerUrl() {
+    return String.format(
+        "http://%s:%s", jobManagerContainer.getHost(),
jobManagerContainer.getMappedPort(WEB_PORT));
+  }
+
+  /** Starts the jobmanager container first, then every taskmanager container in list order. */
+  @Override
+  public void start() {
+    Utils.notNull(jobManagerContainer);
+    jobManagerContainer.start();
+    Utils.notNull(taskManagerContainers);
+    for (FlinkContainer taskManagerContainer : taskManagerContainers) {
+      taskManagerContainer.start();
+    }
+  }
+
+  /** Stops every taskmanager container first, then the jobmanager container. */
+  @Override
+  public void stop() {
+    Utils.notNull(taskManagerContainers);
+    for (FlinkContainer taskManagerContainer : taskManagerContainers) {
+      taskManagerContainer.stop();
+    }
+    Utils.notNull(jobManagerContainer);
+    jobManagerContainer.stop();
+  }
+
+  /** Appends the slot-number entry unless the current configuration already contains the key. */
+  private void renderSlotNumIfNeeded(int slotsNumPerTm) {
+    if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) {
+      this.yamlConfStr =
+          String.format(
+              "%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT,
slotsNumPerTm));
+    }
+  }
+
+  /**
+   * Merges the user configuration into the default one: {@code null} keeps the default, a
+   * configuration that contains the RPC address key replaces the default entirely, and any other
+   * configuration is appended after the default RPC address line.
+   */
+  private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) {
+    this.yamlConfStr =
+        yamlConfStr == null
+            ? this.yamlConfStr
+            : (yamlConfStr.contains(JM_RPC_ADDR_KEY)
+                ? yamlConfStr
+                : String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr));
+  }
+
+  /**
+   * Builder for {@link FlinkStandaloneSessionCluster}. Defaults: image
+   * apache/flink:1.17.1-scala_2.12, one taskmanager, 8 slots per taskmanager, no user
+   * configuration, and a log consumer writing to {@link #LOG}.
+   */
+  public static class Builder {
+
+    private DockerImageName dockerImageName =
+        DockerImageName.parse("apache/flink:1.17.1-scala_2.12");
+    private int taskManagerNum = 1;
+    private int slotsNumPerTm = 8;
+    private @Nullable String yamlConfStr;
+    private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG,
false);
+
+    // Instances are obtained through FlinkStandaloneSessionCluster.builder().
+    private Builder() {}
+
+    public Builder dockerImageName(DockerImageName dockerImageName) {
+      this.dockerImageName = dockerImageName;
+      return this;
+    }
+
+    /** Sets the number of taskmanagers; the value is checked to be zero or greater. */
+    public Builder taskManagerNum(int taskManagerNum) {
+      Utils.required(taskManagerNum >= 0, "taskManagerNum must be greater than
-1.");
+      this.taskManagerNum = taskManagerNum;
+      return this;
+    }
+
+    /** Sets the slot number per taskmanager; the value is checked to be greater than zero. */
+    public Builder slotsNumPerTm(int slotsNumPerTm) {
+      Utils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than
0.");
+      this.slotsNumPerTm = slotsNumPerTm;
+      return this;
+    }
+
+    public Builder yamlConfStr(@Nullable String yamlConfStr) {
+      this.yamlConfStr = yamlConfStr;
+      return this;
+    }
+
+    public Builder slf4jLogConsumer(Slf4jLogConsumer slf4jLogConsumer) {
+      this.slf4jLogConsumer = slf4jLogConsumer;
+      return this;
+    }
+
+    /** Creates the cluster object from the current builder values; no container is started. */
+    public FlinkStandaloneSessionCluster build() {
+      return new FlinkStandaloneSessionCluster(
+          dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr,
slf4jLogConsumer);
+    }
+  }
+
+  /** Returns a new {@link Builder} holding the default values. */
+  public static Builder builder() {
+    return new Builder();
+  }
+}
diff --git
a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java
new file mode 100644
index 000000000..93e34cd43
--- /dev/null
+++
b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.hadoop;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Single-container Hadoop cluster for integration tests. Note: it is experimental for now.
+ *
+ * <p>The container runs privileged, is named {@code one-container-hadoop-cluster}, and binds the
+ * ports listed in {@link #MAPPED_PORTS}.
+ */
+public class HadoopContainer extends GenericContainer<HadoopContainer> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(HadoopContainer.class);
+
+  // Hadoop version is 2.7.0
+  // NOTE(review): the tag below is 'latest', so this constant does not pin that version.
+  public static final DockerImageName DOCKER_IMAGE_NAME =
+      DockerImageName.parse("sequenceiq/hadoop-docker:latest");
+
+  /** Port pairs passed to {@code addFixedExposedPort}; each key equals its value. */
+  public static final Map<Integer, Integer> MAPPED_PORTS =
+      new HashMap<Integer, Integer>() {
+        {
+          put(50070, 50070);
+          put(8088, 8088);
+          put(9000, 9000);
+        }
+      };
+
+  /** Creates the container from {@link #DOCKER_IMAGE_NAME}. */
+  public HadoopContainer() {
+    this(DOCKER_IMAGE_NAME);
+  }
+
+  /**
+   * Creates the container from the given image and configures ports, name, wait strategy, log
+   * forwarding and the bootstrap command.
+   *
+   * @param dockerImageName image to run
+   */
+  public HadoopContainer(@Nonnull DockerImageName dockerImageName) {
+    super(dockerImageName);
+    final Duration startupTimeout = Duration.ofMinutes(8);
+
+    for (Map.Entry<Integer, Integer> portPair : MAPPED_PORTS.entrySet()) {
+      addFixedExposedPort(portPair.getKey(), portPair.getValue());
+    }
+    setPrivilegedMode(true);
+    withCreateContainerCmdModifier(cmd -> cmd.withName("one-container-hadoop-cluster"));
+
+    // Ready only when both HTTP endpoints answer 200 and the default wait strategy passes.
+    WaitAllStrategy readiness = new WaitAllStrategy();
+    readiness.withStrategy(
+        Wait.forHttp("/ws/v1/cluster/info")
+            .forPort(8088)
+            .forStatusCode(200)
+            .withStartupTimeout(startupTimeout));
+    readiness.withStrategy(
+        Wait.forHttp("").forPort(50070).forStatusCode(200).withStartupTimeout(startupTimeout));
+    readiness.withStrategy(Wait.defaultWaitStrategy().withStartupTimeout(startupTimeout));
+    readiness.withStartupTimeout(startupTimeout);
+    waitingFor(readiness);
+
+    withLogConsumer(new Slf4jLogConsumer(LOG));
+    withCommand("/etc/bootstrap.sh", "-d");
+  }
+
+  /** Starts the container, then explicitly waits once more for the configured wait strategy. */
+  @Override
+  public void start() {
+    super.start();
+    waitUntilContainerStarted();
+  }
+}
diff --git
a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
new file mode 100644
index 000000000..6f34e720f
--- /dev/null
+++
b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for flink standalone session cluster env available. */
+class FlinkStandaloneSessionClusterITest {
+
+ private final FlinkStandaloneSessionCluster cluster =
+ FlinkStandaloneSessionCluster.builder().build();
+
+ @BeforeEach
+ void up() {
+ cluster.start();
+ }
+
+ @AfterEach
+ void down() {
+ cluster.stop();
+ }
+
+ @Test
+ void testRestApiAvailable() throws IOException {
+ String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(),
"config");
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = httpClient.execute(new HttpGet(url));
+ assertThat(response.getCode()).isEqualTo(200);
+ }
+}
diff --git
a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java
b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java
new file mode 100644
index 000000000..961d76c43
--- /dev/null
+++
b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.hadoop;
+
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link HadoopContainer}: starts one shared container for the whole class and probes two
+ * of its HTTP endpoints.
+ */
+class HadoopContainerTest {
+  static final HadoopContainer HADOOP_CONTAINER = new HadoopContainer();
+
+  /** Starts the shared container once before all tests. */
+  @BeforeAll
+  static void setup() {
+    HADOOP_CONTAINER.start();
+  }
+
+  /** Stops the shared container once after all tests. */
+  @AfterAll
+  static void teardown() {
+    HADOOP_CONTAINER.stop();
+  }
+
+  /**
+   * Verifies that both probed endpoints of the container answer with HTTP 200.
+   *
+   * <p>NOTE(review): the ports are hard-coded host ports rather than mapped ports, so this
+   * presumably relies on {@link HadoopContainer} binding them at fixed values; confirm there.
+   *
+   * @throws IOException if a request cannot be executed or a resource cannot be closed
+   */
+  @Test
+  @Timeout(value = 8, unit = TimeUnit.MINUTES)
+  void testOverview() throws IOException {
+    String host = HADOOP_CONTAINER.getHost();
+    // One client serves both probes and is closed when the block exits, pass or fail.
+    try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+      // Cluster-info REST resource on port 8088 (presumably the YARN ResourceManager).
+      assertRespondsOk(httpClient, String.format("http://%s:%s/ws/v1/cluster/info", host, 8088));
+      // Root page on port 50070 (presumably the HDFS NameNode web UI).
+      assertRespondsOk(httpClient, String.format("http://%s:%s", host, 50070));
+    }
+  }
+
+  /** Sends a GET to {@code url}, asserts an HTTP 200 status and always closes the response. */
+  private static void assertRespondsOk(CloseableHttpClient httpClient, String url)
+      throws IOException {
+    try (CloseableHttpResponse response = httpClient.execute(new HttpGet(url))) {
+      assertThat(response.getCode()).isEqualTo(200);
+    }
+  }
+}