This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3fc85bc0d [ISSUE-3057][Improve] Improve streampark-common module base 
on [3.10 Pre-Conditions Checking] (#3420)
3fc85bc0d is described below

commit 3fc85bc0dd5b84db882e9af6d0fd9e4171a0336c
Author: zhengkezhou <[email protected]>
AuthorDate: Fri Dec 22 22:57:19 2023 +0800

    [ISSUE-3057][Improve] Improve streampark-common module base on [3.10 
Pre-Conditions Checking] (#3420)
    
    * [Improve] rename notNull method in Utils.scala
---
 .../scala/org/apache/streampark/common/util/Logger.scala    |  2 +-
 .../scala/org/apache/streampark/common/util/Utils.scala     |  6 +++---
 .../console/core/controller/ExternalLinkController.java     |  2 +-
 .../streampark/console/core/runner/EnvInitializer.java      |  2 +-
 .../application/impl/ApplicationActionServiceImpl.java      |  6 +++---
 .../console/core/service/impl/AppBuildPipeServiceImpl.java  |  2 +-
 .../console/core/service/impl/ExternalLinkServiceImpl.java  |  2 +-
 .../console/core/service/impl/FlinkSqlServiceImpl.java      |  4 ++--
 .../console/core/service/impl/ProjectServiceImpl.java       |  8 ++++----
 .../console/core/service/impl/SavePointServiceImpl.java     | 10 +++++-----
 .../console/core/service/impl/SqlCompleteServiceImpl.java   |  2 +-
 .../console/core/service/impl/YarnQueueServiceImpl.java     | 13 +++++++------
 .../console/system/service/impl/UserServiceImpl.java        |  6 +++---
 .../testcontainer/flink/FlinkStandaloneSessionCluster.java  |  8 ++++----
 14 files changed, 37 insertions(+), 36 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index f17465e6b..faf1fc97c 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -134,7 +134,7 @@ private[this] object LoggerFactory extends 
LoggerFactoryBinder {
     val shadedPackage = "org.apache.streampark.shaded"
 
     override def configureByResource(url: URL): Unit = {
-      Utils.notNull(url, "URL argument cannot be null")
+      Utils.requireNotNull(url, "URL argument cannot be null")
       val path = url.getPath
       if (path.endsWith("xml")) {
         val configurator = new JoranConfigurator()
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 30d001ae3..f8e3a21e6 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -33,14 +33,14 @@ object Utils extends Logger {
 
   private[this] lazy val OS = System.getProperty("os.name").toLowerCase
 
-  def notNull(obj: Any, message: String): Unit = {
+  def requireNotNull(obj: Any, message: String): Unit = {
     if (obj == null) {
       throw new NullPointerException(message)
     }
   }
 
-  def notNull(obj: Any): Unit = {
-    notNull(obj, "this argument must not be null")
+  def requireNotNull(obj: Any): Unit = {
+    requireNotNull(obj, "this argument must not be null")
   }
 
   def requireNotEmpty(elem: Any): Boolean = {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
index a1c226077..5af4fc068 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
@@ -82,7 +82,7 @@ public class ExternalLinkController {
   @PostMapping("/update")
   @RequiresPermissions("externalLink:update")
   public RestResponse update(@Valid ExternalLink externalLink) {
-    Utils.notNull(externalLink.getId(), "The link id cannot be null");
+    Utils.requireNotNull(externalLink.getId(), "The link id cannot be null");
     externalLinkService.update(externalLink);
     return RestResponse.success();
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 294e7952b..5026ee336 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -113,7 +113,7 @@ public class EnvInitializer implements ApplicationRunner {
         .forEach(
             key -> {
               InternalOption config = InternalConfigHolder.getConfig(key);
-              Utils.notNull(config);
+              Utils.requireNotNull(config);
               InternalConfigHolder.set(config, springEnv.getProperty(key, 
config.classType()));
             });
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 401eb67d4..8f22ca5f4 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -385,7 +385,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
   public void start(Application appParam, boolean auto) throws Exception {
     // 1) check application
     final Application application = getById(appParam.getId());
-    Utils.notNull(application);
+    Utils.requireNotNull(application);
     ApiAlertException.throwIfTrue(
         !application.isCanBeStart(), "[StreamPark] The application cannot be 
started repeatedly.");
 
@@ -397,7 +397,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
     }
 
     AppBuildPipeline buildPipeline = 
appBuildPipeService.getById(application.getId());
-    Utils.notNull(buildPipeline);
+    Utils.requireNotNull(buildPipeline);
 
     FlinkEnv flinkEnv = 
flinkEnvService.getByIdOrDefault(application.getVersionId());
 
@@ -625,7 +625,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
     switch (application.getDevelopmentMode()) {
       case FLINK_SQL:
         FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), 
false);
-        Utils.notNull(flinkSql);
+        Utils.requireNotNull(flinkSql);
         // 1) dist_userJar
         String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
         // 2) appConfig
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index db827a626..36111230e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -206,7 +206,7 @@ public class AppBuildPipeServiceImpl
     FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), 
false);
     if (app.isFlinkSqlJobOrPyFlinkJob()) {
       FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : 
newFlinkSql;
-      Utils.notNull(flinkSql);
+      Utils.requireNotNull(flinkSql);
       app.setDependency(flinkSql.getDependency());
       app.setTeamResource(flinkSql.getTeamResource());
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
index 227dab28c..1cc365c0a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
@@ -76,7 +76,7 @@ public class ExternalLinkServiceImpl extends 
ServiceImpl<ExternalLinkMapper, Ext
   @Override
   public List<ExternalLink> render(Long appId) {
     Application app = applicationManageService.getById(appId);
-    Utils.notNull(app, "Application doesn't exist");
+    Utils.requireNotNull(app, "Application doesn't exist");
     List<ExternalLink> externalLink = this.list();
     if (externalLink != null && externalLink.size() > 0) {
       // Render the placeholder
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 54c03d5c8..1ba178188 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -173,11 +173,11 @@ public class FlinkSqlServiceImpl extends 
ServiceImpl<FlinkSqlMapper, FlinkSql>
   @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = 
Exception.class)
   public void rollback(Application application) {
     FlinkSql sql = getCandidate(application.getId(), 
CandidateTypeEnum.HISTORY);
-    Utils.notNull(sql);
+    Utils.requireNotNull(sql);
     try {
       // check and backup current job
       FlinkSql effectiveSql = getEffective(application.getId(), false);
-      Utils.notNull(effectiveSql);
+      Utils.requireNotNull(effectiveSql);
       // rollback history sql
       backUpService.rollbackFlinkSql(application, sql);
     } catch (Exception e) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index b07f6598d..55ca1dc53 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -116,7 +116,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
   @Override
   public boolean update(Project projectParam) {
     Project project = getById(projectParam.getId());
-    Utils.notNull(project);
+    Utils.requireNotNull(project);
     ApiAlertException.throwIfFalse(
         project.getTeamId().equals(projectParam.getTeamId()),
         "TeamId can't be changed, update project failed.");
@@ -159,7 +159,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
   @Override
   public boolean removeById(Long id) {
     Project project = getById(id);
-    Utils.notNull(project);
+    Utils.requireNotNull(project);
     LambdaQueryWrapper<Application> queryWrapper =
         new LambdaQueryWrapper<Application>().eq(Application::getProjectId, 
id);
     long count = applicationManageService.count(queryWrapper);
@@ -231,7 +231,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
   @Override
   public List<String> listModules(Long id) {
     Project project = getById(id);
-    Utils.notNull(project);
+    Utils.requireNotNull(project);
 
     if (BuildStateEnum.SUCCESSFUL != BuildStateEnum.of(project.getBuildState())
         || !project.getDistHome().exists()) {
@@ -297,7 +297,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
       }
       List<Map<String, Object>> confList = new ArrayList<>();
       File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
-      Utils.notNull(files);
+      Utils.requireNotNull(files);
       for (File item : files) {
         eachFile(item, confList, true);
       }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 55e9a6fca..59f894889 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -292,7 +292,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     Map<String, Object> properties = new HashMap<>();
 
     if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) {
-      Utils.notNull(
+      Utils.requireNotNull(
           cluster,
           String.format(
               "The clusterId=%s cannot be find, maybe the clusterId is wrong 
or the cluster has been deleted. Please contact the Admin.",
@@ -310,7 +310,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     }
     if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
       if (FlinkExecutionMode.YARN_SESSION == 
application.getFlinkExecutionMode()) {
-        Utils.notNull(
+        Utils.requireNotNull(
             cluster,
             String.format(
                 "The yarn session clusterId=%s cannot be find, maybe the 
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -378,7 +378,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
 
     // At the remote mode, request the flink webui interface to get the 
savepoint path
     FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
-    Utils.notNull(
+    Utils.requireNotNull(
         cluster,
         String.format(
             "The clusterId=%s cannot be find, maybe the clusterId is wrong or "
@@ -443,8 +443,8 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
   private void expire(SavePoint entity) {
     FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
     Application application = 
applicationManageService.getById(entity.getAppId());
-    Utils.notNull(flinkEnv);
-    Utils.notNull(application);
+    Utils.requireNotNull(flinkEnv);
+    Utils.requireNotNull(application);
 
     int cpThreshold =
         
tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index b98bdd7f8..0b70f9d6d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -184,7 +184,7 @@ public class SqlCompleteServiceImpl implements 
SqlCompleteService {
         nowStep = nowStep.get(nowChar).getNext();
         loc += 1;
       }
-      Utils.notNull(preNode);
+      Utils.requireNotNull(preNode);
       preNode.setStop();
       preNode.setCount(count);
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
index 9cecfb4df..65de1047b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
@@ -72,8 +72,9 @@ public class YarnQueueServiceImpl extends 
ServiceImpl<YarnQueueMapper, YarnQueue
 
   @Override
   public IPage<YarnQueue> getPage(YarnQueue yarnQueue, RestRequest request) {
-    Utils.notNull(yarnQueue, "Yarn queue query params mustn't be null.");
-    Utils.notNull(yarnQueue.getTeamId(), "Team id of yarn queue query params 
mustn't be null.");
+    Utils.requireNotNull(yarnQueue, "Yarn queue query params mustn't be 
null.");
+    Utils.requireNotNull(
+        yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be 
null.");
     Page<YarnQueue> page = new Page<>();
     page.setCurrent(request.getPageNum());
     page.setSize(request.getPageSize());
@@ -88,8 +89,8 @@ public class YarnQueueServiceImpl extends 
ServiceImpl<YarnQueueMapper, YarnQueue
   @Override
   public ResponseResult<String> checkYarnQueue(YarnQueue yarnQueue) {
 
-    Utils.notNull(yarnQueue, "Yarn queue mustn't be empty.");
-    Utils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
+    Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be empty.");
+    Utils.requireNotNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
 
     ResponseResult<String> responseResult = new ResponseResult<>();
 
@@ -206,8 +207,8 @@ public class YarnQueueServiceImpl extends 
ServiceImpl<YarnQueueMapper, YarnQueue
 
   @VisibleForTesting
   public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) {
-    Utils.notNull(yarnQueue, "Yarn queue mustn't be null.");
-    Utils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
+    Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be null.");
+    Utils.requireNotNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
     YarnQueue queueFromDB = getById(yarnQueue.getId());
     ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist.");
     return queueFromDB;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index 78e70d275..18380e7ab 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -95,7 +95,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, 
User> implements Us
     page.setSize(request.getPageSize());
     IPage<User> resPage = this.baseMapper.selectPage(page, user);
 
-    Utils.notNull(resPage);
+    Utils.requireNotNull(resPage);
     if (resPage.getTotal() == 0) {
       resPage.setRecords(Collections.emptyList());
     }
@@ -197,7 +197,7 @@ public class UserServiceImpl extends 
ServiceImpl<UserMapper, User> implements Us
   @Override
   public void setLastTeam(Long teamId, Long userId) {
     User user = getById(userId);
-    Utils.notNull(user);
+    Utils.requireNotNull(user);
     user.setLastTeamId(teamId);
     this.baseMapper.updateById(user);
   }
@@ -205,7 +205,7 @@ public class UserServiceImpl extends 
ServiceImpl<UserMapper, User> implements Us
   @Override
   public void clearLastTeam(Long userId, Long teamId) {
     User user = getById(userId);
-    Utils.notNull(user);
+    Utils.requireNotNull(user);
     if (!teamId.equals(user.getLastTeamId())) {
       return;
     }
diff --git 
a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
 
b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
index 5b253722a..f1a8b47e8 100644
--- 
a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
+++ 
b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -98,9 +98,9 @@ public class FlinkStandaloneSessionCluster implements 
Startable {
 
   @Override
   public void start() {
-    Utils.notNull(jobManagerContainer);
+    Utils.requireNotNull(jobManagerContainer);
     jobManagerContainer.start();
-    Utils.notNull(taskManagerContainers);
+    Utils.requireNotNull(taskManagerContainers);
     for (FlinkContainer taskManagerContainer : taskManagerContainers) {
       taskManagerContainer.start();
     }
@@ -108,11 +108,11 @@ public class FlinkStandaloneSessionCluster implements 
Startable {
 
   @Override
   public void stop() {
-    Utils.notNull(taskManagerContainers);
+    Utils.requireNotNull(taskManagerContainers);
     for (FlinkContainer taskManagerContainer : taskManagerContainers) {
       taskManagerContainer.stop();
     }
-    Utils.notNull(jobManagerContainer);
+    Utils.requireNotNull(jobManagerContainer);
     jobManagerContainer.stop();
   }
 

Reply via email to