This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 3fc85bc0d [ISSUE-3057][Improve] Improve streampark-common module based
on [3.10 Pre-Conditions Checking] (#3420)
3fc85bc0d is described below
commit 3fc85bc0dd5b84db882e9af6d0fd9e4171a0336c
Author: zhengkezhou <[email protected]>
AuthorDate: Fri Dec 22 22:57:19 2023 +0800
[ISSUE-3057][Improve] Improve streampark-common module based on [3.10
Pre-Conditions Checking] (#3420)
* [Improve] Rename the notNull method to requireNotNull in Utils.scala and update all callers
---
.../scala/org/apache/streampark/common/util/Logger.scala | 2 +-
.../scala/org/apache/streampark/common/util/Utils.scala | 6 +++---
.../console/core/controller/ExternalLinkController.java | 2 +-
.../streampark/console/core/runner/EnvInitializer.java | 2 +-
.../application/impl/ApplicationActionServiceImpl.java | 6 +++---
.../console/core/service/impl/AppBuildPipeServiceImpl.java | 2 +-
.../console/core/service/impl/ExternalLinkServiceImpl.java | 2 +-
.../console/core/service/impl/FlinkSqlServiceImpl.java | 4 ++--
.../console/core/service/impl/ProjectServiceImpl.java | 8 ++++----
.../console/core/service/impl/SavePointServiceImpl.java | 10 +++++-----
.../console/core/service/impl/SqlCompleteServiceImpl.java | 2 +-
.../console/core/service/impl/YarnQueueServiceImpl.java | 13 +++++++------
.../console/system/service/impl/UserServiceImpl.java | 6 +++---
.../testcontainer/flink/FlinkStandaloneSessionCluster.java | 8 ++++----
14 files changed, 37 insertions(+), 36 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index f17465e6b..faf1fc97c 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -134,7 +134,7 @@ private[this] object LoggerFactory extends
LoggerFactoryBinder {
val shadedPackage = "org.apache.streampark.shaded"
override def configureByResource(url: URL): Unit = {
- Utils.notNull(url, "URL argument cannot be null")
+ Utils.requireNotNull(url, "URL argument cannot be null")
val path = url.getPath
if (path.endsWith("xml")) {
val configurator = new JoranConfigurator()
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 30d001ae3..f8e3a21e6 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -33,14 +33,14 @@ object Utils extends Logger {
private[this] lazy val OS = System.getProperty("os.name").toLowerCase
- def notNull(obj: Any, message: String): Unit = {
+ def requireNotNull(obj: Any, message: String): Unit = {
if (obj == null) {
throw new NullPointerException(message)
}
}
- def notNull(obj: Any): Unit = {
- notNull(obj, "this argument must not be null")
+ def requireNotNull(obj: Any): Unit = {
+ requireNotNull(obj, "this argument must not be null")
}
def requireNotEmpty(elem: Any): Boolean = {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
index a1c226077..5af4fc068 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
@@ -82,7 +82,7 @@ public class ExternalLinkController {
@PostMapping("/update")
@RequiresPermissions("externalLink:update")
public RestResponse update(@Valid ExternalLink externalLink) {
- Utils.notNull(externalLink.getId(), "The link id cannot be null");
+ Utils.requireNotNull(externalLink.getId(), "The link id cannot be null");
externalLinkService.update(externalLink);
return RestResponse.success();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 294e7952b..5026ee336 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -113,7 +113,7 @@ public class EnvInitializer implements ApplicationRunner {
.forEach(
key -> {
InternalOption config = InternalConfigHolder.getConfig(key);
- Utils.notNull(config);
+ Utils.requireNotNull(config);
InternalConfigHolder.set(config, springEnv.getProperty(key,
config.classType()));
});
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 401eb67d4..8f22ca5f4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -385,7 +385,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
public void start(Application appParam, boolean auto) throws Exception {
// 1) check application
final Application application = getById(appParam.getId());
- Utils.notNull(application);
+ Utils.requireNotNull(application);
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be
started repeatedly.");
@@ -397,7 +397,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
}
AppBuildPipeline buildPipeline =
appBuildPipeService.getById(application.getId());
- Utils.notNull(buildPipeline);
+ Utils.requireNotNull(buildPipeline);
FlinkEnv flinkEnv =
flinkEnvService.getByIdOrDefault(application.getVersionId());
@@ -625,7 +625,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
switch (application.getDevelopmentMode()) {
case FLINK_SQL:
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(),
false);
- Utils.notNull(flinkSql);
+ Utils.requireNotNull(flinkSql);
// 1) dist_userJar
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
// 2) appConfig
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index db827a626..36111230e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -206,7 +206,7 @@ public class AppBuildPipeServiceImpl
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(),
false);
if (app.isFlinkSqlJobOrPyFlinkJob()) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql :
newFlinkSql;
- Utils.notNull(flinkSql);
+ Utils.requireNotNull(flinkSql);
app.setDependency(flinkSql.getDependency());
app.setTeamResource(flinkSql.getTeamResource());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
index 227dab28c..1cc365c0a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
@@ -76,7 +76,7 @@ public class ExternalLinkServiceImpl extends
ServiceImpl<ExternalLinkMapper, Ext
@Override
public List<ExternalLink> render(Long appId) {
Application app = applicationManageService.getById(appId);
- Utils.notNull(app, "Application doesn't exist");
+ Utils.requireNotNull(app, "Application doesn't exist");
List<ExternalLink> externalLink = this.list();
if (externalLink != null && externalLink.size() > 0) {
// Render the placeholder
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 54c03d5c8..1ba178188 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -173,11 +173,11 @@ public class FlinkSqlServiceImpl extends
ServiceImpl<FlinkSqlMapper, FlinkSql>
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor =
Exception.class)
public void rollback(Application application) {
FlinkSql sql = getCandidate(application.getId(),
CandidateTypeEnum.HISTORY);
- Utils.notNull(sql);
+ Utils.requireNotNull(sql);
try {
// check and backup current job
FlinkSql effectiveSql = getEffective(application.getId(), false);
- Utils.notNull(effectiveSql);
+ Utils.requireNotNull(effectiveSql);
// rollback history sql
backUpService.rollbackFlinkSql(application, sql);
} catch (Exception e) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index b07f6598d..55ca1dc53 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -116,7 +116,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Override
public boolean update(Project projectParam) {
Project project = getById(projectParam.getId());
- Utils.notNull(project);
+ Utils.requireNotNull(project);
ApiAlertException.throwIfFalse(
project.getTeamId().equals(projectParam.getTeamId()),
"TeamId can't be changed, update project failed.");
@@ -159,7 +159,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Override
public boolean removeById(Long id) {
Project project = getById(id);
- Utils.notNull(project);
+ Utils.requireNotNull(project);
LambdaQueryWrapper<Application> queryWrapper =
new LambdaQueryWrapper<Application>().eq(Application::getProjectId,
id);
long count = applicationManageService.count(queryWrapper);
@@ -231,7 +231,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Override
public List<String> listModules(Long id) {
Project project = getById(id);
- Utils.notNull(project);
+ Utils.requireNotNull(project);
if (BuildStateEnum.SUCCESSFUL != BuildStateEnum.of(project.getBuildState())
|| !project.getDistHome().exists()) {
@@ -297,7 +297,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
}
List<Map<String, Object>> confList = new ArrayList<>();
File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
- Utils.notNull(files);
+ Utils.requireNotNull(files);
for (File item : files) {
eachFile(item, confList, true);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 55e9a6fca..59f894889 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -292,7 +292,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
Map<String, Object> properties = new HashMap<>();
if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) {
- Utils.notNull(
+ Utils.requireNotNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong
or the cluster has been deleted. Please contact the Admin.",
@@ -310,7 +310,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
if (FlinkExecutionMode.YARN_SESSION ==
application.getFlinkExecutionMode()) {
- Utils.notNull(
+ Utils.requireNotNull(
cluster,
String.format(
"The yarn session clusterId=%s cannot be find, maybe the
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -378,7 +378,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
// At the remote mode, request the flink webui interface to get the
savepoint path
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
- Utils.notNull(
+ Utils.requireNotNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
@@ -443,8 +443,8 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
private void expire(SavePoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
Application application =
applicationManageService.getById(entity.getAppId());
- Utils.notNull(flinkEnv);
- Utils.notNull(application);
+ Utils.requireNotNull(flinkEnv);
+ Utils.requireNotNull(application);
int cpThreshold =
tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index b98bdd7f8..0b70f9d6d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -184,7 +184,7 @@ public class SqlCompleteServiceImpl implements
SqlCompleteService {
nowStep = nowStep.get(nowChar).getNext();
loc += 1;
}
- Utils.notNull(preNode);
+ Utils.requireNotNull(preNode);
preNode.setStop();
preNode.setCount(count);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
index 9cecfb4df..65de1047b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
@@ -72,8 +72,9 @@ public class YarnQueueServiceImpl extends
ServiceImpl<YarnQueueMapper, YarnQueue
@Override
public IPage<YarnQueue> getPage(YarnQueue yarnQueue, RestRequest request) {
- Utils.notNull(yarnQueue, "Yarn queue query params mustn't be null.");
- Utils.notNull(yarnQueue.getTeamId(), "Team id of yarn queue query params
mustn't be null.");
+ Utils.requireNotNull(yarnQueue, "Yarn queue query params mustn't be
null.");
+ Utils.requireNotNull(
+ yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be
null.");
Page<YarnQueue> page = new Page<>();
page.setCurrent(request.getPageNum());
page.setSize(request.getPageSize());
@@ -88,8 +89,8 @@ public class YarnQueueServiceImpl extends
ServiceImpl<YarnQueueMapper, YarnQueue
@Override
public ResponseResult<String> checkYarnQueue(YarnQueue yarnQueue) {
- Utils.notNull(yarnQueue, "Yarn queue mustn't be empty.");
- Utils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
+ Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be empty.");
+ Utils.requireNotNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
ResponseResult<String> responseResult = new ResponseResult<>();
@@ -206,8 +207,8 @@ public class YarnQueueServiceImpl extends
ServiceImpl<YarnQueueMapper, YarnQueue
@VisibleForTesting
public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) {
- Utils.notNull(yarnQueue, "Yarn queue mustn't be null.");
- Utils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
+ Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be null.");
+ Utils.requireNotNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
YarnQueue queueFromDB = getById(yarnQueue.getId());
ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist.");
return queueFromDB;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index 78e70d275..18380e7ab 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -95,7 +95,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper,
User> implements Us
page.setSize(request.getPageSize());
IPage<User> resPage = this.baseMapper.selectPage(page, user);
- Utils.notNull(resPage);
+ Utils.requireNotNull(resPage);
if (resPage.getTotal() == 0) {
resPage.setRecords(Collections.emptyList());
}
@@ -197,7 +197,7 @@ public class UserServiceImpl extends
ServiceImpl<UserMapper, User> implements Us
@Override
public void setLastTeam(Long teamId, Long userId) {
User user = getById(userId);
- Utils.notNull(user);
+ Utils.requireNotNull(user);
user.setLastTeamId(teamId);
this.baseMapper.updateById(user);
}
@@ -205,7 +205,7 @@ public class UserServiceImpl extends
ServiceImpl<UserMapper, User> implements Us
@Override
public void clearLastTeam(Long userId, Long teamId) {
User user = getById(userId);
- Utils.notNull(user);
+ Utils.requireNotNull(user);
if (!teamId.equals(user.getLastTeamId())) {
return;
}
diff --git
a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
index 5b253722a..f1a8b47e8 100644
---
a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
+++
b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -98,9 +98,9 @@ public class FlinkStandaloneSessionCluster implements
Startable {
@Override
public void start() {
- Utils.notNull(jobManagerContainer);
+ Utils.requireNotNull(jobManagerContainer);
jobManagerContainer.start();
- Utils.notNull(taskManagerContainers);
+ Utils.requireNotNull(taskManagerContainers);
for (FlinkContainer taskManagerContainer : taskManagerContainers) {
taskManagerContainer.start();
}
@@ -108,11 +108,11 @@ public class FlinkStandaloneSessionCluster implements
Startable {
@Override
public void stop() {
- Utils.notNull(taskManagerContainers);
+ Utils.requireNotNull(taskManagerContainers);
for (FlinkContainer taskManagerContainer : taskManagerContainers) {
taskManagerContainer.stop();
}
- Utils.notNull(jobManagerContainer);
+ Utils.requireNotNull(jobManagerContainer);
jobManagerContainer.stop();
}