This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 9c80377c88416c14e62c47231dda6d96167a5956
Author: benjobs <[email protected]>
AuthorDate: Fri Jan 24 20:24:49 2025 +0800

    [Improve] add AssertUtils
---
 .../apache/streampark/common/fs/LfsOperator.scala  |   6 +-
 .../streampark/common/util/AssertUtils.scala       | 300 +++++++++++++++++++++
 .../apache/streampark/common/util/FileUtils.scala  |   4 +-
 .../org/apache/streampark/common/util/Logger.scala |   2 +-
 .../streampark/common/util/PropertiesUtils.scala   |   2 +-
 .../org/apache/streampark/common/util/Utils.scala  |  41 +--
 .../streampark/console/base/util/CommonUtils.java  |   8 +-
 .../core/controller/ExternalLinkController.java    |   4 +-
 .../streampark/console/core/entity/Project.java    |   3 +-
 .../console/core/runner/EnvInitializer.java        |   8 +-
 .../core/service/alert/impl/AlertServiceImpl.java  |   4 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |   3 +-
 .../service/impl/ApplicationConfigServiceImpl.java |   4 +-
 .../core/service/impl/ApplicationServiceImpl.java  |   7 +-
 .../core/service/impl/ExternalLinkServiceImpl.java |   8 +-
 .../core/service/impl/FlinkSqlServiceImpl.java     |   5 +-
 .../core/service/impl/ProjectServiceImpl.java      |  10 +-
 .../core/service/impl/SavepointServiceImpl.java    |  11 +-
 .../core/service/impl/SqlCompleteServiceImpl.java  |   4 +-
 .../core/service/impl/YarnQueueServiceImpl.java    |  15 +-
 .../system/service/impl/MemberServiceImpl.java     |   8 +-
 .../system/service/impl/UserServiceImpl.java       |   8 +-
 .../connector/hbase/source/HBaseJavaSource.java    |   6 +-
 .../flink/connector/jdbc/sink/JdbcJavaSink.java    |   4 +-
 .../connector/jdbc/source/JdbcJavaSource.java      |   6 +-
 .../flink/connector/jdbc/source/JdbcSource.scala   |  16 +-
 .../connector/mongo/source/MongoJavaSource.java    |   8 +-
 .../flink/packer/docker/DockerRetriever.scala      |   4 +-
 28 files changed, 392 insertions(+), 117 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
index ca6d50b25..313b06963 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.common.fs
 
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils.{isAnyBank, notEmpty}
+import org.apache.streampark.common.util.{AssertUtils, Logger}
+import org.apache.streampark.common.util.Utils.isAnyBank
 
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.commons.io.{FileUtils, IOUtils}
@@ -41,7 +41,7 @@ object LfsOperator extends FsOperator with Logger {
   }
 
   override def delete(path: String): Unit = {
-    if (notEmpty(path)) {
+    if (AssertUtils.isNotEmpty(path)) {
       val file = new File(path)
       if (file.exists()) {
         FileUtils.forceDelete(file)
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala
new file mode 100644
index 000000000..5d0eb5042
--- /dev/null
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util
+
+import javax.annotation.Nullable
+
+import java.util
+import java.util.{Collection => JavaCollection, Map => JavaMap}
+
+import scala.collection.JavaConversions._
+
+/** @since 2.1.6 */
+object AssertUtils {
+
+  /**
+   * Checks the given boolean condition, and throws an {@code 
IllegalArgumentException} if the
+   * condition is not met (evaluates to {@code false}).
+   *
+   * @param condition
+   *   The condition to check
+   * @throws IllegalArgumentException
+   *   Thrown, if the condition is violated.
+   */
+  def required(condition: Boolean): Unit = {
+    if (!condition) {
+      throw new IllegalArgumentException
+    }
+  }
+
+  /**
+   * Checks the given boolean condition, and throws an {@code 
IllegalArgumentException} if the
+   * condition is not met (evaluates to {@code false}). The exception will 
have the given error
+   * message.
+   *
+   * @param condition
+   *   The condition to check
+   * @param message
+   *   The message for the {@code IllegalArgumentException} that is thrown if 
the check fails.
+   * @throws IllegalArgumentException
+   *   Thrown, if the condition is violated.
+   */
+  def required(condition: Boolean, @Nullable message: String): Unit = {
+    if (!condition) {
+      throw new IllegalArgumentException(message)
+    }
+  }
+
+  /**
+   * Checks the given boolean condition, and throws an {@code 
IllegalStateException} if the
+   * condition is not met (evaluates to {@code false}).
+   *
+   * @param condition
+   *   The condition to check
+   * @throws IllegalStateException
+   *   Thrown, if the condition is violated.
+   */
+  def state(condition: Boolean): Unit = {
+    if (!condition) {
+      throw new IllegalStateException
+    }
+  }
+
+  /**
+   * Checks the given boolean condition, and throws an IllegalStateException 
if the condition is not
+   * met (evaluates to {@code false}). The exception will have the given error 
message.
+   *
+   * @param condition
+   *   The condition to check
+   * @param message
+   *   The message for the IllegalStateException that is thrown if the check 
fails.
+   * @throws IllegalStateException
+   *   Thrown, if the condition is violated.
+   */
+  def state(condition: Boolean, @Nullable message: String): Unit = {
+    if (!condition) {
+      throw new IllegalStateException(message)
+    }
+  }
+
+  // ------------------------------------------------------------------------
+  //  Null checks
+  // ------------------------------------------------------------------------
+  /** Ensures that the given object reference is not null. Upon violation, a NullPointerException is thrown. */
+  def notNull[T](@Nullable reference: T): T = {
+    if (reference == null) {
+      throw new NullPointerException
+    }
+    reference
+  }
+
+  /**
+   * Ensures that the given object reference is not null. Upon violation, a 
NullPointerException
+   * with the given message is thrown.
+   *
+   * @return
+   *   The object reference itself (generically typed).
+   * @throws NullPointerException
+   *   Thrown, if the passed reference was null.
+   */
+  def notNull[T](@Nullable reference: T, @Nullable message: String): T = {
+    if (reference == null) {
+      throw new NullPointerException(message)
+    }
+    reference
+  }
+
+  def isEmpty(reference: AnyRef): Boolean = !isNotEmpty(reference)
+
+  def isNotEmpty(elem: AnyRef): Boolean = {
+    elem match {
+      case null => false
+      case x if x.isInstanceOf[Array[_]] => 
elem.asInstanceOf[Array[_]].nonEmpty
+      case x if x.isInstanceOf[CharSequence] => elem.toString.trim.nonEmpty
+      case x if x.isInstanceOf[Traversable[_]] => 
x.asInstanceOf[Traversable[_]].nonEmpty
+      case x if x.isInstanceOf[Iterable[_]] => 
x.asInstanceOf[Iterable[_]].nonEmpty
+      case x if x.isInstanceOf[JavaCollection[_]] => 
!x.asInstanceOf[JavaCollection[_]].isEmpty
+      case x if x.isInstanceOf[JavaMap[_, _]] => !x.asInstanceOf[JavaMap[_, 
_]].isEmpty
+      case _ => true
+    }
+  }
+
+  /**
+   * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not 
be {@code null} and
+   * must contain at least one element. <pre 
class="code">AssertUtils.notEmpty(array, "must
+   * contain elements");</pre>
+   *
+   * @param reference
+   *   the object to check
+   * @throws IllegalArgumentException
+   *   if the object array is {@code null} or contains no elements
+   */
+  def notEmpty(reference: AnyRef): Unit = {
+    if (isEmpty(reference)) {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  /**
+   * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not 
be {@code null} and
+   * must contain at least one element. <pre class="code"> 
AssertUtils.notEmpty(array, "must
+   * contain elements");</pre>
+   *
+   * @param reference
+   *   the object to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the object array is {@code null} or contains no elements
+   */
+  def notEmpty(@Nullable reference: AnyRef, message: String): Unit = {
+    if (isEmpty(reference)) {
+      throw new IllegalArgumentException(message)
+    }
+  }
+
+  /**
+   * Assert that an array contains no {@code null} elements. <p>Note: Does not 
complain if the array
+   * is empty! <pre class="code">AssertUtils.noNullElements(array, "The array 
must contain non-null
+   * elements");</pre>
+   *
+   * @param array
+   *   the array to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the object array contains a {@code null} element
+   */
+  def noNullElements(@Nullable array: Array[AnyRef], message: String): Unit = {
+    if (array != null) for (element <- array) {
+      if (element == null) throw new IllegalArgumentException(message)
+    }
+  }
+
+  /**
+   * Assert that a collection contains no {@code null} elements. <p>Note: Does 
not complain if the
+   * collection is empty! <pre 
class="code">AssertUtils.noNullElements(collection, "Collection must
+   * contain non-null elements");</pre>
+   *
+   * @param collection
+   *   the collection to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the collection contains a {@code null} element
+   */
+  def noNullElements(@Nullable collection: util.Collection[_], message: 
String): Unit = {
+    if (collection != null) for (element <- collection) {
+      if (element == null) {
+        throw new IllegalArgumentException(message)
+      }
+    }
+  }
+
+  /**
+   * Assert that the given String is not empty; that is, it must not be {@code 
null} and not the
+   * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must 
not be empty");</pre>
+   *
+   * @param text
+   *   the String to check
+   * @throws IllegalArgumentException
+   *   if the text is empty
+   * @see
+   *   StringUtils#hasLength
+   */
+  def hasLength(@Nullable text: String): Unit = {
+    if (!getHasLength(text)) {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  /**
+   * Assert that the given String is not empty; that is, it must not be {@code 
null} and not the
+   * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must 
not be empty");</pre>
+   *
+   * @param text
+   *   the String to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the text is empty
+   * @see
+   *   StringUtils#hasLength
+   */
+  def hasLength(@Nullable text: String, message: String): Unit = {
+    if (!getHasLength(text)) {
+      throw new IllegalArgumentException(message)
+    }
+  }
+
+  /**
+   * Assert that the given String contains valid text content; that is, it 
must not be {@code null}
+   * and must contain at least one non-whitespace character. <pre
+   * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre>
+   *
+   * @param text
+   *   the String to check
+   * @throws IllegalArgumentException
+   *   if the text does not contain valid text content
+   * @see
+   *   StringUtils#hasText
+   */
+  def hasText(@Nullable text: String): Unit = {
+    if (!getHasText(text)) {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  /**
+   * Assert that the given String contains valid text content; that is, it 
must not be {@code null}
+   * and must contain at least one non-whitespace character. <pre
+   * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre>
+   *
+   * @param text
+   *   the String to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the text does not contain valid text content
+   * @see
+   *   StringUtils#hasText
+   */
+  def hasText(@Nullable text: String, message: String): Unit = {
+    if (!getHasText(text)) {
+      throw new IllegalArgumentException(message)
+    }
+  }
+
+  private[this] def getHasLength(@Nullable str: String): Boolean =
+    str != null && str.nonEmpty
+
+  private[this] def getHasText(@Nullable str: String): Boolean = {
+    str != null && str.nonEmpty && containsText(str)
+  }
+
+  private[this] def containsText(str: CharSequence): Boolean = {
+    val strLen = str.length
+    for (i <- 0 until strLen) {
+      if (!Character.isWhitespace(str.charAt(i))) {
+        return true
+      }
+    }
+    false
+  }
+
+}
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index d3eed9d2f..3090a72a7 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -92,8 +92,8 @@ object FileUtils {
 
   def getPathFromEnv(env: String): String = {
     val path = System.getenv(env)
-    require(
-      Utils.notEmpty(path),
+    AssertUtils.required(
+      AssertUtils.isNotEmpty(path),
       s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env")
     val file = new File(path)
     require(file.exists(), s"[StreamPark] FileUtils.getPathFromEnv: $env is 
not exist!")
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index 3ce4fd9a7..1c6daa983 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -134,7 +134,7 @@ private[this] object LoggerFactory extends 
LoggerFactoryBinder {
     private val shadedPackage = "org.apache.streampark.shaded"
 
     override def configureByResource(url: URL): Unit = {
-      Utils.notNull(url, "URL argument cannot be null")
+      AssertUtils.notNull(url, "URL argument cannot be null")
       val path = url.getPath
       if (path.endsWith("xml")) {
         val configurator = new JoranConfigurator()
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 6adc03cf7..f839fa207 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -280,7 +280,7 @@ object PropertiesUtils extends Logger {
       val map = mutable.Map[String, String]()
       val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
       simple.split("\\s?-D") match {
-        case d if Utils.notEmpty(d) =>
+        case d if AssertUtils.isNotEmpty(d) =>
           d.foreach(
             x => {
               if (x.nonEmpty) {
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 2015a2c80..80a19c2af 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -18,11 +18,11 @@ package org.apache.streampark.common.util
 
 import org.apache.commons.lang3.StringUtils
 
-import java.io.{BufferedInputStream, File, FileInputStream, IOException, 
PrintWriter, StringWriter}
+import java.io._
 import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, 
Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}
 import java.net.URL
 import java.time.Duration
-import java.util.{jar, Collection => JavaCollection, Map => JavaMap, 
Properties, UUID}
+import java.util.{jar, Properties, UUID}
 import java.util.concurrent.locks.LockSupport
 import java.util.jar.{JarFile, JarInputStream}
 
@@ -34,43 +34,6 @@ object Utils extends Logger {
 
   private[this] lazy val OS = System.getProperty("os.name").toLowerCase
 
-  def notNull(obj: Any, message: String): Unit = {
-    if (obj == null) {
-      throw new NullPointerException(message)
-    }
-  }
-
-  def notNull(obj: Any): Unit = {
-    notNull(obj, "this argument must not be null")
-  }
-
-  def notEmpty(elem: Any): Boolean = {
-    elem match {
-      case null => false
-      case x if x.isInstanceOf[Array[_]] => 
elem.asInstanceOf[Array[_]].nonEmpty
-      case x if x.isInstanceOf[CharSequence] => elem.toString.trim.nonEmpty
-      case x if x.isInstanceOf[Traversable[_]] => 
x.asInstanceOf[Traversable[_]].nonEmpty
-      case x if x.isInstanceOf[Iterable[_]] => 
x.asInstanceOf[Iterable[_]].nonEmpty
-      case x if x.isInstanceOf[JavaCollection[_]] => 
!x.asInstanceOf[JavaCollection[_]].isEmpty
-      case x if x.isInstanceOf[JavaMap[_, _]] => !x.asInstanceOf[JavaMap[_, 
_]].isEmpty
-      case _ => true
-    }
-  }
-
-  def isEmpty(elem: Any): Boolean = !notEmpty(elem)
-
-  def required(expression: Boolean): Unit = {
-    if (!expression) {
-      throw new IllegalArgumentException
-    }
-  }
-
-  def required(expression: Boolean, errorMessage: Any): Unit = {
-    if (!expression) {
-      throw new IllegalArgumentException(s"requirement failed: 
${errorMessage.toString}")
-    }
-  }
-
   def uuid(): String = UUID.randomUUID().toString.replaceAll("-", "")
 
   @throws[IOException]
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
index 524959322..47096a0ff 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.base.util;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.cglib.beans.BeanMap;
@@ -451,7 +451,7 @@ public final class CommonUtils implements Serializable {
   }
 
   public static <T> T[] arrayRemoveElements(T[] array, T... elem) {
-    Utils.notNull(array);
+    AssertUtils.notNull(array);
     List<T> arrayList = new ArrayList<>(0);
     Collections.addAll(arrayList, array);
     if (isEmpty(elem)) {
@@ -464,7 +464,7 @@ public final class CommonUtils implements Serializable {
   }
 
   public static <T> T[] arrayRemoveIndex(T[] array, int... index) {
-    Utils.notNull(array);
+    AssertUtils.notNull(array);
     for (int j : index) {
       if (j < 0 || j > array.length - 1) {
         throw new IndexOutOfBoundsException("index error.@" + j);
@@ -481,7 +481,7 @@ public final class CommonUtils implements Serializable {
   }
 
   public static <T> T[] arrayInsertIndex(T[] array, int index, T t) {
-    Utils.notNull(array);
+    AssertUtils.notNull(array);
     List<T> arrayList = new ArrayList<T>(array.length + 1);
     if (index == 0) {
       arrayList.add(t);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
index 66cdb0bee..3818ec45f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.controller;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.entity.ExternalLink;
 import org.apache.streampark.console.core.service.ExternalLinkService;
@@ -70,7 +70,7 @@ public class ExternalLinkController {
   @PostMapping("/update")
   @RequiresPermissions("externalLink:update")
   public RestResponse update(@Valid ExternalLink externalLink) {
-    Utils.notNull(externalLink.getId(), "The link id cannot be null");
+    AssertUtils.notNull(externalLink.getId(), "The link id cannot be null");
     externalLinkService.update(externalLink);
     return RestResponse.success();
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index dc0f23c63..1694d9bdb 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity;
 import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.util.CommonUtils;
 import org.apache.streampark.console.base.util.WebUtils;
@@ -228,7 +229,7 @@ public class Project implements Serializable {
       try {
         Process process = Runtime.getRuntime().exec(mvn + " --version");
         process.waitFor();
-        Utils.required(process.exitValue() == 0);
+        AssertUtils.required(process.exitValue() == 0);
         useWrapper = false;
       } catch (Exception ignored) {
         log.warn("try using user-installed maven failed, now use 
maven-wrapper.");
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index d6359340b..aabde27cf 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -24,8 +24,8 @@ import org.apache.streampark.common.conf.InternalOption;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.SystemPropertyUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.bean.MavenConfig;
 import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -104,7 +104,7 @@ public class EnvInitializer implements ApplicationRunner {
         .forEach(
             key -> {
               InternalOption config = InternalConfigHolder.getConfig(key);
-              Utils.notNull(config);
+              AssertUtils.notNull(config);
               InternalConfigHolder.set(config, springEnv.getProperty(key, 
config.classType()));
             });
 
@@ -172,7 +172,7 @@ public class EnvInitializer implements ApplicationRunner {
     // 2. upload jar.
     // 2.1) upload client jar
     File client = WebUtils.getAppClientDir();
-    Utils.required(
+    AssertUtils.required(
         client.exists() && client.listFiles().length > 0,
         client.getAbsolutePath().concat(" is not exists or empty directory "));
 
@@ -189,7 +189,7 @@ public class EnvInitializer implements ApplicationRunner {
         WebUtils.getAppLibDir()
             .listFiles(pathname -> 
pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern()));
 
-    Utils.required(shims != null && shims.length > 0, "streampark-flink-shims 
jar not exist");
+    AssertUtils.required(shims != null && shims.length > 0, 
"streampark-flink-shims jar not exist");
 
     String appShims = workspace.APP_SHIMS();
     fsOperator.delete(appShims);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index b08b0d2cf..acb9b255b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.console.core.service.alert.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.ThreadUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.AlertException;
 import org.apache.streampark.console.base.util.SpringContextUtils;
 import org.apache.streampark.console.core.bean.AlertConfigWithParams;
@@ -100,7 +100,7 @@ public class AlertServiceImpl implements AlertService {
                   try {
                     Class<? extends AlertNotifyService> notifyServiceClass =
                         getAlertServiceImpl(alertType);
-                    Utils.notNull(notifyServiceClass);
+                    AssertUtils.notNull(notifyServiceClass);
                     boolean alertRes =
                         SpringContextUtils.getBean(notifyServiceClass)
                             .doAlert(params, alertTemplate);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 1d4edd7cd..ad9b350fe 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
@@ -169,7 +170,7 @@ public class AppBuildPipeServiceImpl
     FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), 
false);
     if (app.isFlinkSqlJob()) {
       FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : 
newFlinkSql;
-      Utils.notNull(flinkSql);
+      AssertUtils.notNull(flinkSql);
       app.setDependency(flinkSql.getDependency());
     }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index 8f3b8d05b..a4e1a91b9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -107,7 +107,7 @@ public class ApplicationConfigServiceImpl
     if (application.isFlinkSqlJob()) {
       // get effect config
       ApplicationConfig effectiveConfig = getEffective(application.getId());
-      if (Utils.isEmpty(application.getConfig())) {
+      if (AssertUtils.isEmpty(application.getConfig())) {
         if (effectiveConfig != null) {
           effectiveService.delete(application.getId(), EffectiveType.CONFIG);
         }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 6ed4df915..3d6823174 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.fs.LfsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
@@ -1510,7 +1511,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   @Transactional(rollbackFor = {Exception.class})
   public void start(Application appParam, boolean auto) throws Exception {
     final Application application = getById(appParam.getId());
-    Utils.notNull(application);
+    AssertUtils.notNull(application);
     if (!application.isCanBeStart()) {
       throw new ApiAlertException("[StreamPark] The application cannot be 
started repeatedly.");
     }
@@ -1605,7 +1606,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
       }
     } else if (application.isFlinkSqlJob()) {
       FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
-      Utils.notNull(flinkSql);
+      AssertUtils.notNull(flinkSql);
       // 1) dist_userJar
       String sqlDistJar = serviceHelper.getSqlClientJar(flinkEnv);
       // 2) appConfig
@@ -1632,7 +1633,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     }
 
     AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
-    Utils.notNull(buildPipeline);
+    AssertUtils.notNull(buildPipeline);
 
     BuildResult buildResult = buildPipeline.getBuildResult();
     if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
index b01aed743..ecfa29131 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ExternalLink;
 import org.apache.streampark.console.core.enums.PlaceholderType;
@@ -75,7 +75,7 @@ public class ExternalLinkServiceImpl extends ServiceImpl<ExternalLinkMapper, Ext
   @Override
   public List<ExternalLink> render(Long appId) {
     Application app = applicationService.getById(appId);
-    Utils.notNull(app, "Application doesn't exist");
+    AssertUtils.notNull(app, "Application doesn't exist");
     List<ExternalLink> externalLink = this.list();
     if (externalLink != null && !externalLink.isEmpty()) {
       // Render the placeholder
@@ -109,10 +109,10 @@ public class ExternalLinkServiceImpl extends ServiceImpl<ExternalLinkMapper, Ext
     if (result == null) {
       return true;
     }
-    Utils.required(
+    AssertUtils.required(
         !result.getBadgeName().equals(params.getBadgeName()),
         String.format("The name: %s is already existing.", result.getBadgeName()));
-    Utils.required(
+    AssertUtils.required(
         !result.getLinkUrl().equals(params.getLinkUrl()),
         String.format("The linkUrl: %s is already existing.", result.getLinkUrl()));
     return false;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index f03af2ae0..1633926aa 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.RestRequest;
@@ -172,11 +173,11 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql>
   @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
   public void rollback(Application application) {
     FlinkSql sql = getCandidate(application.getId(), CandidateType.HISTORY);
-    Utils.notNull(sql);
+    AssertUtils.notNull(sql);
     try {
       // check and backup current job
       FlinkSql effectiveSql = getEffective(application.getId(), false);
-      Utils.notNull(effectiveSql);
+      AssertUtils.notNull(effectiveSql);
       // rollback history sql
       backUpService.rollbackFlinkSql(application, sql);
     } catch (Exception e) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 4101163eb..ba72d4331 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -20,9 +20,9 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.ThreadUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.ResponseCode;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
@@ -122,7 +122,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
   @Transactional(rollbackFor = {Exception.class})
   public boolean update(Project projectParam) {
     Project project = getById(projectParam.getId());
-    Utils.notNull(project);
+    AssertUtils.notNull(project);
     ApiAlertException.throwIfFalse(
         project.getTeamId().equals(projectParam.getTeamId()),
         "Team can't be changed, update project failed.");
@@ -166,7 +166,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
   @Transactional(rollbackFor = {Exception.class})
   public boolean delete(Long id) {
     Project project = getById(id);
-    Utils.notNull(project);
+    AssertUtils.notNull(project);
     LambdaQueryWrapper<Application> queryWrapper =
         new LambdaQueryWrapper<Application>().eq(Application::getProjectId, id);
     long count = applicationService.count(queryWrapper);
@@ -246,7 +246,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
   @Override
   public List<String> modules(Long id) {
     Project project = getById(id);
-    Utils.notNull(project);
+    AssertUtils.notNull(project);
     BuildState buildState = BuildState.of(project.getBuildState());
     if (BuildState.SUCCESSFUL.equals(buildState)) {
       File appHome = project.getDistHome();
@@ -373,7 +373,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
       }
       List<Map<String, Object>> list = new ArrayList<>();
       File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
-      Utils.notNull(files);
+      AssertUtils.notNull(files);
       for (File item : files) {
         eachFile(item, list, true);
       }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index 9dae7dc84..449676fdb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -123,8 +124,8 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
   private void clearExpire(Savepoint entity) {
     FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
     Application application = applicationService.getById(entity.getAppId());
-    Utils.notNull(flinkEnv);
-    Utils.notNull(application);
+    AssertUtils.notNull(flinkEnv);
+    AssertUtils.notNull(application);
 
     String numRetainedKey = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key();
     String numRetainedFromDynamicProp =
@@ -277,7 +278,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
       // 3.1) At the remote mode, request the flink webui interface to get the savepoint path
       if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
         FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-        Utils.notNull(
+        AssertUtils.notNull(
             cluster,
             String.format(
                 "The clusterId=%s cannot be find, maybe the clusterId is wrong or "
@@ -440,7 +441,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
     Map<String, Object> properties = new HashMap<>();
 
     if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
-      Utils.notNull(
+      AssertUtils.notNull(
           cluster,
           String.format(
               "The clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -461,7 +462,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
         return application.getJobName();
       case KUBERNETES_NATIVE_SESSION:
       case YARN_SESSION:
-        Utils.notNull(
+        AssertUtils.notNull(
             cluster,
             String.format(
                 "The %s clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index f01b5df58..e9997af41 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.core.service.SqlCompleteService;
 
 import com.google.common.collect.Sets;
@@ -185,7 +185,7 @@ public class SqlCompleteServiceImpl implements SqlCompleteService {
         nowStep = nowStep.get(nowChar).getNext();
         loc += 1;
       }
-      Utils.notNull(preNode);
+      AssertUtils.notNull(preNode);
       preNode.setStop();
       preNode.setCount(count);
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
index 22d4e89ad..d8ea8714e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -74,8 +74,9 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue
 
   @Override
   public IPage<YarnQueue> page(YarnQueue yarnQueue, RestRequest request) {
-    Utils.notNull(yarnQueue, "Yarn queue query params mustn't be null.");
-    Utils.notNull(yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null.");
+    AssertUtils.notNull(yarnQueue, "Yarn queue query params mustn't be null.");
+    AssertUtils.notNull(
+        yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null.");
     Page<YarnQueue> page = MybatisPager.getPage(request);
     return this.baseMapper.findQueues(page, yarnQueue);
   }
@@ -88,8 +89,8 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue
   @Override
   public ResponseResult<String> checkYarnQueue(YarnQueue yarnQueue) {
 
-    Utils.notNull(yarnQueue, "Yarn queue mustn't be empty.");
-    Utils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
+    AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be empty.");
+    AssertUtils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
 
     ResponseResult<String> responseResult = new ResponseResult<>();
 
@@ -211,8 +212,8 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue
 
   @VisibleForTesting
   public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) {
-    Utils.notNull(yarnQueue, "Yarn queue mustn't be null.");
-    Utils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
+    AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be null.");
+    AssertUtils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
     YarnQueue queueFromDB = getById(yarnQueue.getId());
     ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist.");
     return queueFromDB;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
index 3e95f7e5e..ae7d19532 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.system.service.impl;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -173,8 +173,10 @@ public class MemberServiceImpl extends ServiceImpl<MemberMapper, Member> impleme
                 () ->
                     new ApiAlertException(
                         String.format("The member [id=%s] not found", member.getId())));
-    Utils.required(oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed.");
-    Utils.required(oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed.");
+    AssertUtils.required(
+        oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed.");
+    AssertUtils.required(
+        oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed.");
     Optional.ofNullable(roleService.getById(member.getRoleId()))
         .orElseThrow(
             () ->
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index 584f157a1..69d6c74f3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.system.service.impl;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -79,7 +79,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
   public IPage<User> page(User user, RestRequest request) {
     Page<User> page = MybatisPager.getPage(request);
     IPage<User> resPage = this.baseMapper.findUserDetail(page, user);
-    Utils.notNull(resPage);
+    AssertUtils.notNull(resPage);
     if (resPage.getTotal() == 0) {
       resPage.setRecords(Collections.emptyList());
     }
@@ -196,7 +196,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
   @Override
   public void setLastTeam(Long teamId, Long userId) {
     User user = getById(userId);
-    Utils.notNull(user);
+    AssertUtils.notNull(user);
     user.setLastTeamId(teamId);
     this.baseMapper.updateById(user);
   }
@@ -204,7 +204,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
   @Override
   public void clearLastTeam(Long userId, Long teamId) {
     User user = getById(userId);
-    Utils.notNull(user);
+    AssertUtils.notNull(user);
     if (!teamId.equals(user.getLastTeamId())) {
       return;
     }
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index 4c6144a36..b146d281b 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.hbase.source;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.flink.connector.function.RunningFunction;
 import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction;
 import org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction;
@@ -42,8 +42,8 @@ public class HBaseJavaSource<T> {
       HBaseResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    Utils.notNull(queryFunction, "queryFunction must not be null");
-    Utils.notNull(resultFunction, "resultFunction must not be null");
+    AssertUtils.notNull(queryFunction, "queryFunction must not be null");
+    AssertUtils.notNull(resultFunction, "resultFunction must not be null");
     HBaseSourceFunction<T> sourceFunction =
         new HBaseSourceFunction<>(property, queryFunction, resultFunction, runningFunc, null);
     return context.getJavaEnv().addSource(sourceFunction);
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
index 8f9634978..636ddf5d3 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.flink.connector.jdbc.sink;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.ConfigUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.flink.connector.function.TransformFunction;
 import org.apache.streampark.flink.connector.jdbc.internal.JdbcSinkFunction;
 import org.apache.streampark.flink.core.scala.StreamingContext;
@@ -55,7 +55,7 @@ public class JdbcJavaSink<T> {
   }
 
   public DataStreamSink<T> sink(DataStream<T> dataStream) {
-    Utils.notNull(sqlFunc, "transformFunction can not be null");
+    AssertUtils.notNull(sqlFunc, "transformFunction can not be null");
     if (this.jdbc == null) {
       this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias);
     }
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index e3f36d34c..083f6d44d 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.flink.connector.jdbc.source;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.ConfigUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.flink.connector.function.RunningFunction;
 import org.apache.streampark.flink.connector.function.SQLQueryFunction;
 import org.apache.streampark.flink.connector.function.SQLResultFunction;
@@ -54,8 +54,8 @@ public class JdbcJavaSource<T> {
       SQLResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    Utils.notNull(queryFunction, "queryFunction must not be null");
-    Utils.notNull(resultFunction, "resultFunction must not be null");
+    AssertUtils.notNull(queryFunction, "queryFunction must not be null");
+    AssertUtils.notNull(resultFunction, "resultFunction must not be null");
     if (this.jdbc == null) {
       this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias);
     }
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
index 0e2cab882..6186711ae 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.jdbc.source
 
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ConfigUtils
 import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction
 import org.apache.streampark.flink.core.scala.StreamingContext
 
@@ -31,14 +31,15 @@ import scala.collection.Map
 
 object JdbcSource {
 
-  def apply(@(transient @param) property: Properties = new Properties())(implicit
-      ctx: StreamingContext): JdbcSource = new JdbcSource(ctx, property)
+  def apply(alias: String = "", properties: Properties = new Properties())(implicit
+      ctx: StreamingContext): JdbcSource = new JdbcSource(ctx, alias, properties) {}
 
 }
 
 class JdbcSource(
     @(transient @param) val ctx: StreamingContext,
-    property: Properties = new Properties()) {
+    alias: String,
+    property: Properties) {
 
   /**
    * @param sqlFun
@@ -50,8 +51,11 @@ class JdbcSource(
   def getDataStream[R: TypeInformation](
       sqlFun: R => String,
       fun: Iterable[Map[String, _]] => Iterable[R],
-      running: Unit => Boolean)(implicit jdbc: Properties = new Properties()): DataStream[R] = {
-    Utils.copyProperties(property, jdbc)
+      running: Unit => Boolean): DataStream[R] = {
+    val jdbc = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias)
+    if (property != null) {
+      jdbc.putAll(property)
+    }
     val mysqlFun = new JdbcSourceFunction[R](jdbc, sqlFun, fun, running)
     ctx.addSource(mysqlFun)
   }
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index dc49f621d..560de9eb0 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.mongo.source;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.flink.connector.function.RunningFunction;
 import org.apache.streampark.flink.connector.mongo.function.MongoQueryFunction;
 import org.apache.streampark.flink.connector.mongo.function.MongoResultFunction;
@@ -43,9 +43,9 @@ public class MongoJavaSource<T> {
       MongoResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    Utils.notNull(collectionName, "collectionName must not be null");
-    Utils.notNull(queryFunction, "queryFunction must not be null");
-    Utils.notNull(resultFunction, "resultFunction must not be null");
+    AssertUtils.notNull(collectionName, "collectionName must not be null");
+    AssertUtils.notNull(queryFunction, "queryFunction must not be null");
+    AssertUtils.notNull(resultFunction, "resultFunction must not be null");
     MongoSourceFunction<T> sourceFunction =
         new MongoSourceFunction<>(
             collectionName, property, queryFunction, resultFunction, runningFunc, null);
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
index 795db7d09..61663845a 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.packer.docker
 
 import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder}
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.AssertUtils
 
 import com.github.dockerjava.api.DockerClient
 import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientConfig, HackDockerClient}
@@ -61,7 +61,7 @@ object DockerRetriever {
   /** set docker-host for kata */
   def setDockerHost(): Unit = {
     val dockerhost: String = InternalConfigHolder.get(CommonConfig.DOCKER_HOST)
-    if (Utils.notEmpty(dockerhost)) {
+    if (AssertUtils.isNotEmpty(dockerhost)) {
       val dockerHostUri: URI = new URI(dockerhost)
       dockerHttpClientBuilder.dockerHost(dockerHostUri)
     }

Reply via email to